/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.StringWriter;
import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.JobManagerMode;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.duration.FiniteDuration;

public class JobManagerProcessReapingTest
extends TestLogger {
    /**
     * Starts a JobManager in a separate JVM, kills its actor with a {@link PoisonPill} and
     * verifies that the JVM then terminates with {@code JobManager.RUNTIME_FAILURE_RETURN_CODE()}.
     *
     * <p>The child's stderr is captured into a {@link StringWriter}; the JobManager port is
     * parsed from that captured log, and the log is dumped to stdout if the test fails.
     * The test is skipped (returns silently) when no java executable can be located.
     */
    @Test
    public void testReapProcessOnFailure() {
        Process jmProcess = null;
        ActorSystem localSystem = null;
        final StringWriter processOutput = new StringWriter();
        try {
            String javaCommand = CommonTestUtils.getJavaCommandPath();
            if (javaCommand == null) {
                System.out.println("---- Skipping JobManagerProcessReapingTest : Could not find java executable ----");
                return;
            }

            // Log configuration file handed to the child JVM via -Dlog4j.configuration;
            // its content is produced by CommonTestUtils.printLog4jDebugConfig.
            File tempLogFile = File.createTempFile("testlogconfig", "properties");
            tempLogFile.deleteOnExit();
            CommonTestUtils.printLog4jDebugConfig(tempLogFile);

            String[] command = new String[] {
                javaCommand,
                "-Dlog.level=DEBUG",
                "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
                "-Xms256m",
                "-Xmx256m",
                "-classpath",
                CommonTestUtils.getCurrentClasspath(),
                JobManagerTestEntryPoint.class.getName()
            };
            jmProcess = new ProcessBuilder(command).start();

            // stderr carries the log that is searched for the port below.
            new PipeForwarder(jmProcess.getErrorStream(), processOutput);
            // stdout is not inspected, but it must still be drained: a full pipe buffer
            // would block the child JVM on its next write.
            new PipeForwarder(jmProcess.getInputStream(), new StringWriter());

            // Local actor system used to look up the remote JobManager actor.
            Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", 0);
            localSystem = AkkaUtils.createActorSystem(
                    new Configuration(), new Some<Tuple2<String, Object>>(localAddress));

            int jobManagerPort = waitForJobManagerPort(processOutput, jmProcess);
            if (jobManagerPort == -1) {
                // Report a dead child as such instead of as a missing log line.
                Assert.assertTrue("JobManager process died", CommonTestUtils.isProcessAlive(jmProcess));
                Assert.fail("Could not determine port of started JobManager.");
            }

            ActorRef jobManagerRef = null;
            Throwable lastError = null;
            try {
                String jobManagerAkkaUrl = AkkaRpcServiceUtils.getRpcUrl(
                        "localhost",
                        jobManagerPort,
                        "jobmanager",
                        HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION,
                        AkkaRpcServiceUtils.AkkaProtocol.TCP);
                jobManagerRef = AkkaUtils.getActorRef(
                        jobManagerAkkaUrl, localSystem, new FiniteDuration(25L, TimeUnit.SECONDS));
            }
            catch (Throwable t) {
                // Remembered so it can be printed if the lookup turns out to have failed.
                lastError = t;
            }

            Assert.assertTrue("JobManager process died", CommonTestUtils.isProcessAlive(jmProcess));
            if (jobManagerRef == null) {
                if (lastError != null) {
                    lastError.printStackTrace();
                }
                Assert.fail("JobManager process did not launch the JobManager properly. Failed to look up JobManager actor at localhost:" + jobManagerPort);
            }

            // Kill the actor; the child JVM is expected to exit as a consequence.
            jobManagerRef.tell(PoisonPill.getInstance(), ActorRef.noSender());

            // Give the child up to 5 seconds to exit. nanoTime is used so that wall-clock
            // adjustments cannot shorten or stretch the wait.
            final long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(5L);
            while (System.nanoTime() - deadlineNanos < 0L && CommonTestUtils.isProcessAlive(jmProcess)) {
                Thread.sleep(100L);
            }

            Assert.assertFalse("JobManager process did not terminate upon actor death", CommonTestUtils.isProcessAlive(jmProcess));
            int returnCode = jmProcess.exitValue();
            Assert.assertEquals("JobManager died, but not because of the process reaper", JobManager.RUNTIME_FAILURE_RETURN_CODE(), returnCode);
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to the test runner.
                Thread.currentThread().interrupt();
            }
            e.printStackTrace();
            JobManagerProcessReapingTest.printProcessLog(processOutput.toString());
            // Fail with the original exception attached; e.getMessage() alone may be null.
            throw new AssertionError("Unexpected exception in testReapProcessOnFailure: " + e, e);
        }
        catch (Error e) {
            // Assertion failures end up here: dump the child's log, then propagate.
            e.printStackTrace();
            JobManagerProcessReapingTest.printProcessLog(processOutput.toString());
            throw e;
        }
        finally {
            if (jmProcess != null) {
                jmProcess.destroy();
            }
            if (localSystem != null) {
                localSystem.terminate();
            }
        }
    }

    /**
     * Log line from which the JobManager port is read; group 1 is the port.
     * At least one digit is required so that the group can always be parsed as a number.
     */
    private static final Pattern JOB_MANAGER_PORT_PATTERN =
            Pattern.compile("Starting JobManager at [^:]*://flink@[^:]*:(\\d+)/");

    /**
     * Polls the captured process log until the JobManager start-up line shows up.
     *
     * @param processOutput writer into which the child's log is being forwarded
     * @param process the child process; polling stops early once it is no longer alive
     * @return the port parsed from the log, or {@code -1} if it was not found within
     *         40 polls spaced 500 ms apart or the process exited first
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static int waitForJobManagerPort(StringWriter processOutput, Process process) throws InterruptedException {
        for (int attempt = 0; attempt < 40; ++attempt) {
            Matcher matcher = JOB_MANAGER_PORT_PATTERN.matcher(processOutput.toString());
            if (matcher.find()) {
                return Integer.parseInt(matcher.group(1));
            }
            if (!CommonTestUtils.isProcessAlive(process)) {
                // A dead child will never print the line; no point in waiting further.
                return -1;
            }
            Thread.sleep(500L);
        }
        return -1;
    }

    /**
     * Prints the given log text to stdout, framed by BEGIN/END banner lines.
     *
     * @param log the captured output of the spawned process
     */
    private static void printProcessLog(String log) {
        final String rule = "-----------------------------------------";
        final String[] outputLines = {
            rule,
            "       BEGIN SPAWNED PROCESS LOG",
            rule,
            log,
            rule,
            "        END SPAWNED PROCESS LOG",
            rule
        };
        for (String outputLine : outputLines) {
            System.out.println(outputLine);
        }
    }

    /**
     * Daemon thread that copies everything readable from an {@link InputStream} into a
     * {@link StringWriter} until end-of-stream or an I/O error occurs.
     *
     * <p>The thread starts itself from the constructor, so creating an instance is enough
     * to begin forwarding. Bytes are decoded with the platform default charset.
     */
    private static final class PipeForwarder extends Thread {
        private final StringWriter target;
        private final InputStream source;

        /**
         * Creates and immediately starts the forwarder.
         *
         * @param source stream to read from; it is not closed by this class
         * @param target writer that receives the decoded text
         */
        public PipeForwarder(InputStream source, StringWriter target) {
            super("Pipe Forwarder");
            this.setDaemon(true);
            this.source = source;
            this.target = target;
            // Must remain the last statement: run() reads the fields assigned above.
            this.start();
        }

        @Override
        public void run() {
            try {
                // Decode bytes into characters instead of writing each raw byte as a char,
                // which corrupts any multi-byte (non-ASCII) output.
                Reader reader = new InputStreamReader(this.source, Charset.defaultCharset());
                char[] buffer = new char[1024];
                int count;
                while ((count = reader.read(buffer)) != -1) {
                    this.target.write(buffer, 0, count);
                }
            }
            catch (IOException ignored) {
                // Best effort: a read failure simply ends the forwarding, for example when
                // the stream is closed underneath this thread.
            }
        }
    }

    /**
     * Entry point of the child JVM that the test launches by class name.
     *
     * <p>Runs a JobManager in {@code CLUSTER} mode on {@code localhost} and exits with
     * code 0 when {@code runJobManager} returns, or with code 1 when it throws.
     */
    public static class JobManagerTestEntryPoint {
        public static void main(String[] args) {
            try {
                Configuration config = new Configuration();
                // NOTE(review): -1 presumably disables the web frontend — confirm against WebOptions.
                config.setInteger(WebOptions.PORT, -1);
                // NOTE(review): port 0 presumably requests an arbitrary free port; the test
                // reads the actual port from the log output.
                JobManager.runJobManager(config, JobManagerMode.CLUSTER, "localhost", 0);
                System.exit(0);
            }
            catch (Throwable t) {
                // Report the failure on stderr, which the launching test captures,
                // instead of exiting without any trace of the cause.
                t.printStackTrace();
                System.exit(1);
            }
        }
    }
}

