/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorRef;
import akka.actor.Status;
import akka.actor.UntypedActor;
import akka.pattern.Patterns;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaHandshakeException;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException;
import org.apache.flink.runtime.rpc.akka.messages.Processing;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.runtime.rpc.messages.CallAsync;
import org.apache.flink.runtime.rpc.messages.HandshakeSuccessMessage;
import org.apache.flink.runtime.rpc.messages.RemoteHandshakeMessage;
import org.apache.flink.runtime.rpc.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.messages.RunAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;

class AkkaRpcActor<T extends RpcEndpoint>
extends UntypedActor {
    protected final Logger log = LoggerFactory.getLogger(((Object)((Object)this)).getClass());
    protected final T rpcEndpoint;
    private final MainThreadValidatorUtil mainThreadValidator;
    private final CompletableFuture<Boolean> terminationFuture;
    private final int version;
    private State state;

    AkkaRpcActor(T rpcEndpoint, CompletableFuture<Boolean> terminationFuture, int version) {
        this.rpcEndpoint = (RpcEndpoint)Preconditions.checkNotNull(rpcEndpoint, (String)"rpc endpoint");
        this.mainThreadValidator = new MainThreadValidatorUtil((RpcEndpoint)rpcEndpoint);
        this.terminationFuture = (CompletableFuture)Preconditions.checkNotNull(terminationFuture);
        this.version = version;
        this.state = State.STOPPED;
    }

    public void postStop() throws Exception {
        this.mainThreadValidator.enterMainThread();
        try {
            CompletableFuture<Void> postStopFuture;
            try {
                postStopFuture = ((RpcEndpoint)this.rpcEndpoint).postStop();
            }
            catch (Throwable throwable2) {
                postStopFuture = FutureUtils.completedExceptionally(throwable2);
            }
            super.postStop();
            postStopFuture.whenComplete((value, throwable) -> {
                if (throwable != null) {
                    this.terminationFuture.completeExceptionally((Throwable)throwable);
                } else {
                    this.terminationFuture.complete(null);
                }
            });
        }
        finally {
            this.mainThreadValidator.exitMainThread();
        }
    }

    public void onReceive(Object message) {
        if (message instanceof RemoteHandshakeMessage) {
            this.handleHandshakeMessage((RemoteHandshakeMessage)message);
        } else if (message.equals((Object)Processing.START)) {
            this.state = State.STARTED;
        } else if (message.equals((Object)Processing.STOP)) {
            this.state = State.STOPPED;
        } else if (this.state == State.STARTED) {
            this.mainThreadValidator.enterMainThread();
            try {
                this.handleRpcMessage(message);
            }
            finally {
                this.mainThreadValidator.exitMainThread();
            }
        } else {
            this.log.info("The rpc endpoint {} has not been started yet. Discarding message {} until processing is started.", (Object)this.rpcEndpoint.getClass().getName(), (Object)message.getClass().getName());
            this.sendErrorIfSender(new AkkaRpcException(String.format("Discard message, because the rpc endpoint %s has not been started yet.", ((RpcEndpoint)this.rpcEndpoint).getAddress())));
        }
    }

    protected void handleRpcMessage(Object message) {
        if (message instanceof RunAsync) {
            this.handleRunAsync((RunAsync)message);
        } else if (message instanceof CallAsync) {
            this.handleCallAsync((CallAsync)message);
        } else if (message instanceof RpcInvocation) {
            this.handleRpcInvocation((RpcInvocation)message);
        } else {
            this.log.warn("Received message of unknown type {} with value {}. Dropping this message!", (Object)message.getClass().getName(), message);
            this.sendErrorIfSender(new AkkaUnknownMessageException("Received unknown message " + message + " of type " + message.getClass().getSimpleName() + '.'));
        }
    }

    private void handleHandshakeMessage(RemoteHandshakeMessage handshakeMessage) {
        if (!this.isCompatibleVersion(handshakeMessage.getVersion())) {
            this.sendErrorIfSender(new AkkaHandshakeException(String.format("Version mismatch between source (%s) and target (%s) rpc component. Please verify that all components have the same version.", handshakeMessage.getVersion(), this.getVersion())));
        } else if (!this.isGatewaySupported(handshakeMessage.getRpcGateway())) {
            this.sendErrorIfSender(new AkkaHandshakeException(String.format("The rpc endpoint does not support the gateway %s.", handshakeMessage.getRpcGateway().getSimpleName())));
        } else {
            this.getSender().tell((Object)new Status.Success((Object)HandshakeSuccessMessage.INSTANCE), this.getSelf());
        }
    }

    private boolean isGatewaySupported(Class<?> rpcGateway) {
        return rpcGateway.isAssignableFrom(this.rpcEndpoint.getClass());
    }

    private boolean isCompatibleVersion(int sourceVersion) {
        return sourceVersion == this.getVersion();
    }

    private int getVersion() {
        return this.version;
    }

    private void handleRpcInvocation(RpcInvocation rpcInvocation) {
        block12: {
            RpcConnectionException rpcException;
            Method rpcMethod = null;
            try {
                String methodName = rpcInvocation.getMethodName();
                Class<?>[] parameterTypes = rpcInvocation.getParameterTypes();
                rpcMethod = this.lookupRpcMethod(methodName, parameterTypes);
            }
            catch (ClassNotFoundException e) {
                this.log.error("Could not load method arguments.", (Throwable)e);
                rpcException = new RpcConnectionException("Could not load method arguments.", e);
                this.getSender().tell((Object)new Status.Failure((Throwable)rpcException), this.getSelf());
            }
            catch (IOException e) {
                this.log.error("Could not deserialize rpc invocation message.", (Throwable)e);
                rpcException = new RpcConnectionException("Could not deserialize rpc invocation message.", e);
                this.getSender().tell((Object)new Status.Failure((Throwable)rpcException), this.getSelf());
            }
            catch (NoSuchMethodException e) {
                this.log.error("Could not find rpc method for rpc invocation.", (Throwable)e);
                rpcException = new RpcConnectionException("Could not find rpc method for rpc invocation.", e);
                this.getSender().tell((Object)new Status.Failure((Throwable)rpcException), this.getSelf());
            }
            if (rpcMethod != null) {
                try {
                    Object result;
                    rpcMethod.setAccessible(true);
                    if (rpcMethod.getReturnType().equals(Void.TYPE)) {
                        rpcMethod.invoke(this.rpcEndpoint, rpcInvocation.getArgs());
                        break block12;
                    }
                    try {
                        result = rpcMethod.invoke(this.rpcEndpoint, rpcInvocation.getArgs());
                    }
                    catch (InvocationTargetException e) {
                        this.log.trace("Reporting back error thrown in remote procedure {}", (Object)rpcMethod, (Object)e);
                        this.getSender().tell((Object)new Status.Failure(e.getTargetException()), this.getSelf());
                        return;
                    }
                    if (result instanceof CompletableFuture) {
                        CompletableFuture future = (CompletableFuture)result;
                        Promise.DefaultPromise promise = new Promise.DefaultPromise();
                        future.whenComplete((value, throwable) -> {
                            if (throwable != null) {
                                promise.failure(throwable);
                            } else {
                                promise.success(value);
                            }
                        });
                        Patterns.pipe((Future)promise.future(), (ExecutionContext)this.getContext().dispatcher()).to(this.getSender());
                    } else {
                        this.getSender().tell((Object)new Status.Success(result), this.getSelf());
                    }
                }
                catch (Throwable e) {
                    this.log.error("Error while executing remote procedure call {}.", (Object)rpcMethod, (Object)e);
                    this.getSender().tell((Object)new Status.Failure(e), this.getSelf());
                }
            }
        }
    }

    private void handleCallAsync(CallAsync callAsync) {
        if (callAsync.getCallable() == null) {
            String result = "Received a " + callAsync.getClass().getName() + " message with an empty callable field. This indicates that this message has been serialized prior to sending the message. The " + callAsync.getClass().getName() + " is only supported with local communication.";
            this.log.warn(result);
            this.getSender().tell((Object)new Status.Failure((Throwable)new AkkaRpcException(result)), this.getSelf());
        } else {
            try {
                Object result = callAsync.getCallable().call();
                this.getSender().tell((Object)new Status.Success(result), this.getSelf());
            }
            catch (Throwable e) {
                this.getSender().tell((Object)new Status.Failure(e), this.getSelf());
            }
        }
    }

    private void handleRunAsync(RunAsync runAsync) {
        if (runAsync.getRunnable() == null) {
            this.log.warn("Received a {} message with an empty runnable field. This indicates that this message has been serialized prior to sending the message. The {} is only supported with local communication.", (Object)runAsync.getClass().getName(), (Object)runAsync.getClass().getName());
        } else {
            long delayNanos;
            long timeToRun = runAsync.getTimeNanos();
            if (timeToRun == 0L || (delayNanos = timeToRun - System.nanoTime()) <= 0L) {
                try {
                    runAsync.getRunnable().run();
                }
                catch (Throwable t) {
                    this.log.error("Caught exception while executing runnable in main thread.", t);
                    ExceptionUtils.rethrowIfFatalErrorOrOOM((Throwable)t);
                }
            } else {
                FiniteDuration delay = new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS);
                RunAsync message = new RunAsync(runAsync.getRunnable(), timeToRun);
                Object envelopedSelfMessage = this.envelopeSelfMessage(message);
                this.getContext().system().scheduler().scheduleOnce(delay, this.getSelf(), envelopedSelfMessage, (ExecutionContext)this.getContext().dispatcher(), ActorRef.noSender());
            }
        }
    }

    private Method lookupRpcMethod(String methodName, Class<?>[] parameterTypes) throws NoSuchMethodException {
        return this.rpcEndpoint.getClass().getMethod(methodName, parameterTypes);
    }

    protected void sendErrorIfSender(Throwable throwable) {
        if (!this.getSender().equals((Object)ActorRef.noSender())) {
            this.getSender().tell((Object)new Status.Failure(throwable), this.getSelf());
        }
    }

    protected Object envelopeSelfMessage(Object message) {
        return message;
    }

    static enum State {
        STARTED,
        STOPPED;

    }
}

