/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.exec.repl.incremental;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.ReplLastIdInfo;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ddl.DDLWork;
import org.apache.hadoop.hive.ql.ddl.database.alter.poperties.AlterDatabaseSetPropertiesDesc;
import org.apache.hadoop.hive.ql.ddl.misc.flags.ReplRemoveFirstIncLoadPendFlagDesc;
import org.apache.hadoop.hive.ql.ddl.table.misc.properties.AlterTableSetPropertiesDesc;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadEventsIterator;
import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
import org.apache.hadoop.hive.ql.parse.repl.load.log.IncrementalLoadLogger;
import org.apache.hadoop.hive.ql.parse.repl.load.message.MessageHandler;
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
import org.slf4j.Logger;

/**
 * Builds the task graph that replays the events of an incremental replication load.
 *
 * <p>Each call to {@link #build} consumes event directories from the supplied
 * {@link IncrementalLoadEventsIterator} until either the iterator is exhausted or the
 * {@link TaskTracker} refuses more tasks. The tasks of consecutive events are chained through
 * barrier tasks so that an event only starts after the previous one has finished.
 *
 * <p>NOTE(review): {@code numIteration} is a static counter that every new instance resets to
 * zero and every {@link #build} call increments without synchronization; this looks unsafe if
 * several builders are alive at once - confirm the intended usage.
 */
public class IncrementalLoadTasksBuilder {
    private final String dbName;
    private final IncrementalLoadEventsIterator iterator;
    private final HashSet<ReadEntity> inputs;
    private final HashSet<WriteEntity> outputs;
    /** Assigned on every {@link #build} call; stays {@code null} until the first call. */
    private Logger log;
    private final HiveConf conf;
    private final ReplLogger replLogger;
    private static long numIteration;
    private final Long eventTo;

    /**
     * Creates a builder and starts the replication logger for this load.
     *
     * @param dbName   name of the database the events are replayed into; may be blank
     * @param loadPath path handed to the {@link IncrementalLoadLogger}
     * @param iterator source of the event directories to replay
     * @param conf     configuration used when creating tasks and reading event metadata
     * @param eventTo  value returned unchanged by {@link #eventTo()}
     */
    public IncrementalLoadTasksBuilder(String dbName, String loadPath, IncrementalLoadEventsIterator iterator, HiveConf conf, Long eventTo) {
        this.dbName = dbName;
        this.iterator = iterator;
        this.inputs = new HashSet<>();
        this.outputs = new HashSet<>();
        this.log = null;
        this.conf = conf;
        this.replLogger = new IncrementalLoadLogger(dbName, loadPath, iterator.getNumEvents());
        this.replLogger.startLog();
        this.eventTo = eventTo;
        numIteration = 0L;
    }

    /**
     * Builds the task chain for as many pending events as the tracker allows.
     *
     * @param context query context passed on to the event message handlers
     * @param hive    Hive handle passed on to the event message handlers
     * @param log     logger used by this builder from now on
     * @param tracker decides whether more tasks may be added and records the tasks created
     * @return the root task of the chain built by this call
     * @throws Exception if reading event metadata or handling an event fails
     */
    public Task<?> build(Context context, Hive hive, Logger log, TaskTracker tracker) throws Exception {
        Task<DependencyCollectionWork> evTaskRoot = TaskFactory.get(new DependencyCollectionWork());
        // The tail changes its work type as barriers are appended, hence the wildcard.
        Task<? extends Serializable> taskChainTail = evTaskRoot;
        Long lastReplayedEvent = null;
        this.log = log;
        ++numIteration;
        this.log.debug("Iteration num {}", numIteration);
        while (this.iterator.hasNext() && tracker.canAddMoreTasks()) {
            FileStatus dir = this.iterator.next();
            String location = dir.getPath().toUri().toString();
            DumpMetaData eventDmd = new DumpMetaData(new Path(location), this.conf);
            if (!this.shouldReplayEvent(dir, eventDmd.getDumpType(), this.dbName)) {
                this.log.debug("Skipping event {} from {} for DB {} maxTasks: {}",
                        eventDmd.getDumpType(), dir.getPath().toUri(), this.dbName, tracker.numberOfTasks());
                continue;
            }
            this.log.debug("Loading event {} from {} for DB {} maxTasks: {}",
                    eventDmd.getDumpType(), dir.getPath().toUri(), this.dbName, tracker.numberOfTasks());
            MessageHandler.Context mhContext = new MessageHandler.Context(this.dbName, location, taskChainTail, eventDmd, this.conf, hive, context, this.log);
            List<Task<? extends Serializable>> evTasks = this.analyzeEventLoad(mhContext);
            if (evTasks != null && !evTasks.isEmpty()) {
                // Close the event with a state-log barrier that all of its leaf tasks feed into;
                // the barrier becomes the precursor of the next event.
                ReplStateLogWork replStateLogWork = new ReplStateLogWork(this.replLogger, dir.getPath().getName(), eventDmd.getDumpType().toString());
                Task<ReplStateLogWork> barrierTask = TaskFactory.get(replStateLogWork, this.conf);
                AddDependencyToLeaves function = new AddDependencyToLeaves(barrierTask);
                DAGTraversal.traverse(evTasks, function);
                this.log.debug("Updated taskChainTail from {}:{} to {}:{}",
                        taskChainTail.getClass(), taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId());
                tracker.addTaskList(taskChainTail.getChildTasks());
                taskChainTail = barrierTask;
            }
            lastReplayedEvent = eventDmd.getEventTo();
        }
        if (!this.hasMoreWork()) {
            // Every event has been consumed: clear the first-incremental-load-pending flag and
            // finish with a state-log task carrying the last replayed event id.
            ReplRemoveFirstIncLoadPendFlagDesc desc = new ReplRemoveFirstIncLoadPendFlagDesc(this.dbName);
            Task<DDLWork> updateIncPendTask = TaskFactory.get(new DDLWork(this.inputs, this.outputs, desc), this.conf);
            taskChainTail.addDependentTask(updateIncPendTask);
            taskChainTail = updateIncPendTask;
            Map<String, String> dbProps = new HashMap<>();
            // NOTE(review): when no event was replayed in this call, lastReplayedEvent is null
            // and the literal string "null" is recorded here - confirm this is intended.
            dbProps.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), String.valueOf(lastReplayedEvent));
            ReplStateLogWork replStateLogWork = new ReplStateLogWork(this.replLogger, dbProps);
            Task<ReplStateLogWork> barrierTask = TaskFactory.get(replStateLogWork, this.conf);
            taskChainTail.addDependentTask(barrierTask);
            this.log.debug("Added {}:{} as a precursor of barrier task {}:{}",
                    taskChainTail.getClass(), taskChainTail.getId(), barrierTask.getClass(), barrierTask.getId());
        }
        return evTaskRoot;
    }

    /**
     * @return {@code true} while the event iterator still has events that were not consumed
     */
    public boolean hasMoreWork() {
        return this.iterator.hasNext();
    }

    /**
     * Compares the event id encoded in the directory name with the last replicated id stored
     * in the given parameters.
     *
     * @param params   parameters of the target object; may be {@code null}
     * @param dir      event directory whose name is parsed as the numeric event id
     * @param dumpType type of the event, used for logging only
     * @return {@code false} if the parameters record a last replicated id greater than or equal
     *         to the event id, {@code true} otherwise
     * @throws NumberFormatException if the stored id or the directory name is not a number
     */
    private boolean isEventNotReplayed(Map<String, String> params, FileStatus dir, DumpType dumpType) {
        String stateKey = ReplicationSpec.KEY.CURR_STATE_ID.toString();
        if (params == null || !params.containsKey(stateKey)) {
            return true;
        }
        long lastReplId = Long.parseLong(params.get(stateKey));
        long eventId = Long.parseLong(dir.getPath().getName());
        if (lastReplId >= eventId) {
            this.log.debug("Event {} with replId {} is already replayed. LastReplId - {}", dumpType, eventId, lastReplId);
            return false;
        }
        return true;
    }

    /**
     * Decides whether an event still has to be replayed into the given database.
     *
     * @param dir      event directory
     * @param dumpType type of the event
     * @param dbName   target database name; a blank name disables filtering
     * @return {@code true} if the event must be replayed, or if that cannot be determined
     */
    private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType, String dbName) {
        if (StringUtils.isBlank(dbName)) {
            return true;
        }
        try {
            Database database = Hive.get().getDatabase(dbName);
            return database == null || this.isEventNotReplayed(database.getParameters(), dir, dumpType);
        }
        catch (HiveException e) {
            // Best effort: without the database nothing can be filtered, so replay the event.
            // The exception is logged so that the reason for the failed lookup is not lost.
            this.log.debug("Failed to get the database " + dbName, e);
            return true;
        }
    }

    /**
     * Runs the message handler of the event, hooks the resulting tasks behind the precursor of
     * the context and collects the handler's read and write entities.
     *
     * @param context handler context describing the event
     * @return the tasks returned by {@link #addUpdateReplStateTasks} for this event
     * @throws SemanticException if the handler fails
     */
    private List<Task<? extends Serializable>> analyzeEventLoad(MessageHandler.Context context) throws SemanticException {
        MessageHandler messageHandler = context.dmd.getDumpType().handler();
        List<Task<? extends Serializable>> tasks = messageHandler.handle(context);
        if (context.precursor != null) {
            for (Task<? extends Serializable> t : tasks) {
                context.precursor.addDependentTask(t);
                this.log.debug("Added {}:{} as a precursor of {}:{}",
                        context.precursor.getClass(), context.precursor.getId(), t.getClass(), t.getId());
            }
        }
        this.inputs.addAll(messageHandler.readEntities());
        this.outputs.addAll(messageHandler.writeEntities());
        return this.addUpdateReplStateTasks(messageHandler.getUpdatedMetadata(), tasks);
    }

    /**
     * Creates a {@code REPL_MIGRATION_COMMIT_TXN} task carrying the last replicated id.
     *
     * @param dbName    database the id applies to
     * @param tableName table the id applies to; may be {@code null}
     * @param partSpec  partition specs the id applies to; may be {@code null} or empty
     * @param replState last replicated id; must be parseable as a {@code long}
     * @param preCursor task the new task is made dependent on; may be {@code null}
     * @return the created task
     * @throws SemanticException if a partition name cannot be built from a partition spec
     */
    private Task<? extends Serializable> getMigrationCommitTxnTask(String dbName, String tableName, List<Map<String, String>> partSpec, String replState, Task<? extends Serializable> preCursor) throws SemanticException {
        ReplLastIdInfo replLastIdInfo = new ReplLastIdInfo(dbName, Long.parseLong(replState));
        replLastIdInfo.setTable(tableName);
        if (partSpec != null && !partSpec.isEmpty()) {
            List<String> partitionList = new ArrayList<>();
            for (Map<String, String> part : partSpec) {
                try {
                    partitionList.add(Warehouse.makePartName(part, false));
                }
                catch (MetaException e) {
                    // Keep the original exception as the cause so its stack trace survives.
                    throw new SemanticException(e.getMessage(), e);
                }
            }
            replLastIdInfo.setPartitionList(partitionList);
        }
        Task<ReplTxnWork> updateReplIdTxnTask = TaskFactory.get(new ReplTxnWork(replLastIdInfo, ReplTxnWork.OperationType.REPL_MIGRATION_COMMIT_TXN), this.conf);
        if (preCursor != null) {
            preCursor.addDependentTask(updateReplIdTxnTask);
            this.log.debug("Added {}:{} as a precursor of {}:{}",
                    preCursor.getClass(), preCursor.getId(), updateReplIdTxnTask.getClass(), updateReplIdTxnTask.getId());
        }
        return updateReplIdTxnTask;
    }

    /**
     * Creates a DDL task that stores the replication state id in the properties of a table or,
     * when a partition spec is given, of a partition.
     *
     * @param dbName    database of the table
     * @param tableName name of the table
     * @param partSpec  partition spec, or {@code null} to address the table itself
     * @param replState replication state id to store
     * @param preCursor task the new task is made dependent on; may be {@code null}
     * @return the created task
     * @throws SemanticException if the alter-table descriptor cannot be created
     */
    private Task<? extends Serializable> tableUpdateReplStateTask(String dbName, String tableName, Map<String, String> partSpec, String replState, Task<? extends Serializable> preCursor) throws SemanticException {
        Map<String, String> mapProp = new HashMap<>();
        mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
        TableName tName = TableName.fromString(tableName, null, dbName);
        AlterTableSetPropertiesDesc alterTblDesc = new AlterTableSetPropertiesDesc(tName, partSpec, new ReplicationSpec(replState, replState), false, mapProp, false, false, null);
        Task<DDLWork> updateReplIdTask = TaskFactory.get(new DDLWork(this.inputs, this.outputs, alterTblDesc), this.conf);
        if (preCursor != null) {
            preCursor.addDependentTask(updateReplIdTask);
            this.log.debug("Added {}:{} as a precursor of {}:{}",
                    preCursor.getClass(), preCursor.getId(), updateReplIdTask.getClass(), updateReplIdTask.getId());
        }
        return updateReplIdTask;
    }

    /**
     * Creates a DDL task that stores the replication state id in the database properties.
     *
     * @param dbName    database to update
     * @param replState replication state id to store
     * @param preCursor task the new task is made dependent on; may be {@code null}
     * @return the created task
     */
    private Task<? extends Serializable> dbUpdateReplStateTask(String dbName, String replState, Task<? extends Serializable> preCursor) {
        Map<String, String> mapProp = new HashMap<>();
        mapProp.put(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replState);
        AlterDatabaseSetPropertiesDesc alterDbDesc = new AlterDatabaseSetPropertiesDesc(dbName, mapProp, new ReplicationSpec(replState, replState));
        Task<DDLWork> updateReplIdTask = TaskFactory.get(new DDLWork(this.inputs, this.outputs, alterDbDesc), this.conf);
        if (preCursor != null) {
            preCursor.addDependentTask(updateReplIdTask);
            this.log.debug("Added {}:{} as a precursor of {}:{}",
                    preCursor.getClass(), preCursor.getId(), updateReplIdTask.getClass(), updateReplIdTask.getId());
        }
        return updateReplIdTask;
    }

    /**
     * Appends tasks that record the replication state for every object the event touched.
     *
     * <p>All state tasks hang off a single barrier task, and that barrier is made dependent on
     * the leaves of the import tasks, so the state is only recorded after the import finished.
     *
     * @param updatedMetaDataTracker objects updated by the event together with their state ids
     * @param importTasks            tasks produced by the event's message handler
     * @return {@code importTasks} itself when there is nothing to record, otherwise the list of
     *         state tasks that were created
     * @throws SemanticException if a state task cannot be created
     */
    private List<Task<? extends Serializable>> addUpdateReplStateTasks(UpdatedMetaDataTracker updatedMetaDataTracker, List<Task<? extends Serializable>> importTasks) throws SemanticException {
        if (importTasks.isEmpty()) {
            this.log.debug("No objects need update of repl state: 0 import tasks");
            return importTasks;
        }
        boolean needCommitTx = updatedMetaDataTracker.isNeedCommitTxn();
        if (needCommitTx) {
            assert updatedMetaDataTracker.getUpdateMetaDataList().size() <= 1;
        }
        Task<DependencyCollectionWork> barrierTask = TaskFactory.get(new DependencyCollectionWork(), this.conf);
        List<Task<? extends Serializable>> tasks = new ArrayList<>();
        for (UpdatedMetaDataTracker.UpdateMetaData updateMetaData : updatedMetaDataTracker.getUpdateMetaDataList()) {
            String replState = updateMetaData.getReplState();
            String updatedDbName = updateMetaData.getDbName();
            String tableName = updateMetaData.getTableName();
            List<Map<String, String>> partitions = updateMetaData.getPartitionsList();
            if (needCommitTx) {
                // One commit task is created at the most specific level available. The
                // partition and table levels end the loop; the database level moves on to the
                // next entry.
                if (!partitions.isEmpty()) {
                    tasks.add(this.getMigrationCommitTxnTask(updatedDbName, tableName, partitions, replState, barrierTask));
                    break;
                }
                if (tableName != null) {
                    tasks.add(this.getMigrationCommitTxnTask(updatedDbName, tableName, null, replState, barrierTask));
                    break;
                }
                tasks.add(this.getMigrationCommitTxnTask(updatedDbName, null, null, replState, barrierTask));
                continue;
            }
            // Without a commit task the state is written at every level: each partition,
            // then the table (if any), then the database.
            for (Map<String, String> partSpec : partitions) {
                tasks.add(this.tableUpdateReplStateTask(updatedDbName, tableName, partSpec, replState, barrierTask));
            }
            if (tableName != null) {
                tasks.add(this.tableUpdateReplStateTask(updatedDbName, tableName, null, replState, barrierTask));
            }
            tasks.add(this.dbUpdateReplStateTask(updatedDbName, replState, barrierTask));
        }
        if (tasks.isEmpty()) {
            this.log.debug("No objects need update of repl state: 0 update tracker tasks");
            return importTasks;
        }
        DAGTraversal.traverse(importTasks, new AddDependencyToLeaves(barrierTask));
        return tasks;
    }

    /**
     * @return the {@code eventTo} value this builder was constructed with
     */
    public Long eventTo() {
        return this.eventTo;
    }

    /**
     * @return the number of {@link #build} calls since the most recent builder was constructed
     */
    public static long getNumIteration() {
        return numIteration;
    }
}

