package org.apache.flink.streaming.runtime.operators.sink;

import java.util.Collections;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.class */
/**
 * Stream operator that forwards serialized committables to a {@link CommitterHandler}.
 *
 * <p>Each incoming record carries a {@code byte[]} payload. The payload is decoded with the
 * configured {@link SimpleVersionedSerializer} and handed to the handler one committable at a
 * time. State, checkpoint and end-of-input callbacks are delegated to the handler, and after most
 * of them the {@link CommitRetrier} is invoked.
 *
 * @param <InputT> type of the committables decoded from the incoming byte arrays
 * @param <OutputT> second type parameter of the wrapped {@link CommitterHandler}
 */
class CommitterOperator<InputT, OutputT> extends AbstractStreamOperator<byte[]>
        implements OneInputStreamOperator<byte[], byte[]>, BoundedOneInput {

    /** Decodes the versioned byte payload of each incoming record. */
    private final SimpleVersionedSerializer<InputT> inputSerializer;

    /** Receives the decoded committables and all lifecycle callbacks. */
    private final CommitterHandler<InputT, OutputT> committerHandler;

    /** Retry helper built from the processing time service and the handler. */
    private final CommitRetrier commitRetrier;

    /**
     * Creates the operator.
     *
     * @param processingTimeService time service stored on the operator and passed to the
     *     {@link CommitRetrier}
     * @param inputSerializer serializer used to decode incoming records; must not be null
     * @param committerHandler handler that receives the committables; must not be null
     */
    public CommitterOperator(
            ProcessingTimeService processingTimeService,
            SimpleVersionedSerializer<InputT> inputSerializer,
            CommitterHandler<InputT, OutputT> committerHandler) {
        // checkNotNull is generic, so no (raw) cast is needed on its result.
        this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
        this.committerHandler = Preconditions.checkNotNull(committerHandler);
        // Field is not declared in this class; presumably inherited from AbstractStreamOperator.
        this.processingTimeService = processingTimeService;
        this.commitRetrier = new CommitRetrier(processingTimeService, committerHandler);
    }

    /**
     * Initializes the operator state, then the handler state, and finally triggers
     * {@code retryWithDelay()} on the retrier.
     *
     * @param stateInitializationContext context passed to both the superclass and the handler
     * @throws Exception if any of the delegated calls fails
     */
    @Override
    public void initializeState(StateInitializationContext stateInitializationContext)
            throws Exception {
        super.initializeState(stateInitializationContext);
        committerHandler.initializeState(stateInitializationContext);
        // NOTE(review): presumably re-attempts committables restored from state - confirm.
        commitRetrier.retryWithDelay();
    }

    /**
     * Snapshots the operator state and then lets the handler snapshot its own state.
     *
     * @param stateSnapshotContext context passed to both the superclass and the handler
     * @throws Exception if any of the delegated calls fails
     */
    @Override
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        committerHandler.snapshotState(stateSnapshotContext);
    }

    /**
     * Signals end of input to the handler and then calls {@code retryIndefinitely()} on the
     * retrier.
     *
     * @throws Exception if any of the delegated calls fails
     */
    @Override
    public void endInput() throws Exception {
        committerHandler.endOfInput();
        commitRetrier.retryIndefinitely();
    }

    /**
     * Forwards the checkpoint-complete notification to the superclass and the handler, then
     * triggers {@code retryWithDelay()} on the retrier.
     *
     * @param checkpointId id of the completed checkpoint
     * @throws Exception if any of the delegated calls fails
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        committerHandler.notifyCheckpointCompleted(checkpointId);
        commitRetrier.retryWithDelay();
    }

    /**
     * Decodes the record's byte payload into a committable and passes it to the handler as a
     * single-element list.
     *
     * @param streamRecord record whose value is the versioned, serialized committable
     * @throws Exception if deserialization or the handler call fails
     */
    @Override
    public void processElement(StreamRecord<byte[]> streamRecord) throws Exception {
        InputT committable =
                SimpleVersionedSerialization.readVersionAndDeSerialize(
                        inputSerializer, streamRecord.getValue());
        committerHandler.processCommittables(Collections.singletonList(committable));
    }

    /**
     * Closes the handler and the superclass through a single {@code IOUtils.closeAll} call.
     *
     * @throws Exception if closing fails
     */
    @Override
    public void close() throws Exception {
        // Handler first, superclass second - same order as the original array.
        IOUtils.closeAll(committerHandler, super::close);
    }
}
