/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.sink;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.log4j.Appender;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TwoPhaseCommitSinkFunctionTest {
    // Per-test scratch directory; setUp() creates the "_target" and "_tmp" folders inside it.
    @Rule
    public TemporaryFolder folder = new TemporaryFolder();
    // Sink under test; (re)created by setUpTestHarness().
    private FileBasedSinkFunction sinkFunction;
    // Test harness wrapping sinkFunction in a StreamSink; (re)created by setUpTestHarness().
    private OneInputStreamOperatorTestHarness<String, Object> harness;
    // While true, FileBasedSinkFunction.commit() throws RuntimeException("Expected exception").
    private AtomicBoolean throwException = new AtomicBoolean();
    // Directory that FileBasedSinkFunction.commit() moves transaction files into.
    private File targetDirectory;
    // Directory in which FileBasedSinkFunction.beginTransaction() creates transaction files.
    private File tmpDirectory;
    // Manually advanced clock that is handed to the sink's constructor.
    private SettableClock clock;
    // Logger of TwoPhaseCommitSinkFunction; the test appender is attached to it in setupLogger().
    private Logger logger;
    // Appender that stores every event it receives in loggingEvents.
    private AppenderSkeleton testAppender;
    // Events captured by testAppender; reset in setUp() and cleared to null in tearDown().
    private List<LoggingEvent> loggingEvents;

    /**
     * Builds a fresh fixture before each test: log capture, the {@code _target} and
     * {@code _tmp} folders, a settable clock, and finally the sink plus its test harness.
     *
     * @throws Exception if a folder cannot be created or the harness cannot be set up
     */
    @Before
    public void setUp() throws Exception {
        loggingEvents = new ArrayList<>();
        setupLogger();

        // The sink's constructor reads both directories and the clock, so they have to
        // exist before setUpTestHarness() instantiates it.
        targetDirectory = folder.newFolder("_target");
        tmpDirectory = folder.newFolder("_tmp");
        clock = new SettableClock();

        setUpTestHarness();
    }

    /**
     * Routes log output of {@link TwoPhaseCommitSinkFunction} into {@code loggingEvents}.
     *
     * <p>All appenders are removed from the root logger, then an in-memory appender is
     * attached to the sink's logger and that logger's level is set to {@code WARN}.
     */
    private void setupLogger() {
        Logger.getRootLogger().removeAllAppenders();
        logger = Logger.getLogger(TwoPhaseCommitSinkFunction.class);

        final AppenderSkeleton capturingAppender = new AppenderSkeleton() {

            @Override
            protected void append(LoggingEvent loggingEvent) {
                TwoPhaseCommitSinkFunctionTest.this.loggingEvents.add(loggingEvent);
            }

            @Override
            public void close() {
                // nothing to release: events are only kept in memory
            }

            @Override
            public boolean requiresLayout() {
                return false;
            }
        };
        testAppender = capturingAppender;

        logger.addAppender(capturingAppender);
        logger.setLevel(Level.WARN);
    }

    /**
     * Closes the test harness and detaches the log-capturing appender.
     *
     * <p>The logger cleanup runs in a {@code finally} block so that a failure while closing
     * the harness cannot leave the test appender attached to the logger for later tests.
     *
     * @throws Exception if closing the harness fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            closeTestHarness();
        } finally {
            if (logger != null) {
                logger.removeAppender(testAppender);
            }
            loggingEvents = null;
        }
    }

    /**
     * Creates a new {@link FileBasedSinkFunction}, wraps it in a {@link StreamSink} and
     * sets up a test harness around it. The harness is not opened here.
     *
     * @throws Exception if the harness cannot be constructed or set up
     */
    private void setUpTestHarness() throws Exception {
        sinkFunction = new FileBasedSinkFunction();
        // Diamond inference instead of raw types keeps the element type (String) checked.
        harness = new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sinkFunction), StringSerializer.INSTANCE);
        harness.setup();
    }

    /**
     * Closes the current test harness, if one has been created.
     *
     * <p>The null check matters because JUnit still runs {@code tearDown()} when
     * {@code setUp()} failed before the harness was built; without it the real failure
     * would be accompanied by an unrelated {@link NullPointerException}.
     *
     * @throws Exception if closing the harness fails
     */
    private void closeTestHarness() throws Exception {
        if (harness != null) {
            harness.close();
        }
    }

    /**
     * Processes three elements, each followed by its own checkpoint (ids 0, 1, 2), then
     * notifies completion of checkpoint 1. Expects exactly "42" and "43" in the target
     * directory and two files left in the tmp directory.
     */
    @Test
    public void testNotifyOfCompletedCheckpoint() throws Exception {
        harness.open();

        final String[] inputs = {"42", "43", "44"};
        for (int checkpointId = 0; checkpointId < inputs.length; checkpointId++) {
            // Timestamps interleave as element: 0, 2, 4 and snapshot: 1, 3, 5.
            final long elementTimestamp = 2L * checkpointId;
            harness.processElement(inputs[checkpointId], elementTimestamp);
            harness.snapshot(checkpointId, elementTimestamp + 1L);
        }

        harness.notifyOfCompletedCheckpoint(1L);

        final List<String> expectedCommitted = Arrays.asList("42", "43");
        assertExactlyOnce(expectedCommitted);

        final File[] remainingTmpFiles = tmpDirectory.listFiles();
        Assert.assertEquals(2L, (long) remainingTmpFiles.length);
    }

    /**
     * Simulates a failure after two successful checkpoints and before any completion
     * notification, then recovers from the second checkpoint's state.
     *
     * <p>The failure is provoked by making the tmp directory read-only, so that the sink
     * can no longer create a new transaction file. After recovery the test expects "42" and
     * "43" in the target directory and no leftover files in the tmp directory.
     *
     * <p>The tmp directory is made writable again in a {@code finally} block, so an
     * unexpected failure in the middle of the test does not leave a read-only directory
     * behind for the {@link TemporaryFolder} rule to clean up.
     */
    @Test
    public void testFailBeforeNotify() throws Exception {
        harness.open();
        harness.processElement("42", 0L);
        harness.snapshot(0L, 1L);
        harness.processElement("43", 2L);
        final OperatorStateHandles snapshot = harness.snapshot(1L, 3L);

        Assert.assertTrue(tmpDirectory.setWritable(false));
        boolean writableAgain;
        try {
            try {
                harness.processElement("44", 4L);
                harness.snapshot(2L, 5L);
                // AssertionError is not an Exception, so it is not swallowed by the catch below.
                Assert.fail("something should fail");
            } catch (Exception ex) {
                // Only the expected "cannot create file" failure is tolerated.
                if (!(ex.getCause() instanceof FileNotFoundException)) {
                    throw ex;
                }
            }
            closeTestHarness();
        } finally {
            // No assertion here: it could mask an exception that is already propagating.
            writableAgain = tmpDirectory.setWritable(true);
        }
        Assert.assertTrue(writableAgain);

        setUpTestHarness();
        harness.initializeState(snapshot);
        assertExactlyOnce(Arrays.asList("42", "43"));
        closeTestHarness();

        Assert.assertEquals(0L, (long) tmpDirectory.listFiles().length);
    }

    /**
     * Checks how commit failures during recovery are handled relative to the transaction
     * timeout when {@code ignoreFailuresAfterTransactionTimeout()} is enabled.
     *
     * <p>With the clock still at 0 the commit failure is expected to propagate out of
     * {@code initializeState}; once the clock is one millisecond past the timeout the same
     * recovery is expected to succeed.
     */
    @Test
    public void testIgnoreCommitExceptionDuringRecovery() throws Exception {
        clock.setEpochMilli(0L);

        harness.open();
        harness.processElement("42", 0L);

        final OperatorStateHandles snapshot = harness.snapshot(0L, 1L);
        harness.notifyOfCompletedCheckpoint(1L);

        final long transactionTimeout = 1000L;
        sinkFunction.setTransactionTimeout(transactionTimeout);
        sinkFunction.ignoreFailuresAfterTransactionTimeout();
        // From here on FileBasedSinkFunction.commit() throws "Expected exception".
        throwException.set(true);

        try {
            harness.initializeState(snapshot);
            Assert.fail("Expected exception not thrown");
        } catch (RuntimeException e) {
            Assert.assertEquals("Expected exception", e.getMessage());
        }

        // Move the clock just past the timeout and recover again.
        clock.setEpochMilli(transactionTimeout + 1L);
        harness.initializeState(snapshot);

        assertExactlyOnce(Collections.singletonList("42"));
    }

    /**
     * Expects a warning to be logged when a transaction is committed (via checkpoint
     * completion) after it has been open for more than {@code warningRatio} of the
     * transaction timeout.
     */
    @Test
    public void testLogTimeoutAlmostReachedWarningDuringCommit() throws Exception {
        clock.setEpochMilli(0L);

        final long transactionTimeout = 1000L;
        final double warningRatio = 0.5;
        sinkFunction.setTransactionTimeout(transactionTimeout);
        sinkFunction.enableTransactionTimeoutWarnings(warningRatio);

        harness.open();
        harness.snapshot(0L, 1L);

        // 2 ms beyond timeout * ratio (= 502 ms); presumably just past the warning threshold.
        final long elapsedTime = (long) (transactionTimeout * warningRatio) + 2L;
        clock.setEpochMilli(elapsedTime);
        harness.notifyOfCompletedCheckpoint(1L);

        final List<String> logMessages =
            loggingEvents.stream().map(LoggingEvent::getRenderedMessage).collect(Collectors.toList());

        final String expectedFragment = "has been open for " + elapsedTime
            + " ms. This is close to or even exceeding the transaction timeout of "
            + transactionTimeout + " ms.";
        Assert.assertThat(logMessages, CoreMatchers.hasItem(CoreMatchers.containsString(expectedFragment)));
    }

    /**
     * Expects a warning to be logged when a transaction is recovered from a snapshot
     * after it has been open for more than {@code warningRatio} of the transaction timeout.
     */
    @Test
    public void testLogTimeoutAlmostReachedWarningDuringRecovery() throws Exception {
        clock.setEpochMilli(0L);

        final long transactionTimeout = 1000L;
        final double warningRatio = 0.5;
        sinkFunction.setTransactionTimeout(transactionTimeout);
        sinkFunction.enableTransactionTimeoutWarnings(warningRatio);

        harness.open();
        final OperatorStateHandles snapshot = harness.snapshot(0L, 1L);

        // 2 ms beyond timeout * ratio (= 502 ms); presumably just past the warning threshold.
        final long elapsedTime = (long) (transactionTimeout * warningRatio) + 2L;
        clock.setEpochMilli(elapsedTime);
        harness.initializeState(snapshot);

        final List<String> logMessages =
            loggingEvents.stream().map(LoggingEvent::getRenderedMessage).collect(Collectors.toList());

        final String expectedFragment = "has been open for " + elapsedTime
            + " ms. This is close to or even exceeding the transaction timeout of "
            + transactionTimeout + " ms.";
        Assert.assertThat(logMessages, CoreMatchers.hasItem(CoreMatchers.containsString(expectedFragment)));
    }

    /**
     * Asserts that the lines of all files in the target directory, taken together, are
     * exactly {@code expectedValues} (order-insensitive, duplicates significant).
     *
     * <p>The expected values are copied before sorting, so the caller's list is never
     * modified and immutable lists are accepted.
     *
     * @param expectedValues the values that must have been committed
     * @throws IOException if a committed file cannot be read
     */
    private void assertExactlyOnce(List<String> expectedValues) throws IOException {
        // listFiles() returns null when the directory cannot be listed; fail with a clear message.
        final File[] committedFiles = targetDirectory.listFiles();
        Assert.assertNotNull("Cannot list target directory " + targetDirectory, committedFiles);

        final List<String> actualValues = new ArrayList<>();
        for (File committedFile : committedFiles) {
            // Files are written with FileWriter (platform charset), so read them back the same way.
            actualValues.addAll(Files.readAllLines(committedFile.toPath(), Charset.defaultCharset()));
        }
        Collections.sort(actualValues);

        final List<String> sortedExpected = new ArrayList<>(expectedValues);
        Collections.sort(sortedExpected);

        Assert.assertEquals(sortedExpected, actualValues);
    }

    /**
     * A {@link Clock} whose current time is whatever was last passed to
     * {@link #setEpochMilli(long)}; it never advances on its own.
     */
    private static class SettableClock extends Clock {

        /** Zone reported by {@link #getZone()}. */
        private final ZoneId zoneId;

        /** Current time of this clock in milliseconds since the epoch. */
        private long epochMilli;

        /** Creates a UTC clock positioned at epoch millisecond 0. */
        private SettableClock() {
            this(ZoneOffset.UTC, 0L);
        }

        /**
         * @param zoneId zone reported by this clock
         * @param epochMilli initial time in milliseconds since the epoch
         */
        public SettableClock(ZoneId zoneId, long epochMilli) {
            this.zoneId = zoneId;
            this.epochMilli = epochMilli;
        }

        /** Moves this clock to the given time in milliseconds since the epoch. */
        public void setEpochMilli(long epochMilli) {
            this.epochMilli = epochMilli;
        }

        @Override
        public ZoneId getZone() {
            return zoneId;
        }

        /**
         * Returns this clock when the zone is unchanged; otherwise a new, independent clock
         * for {@code zone} starting at this clock's current time.
         */
        @Override
        public Clock withZone(ZoneId zone) {
            return zone.equals(zoneId) ? this : new SettableClock(zone, epochMilli);
        }

        @Override
        public Instant instant() {
            return Instant.ofEpochMilli(epochMilli);
        }
    }

    /**
     * Transaction handle of the file based sink: a temporary file together with an open
     * writer onto it. The writer is {@code transient}.
     */
    private static class FileTransaction {

        /** File that receives the transaction's data. */
        private final File tmpFile;

        /** Writer onto {@link #tmpFile}, opened by the constructor. */
        private final transient BufferedWriter writer;

        /**
         * Opens a buffered writer onto {@code tmpFile}.
         *
         * @param tmpFile file backing this transaction
         * @throws IOException if the file cannot be opened for writing
         */
        public FileTransaction(File tmpFile) throws IOException {
            final FileWriter fileWriter = new FileWriter(tmpFile);
            this.tmpFile = tmpFile;
            this.writer = new BufferedWriter(fileWriter);
        }

        /** Renders as {@code FileTransaction[<file name>]}. */
        @Override
        public String toString() {
            return "FileTransaction[" + tmpFile.getName() + "]";
        }
    }

    private class FileBasedSinkFunction
    extends TwoPhaseCommitSinkFunction<String, FileTransaction, Void> {
        public FileBasedSinkFunction() {
            super((TypeSerializer)new KryoSerializer(FileTransaction.class, new ExecutionConfig()), (TypeSerializer)VoidSerializer.INSTANCE, (Clock)TwoPhaseCommitSinkFunctionTest.this.clock);
            if (!TwoPhaseCommitSinkFunctionTest.this.tmpDirectory.isDirectory() || !TwoPhaseCommitSinkFunctionTest.this.targetDirectory.isDirectory()) {
                throw new IllegalArgumentException();
            }
        }

        protected void invoke(FileTransaction transaction, String value, SinkFunction.Context context) throws Exception {
            transaction.writer.write(value);
        }

        protected FileTransaction beginTransaction() throws Exception {
            File tmpFile = new File(TwoPhaseCommitSinkFunctionTest.this.tmpDirectory, UUID.randomUUID().toString());
            return new FileTransaction(tmpFile);
        }

        protected void preCommit(FileTransaction transaction) throws Exception {
            transaction.writer.flush();
            transaction.writer.close();
        }

        protected void commit(FileTransaction transaction) {
            if (TwoPhaseCommitSinkFunctionTest.this.throwException.get()) {
                throw new RuntimeException("Expected exception");
            }
            try {
                Files.move(transaction.tmpFile.toPath(), new File(TwoPhaseCommitSinkFunctionTest.this.targetDirectory, transaction.tmpFile.getName()).toPath(), StandardCopyOption.ATOMIC_MOVE);
            }
            catch (IOException e) {
                throw new IllegalStateException(e);
            }
        }

        protected void abort(FileTransaction transaction) {
            try {
                transaction.writer.close();
            }
            catch (IOException iOException) {
                // empty catch block
            }
            transaction.tmpFile.delete();
        }

        protected void recoverAndAbort(FileTransaction transaction) {
            transaction.tmpFile.delete();
        }
    }
}

