package storm.trident.spout;

import backtype.storm.Config;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
import org.apache.log4j.Logger;
import storm.trident.spout.ITridentSpout;
import storm.trident.topology.MasterBatchCoordinator;
import storm.trident.topology.TransactionAttempt;
import storm.trident.topology.state.RotatingTransactionalState;
import storm.trident.topology.state.TransactionalState;

/* loaded from: input_file:storm/trident/spout/TridentSpoutCoordinator.class */
public class TridentSpoutCoordinator implements IBasicBolt {
    public static final Logger LOG = Logger.getLogger(TridentSpoutCoordinator.class);
    private static final String META_DIR = "meta";
    ITridentSpout _spout;
    ITridentSpout.BatchCoordinator _coord;
    RotatingTransactionalState _state;
    TransactionalState _underlyingState;
    String _id;

    public TridentSpoutCoordinator(String str, ITridentSpout iTridentSpout) {
        this._spout = iTridentSpout;
        this._id = str;
    }

    @Override // backtype.storm.topology.IBasicBolt
    public void prepare(Map map, TopologyContext topologyContext) {
        this._coord = this._spout.getCoordinator(this._id, map, topologyContext);
        this._underlyingState = TransactionalState.newCoordinatorState(map, this._id);
        this._state = new RotatingTransactionalState(this._underlyingState, META_DIR);
    }

    @Override // backtype.storm.topology.IBasicBolt
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        TransactionAttempt transactionAttempt = (TransactionAttempt) tuple.getValue(0);
        if (tuple.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
            this._state.cleanupBefore(transactionAttempt.getTransactionId().longValue());
            this._coord.success(transactionAttempt.getTransactionId().longValue());
        } else {
            if (tuple.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
                return;
            }
            long longValue = transactionAttempt.getTransactionId().longValue();
            Object initializeTransaction = this._coord.initializeTransaction(longValue, this._state.getPreviousState(longValue), this._state.getState(longValue));
            this._state.overrideState(longValue, initializeTransaction);
            basicOutputCollector.emit(MasterBatchCoordinator.BATCH_STREAM_ID, new Values(transactionAttempt, initializeTransaction));
        }
    }

    @Override // backtype.storm.topology.IBasicBolt
    public void cleanup() {
        this._coord.close();
        this._underlyingState.close();
    }

    @Override // backtype.storm.topology.IComponent
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream(MasterBatchCoordinator.BATCH_STREAM_ID, new Fields("tx", "metadata"));
    }

    @Override // backtype.storm.topology.IComponent
    public Map<String, Object> getComponentConfiguration() {
        Config config = new Config();
        config.setMaxTaskParallelism(1);
        return config;
    }
}
