/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.akka;

import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.akka.AkkaRpcActor;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.rpc.messages.FencedMessage;
import org.apache.flink.runtime.rpc.messages.LocalFencedMessage;
import org.apache.flink.runtime.rpc.messages.UnfencedMessage;

/**
 * {@link AkkaRpcActor} variant that checks fencing tokens before a message is processed.
 *
 * <p>Incoming messages are expected to be either a {@link FencedMessage} or an
 * {@link UnfencedMessage}:
 * <ul>
 *   <li>a {@link FencedMessage} has its payload handed to
 *       {@link AkkaRpcActor#handleRpcMessage(Object)} only if its fencing token equals the
 *       endpoint's current, non-null fencing token;</li>
 *   <li>an {@link UnfencedMessage} has its payload handed over without any token check;</li>
 *   <li>anything else is rejected with an {@link AkkaUnknownMessageException}.</li>
 * </ul>
 * Rejections are reported through {@code sendErrorIfSender}.
 *
 * @param <F> type of the fencing token
 * @param <T> type of the fenced RPC endpoint served by this actor
 */
public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpoint<F>>
        extends AkkaRpcActor<T> {

    /**
     * Creates the actor; all arguments are forwarded unchanged to the {@link AkkaRpcActor}
     * constructor.
     *
     * @param rpcEndpoint fenced endpoint whose fencing token is consulted for every fenced message
     * @param terminationFuture forwarded to the superclass constructor
     * @param version forwarded to the superclass constructor
     */
    public FencedAkkaRpcActor(T rpcEndpoint, CompletableFuture<Boolean> terminationFuture, int version) {
        super(rpcEndpoint, terminationFuture, version);
    }

    /**
     * Unwraps a fenced or unfenced envelope and delegates its payload to the superclass.
     *
     * <p>A fenced message is rejected with a {@link FencingTokenException} when the endpoint has
     * no fencing token set, or when the message's token differs from the endpoint's token. A
     * message that is neither fenced nor unfenced is rejected with an
     * {@link AkkaUnknownMessageException}.
     *
     * @param message the received message
     */
    @Override
    protected void handleRpcMessage(Object message) {
        if (message instanceof FencedMessage) {
            // Read the endpoint's token first; without one, no fenced message can be accepted.
            final F expectedFencingToken = this.rpcEndpoint.getFencingToken();

            if (expectedFencingToken == null) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Fencing token not set: Ignoring message {} because the fencing token is null.", message);
                }
                this.sendErrorIfSender(new FencingTokenException(String.format("Fencing token not set: Ignoring message %s sent to %s because the fencing token is null.", message, this.rpcEndpoint.getAddress())));
            } else {
                // Wildcards instead of <F, ?>: the sender's token type is not verified by a cast,
                // it is compared by value below.
                final FencedMessage<?, ?> fencedMessage = (FencedMessage<?, ?>) message;
                final Object fencingToken = fencedMessage.getFencingToken();

                if (Objects.equals(expectedFencingToken, fencingToken)) {
                    super.handleRpcMessage(fencedMessage.getPayload());
                } else {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Fencing token mismatch: Ignoring message {} because the fencing token {} did not match the expected fencing token {}.", message, fencingToken, expectedFencingToken);
                    }
                    this.sendErrorIfSender(new FencingTokenException("Fencing token mismatch: Ignoring message " + message + " because the fencing token " + fencingToken + " did not match the expected fencing token " + expectedFencingToken + '.'));
                }
            }
        } else if (message instanceof UnfencedMessage) {
            // Unfenced messages bypass the token comparison entirely.
            super.handleRpcMessage(((UnfencedMessage<?>) message).getPayload());
        } else {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Unknown message type: Ignoring message {} because it is neither of type {} nor {}.", message, FencedMessage.class.getSimpleName(), UnfencedMessage.class.getSimpleName());
            }
            this.sendErrorIfSender(new AkkaUnknownMessageException("Unknown message type: Ignoring message " + message + " of type " + message.getClass().getSimpleName() + " because it is neither of type " + FencedMessage.class.getSimpleName() + " nor " + UnfencedMessage.class.getSimpleName() + '.'));
        }
    }

    /**
     * Wraps a message into a {@link LocalFencedMessage} that carries the endpoint's fencing
     * token as read at the time of the call (which may be {@code null}).
     *
     * @param message the payload to wrap
     * @return a {@link LocalFencedMessage} holding the current fencing token and {@code message}
     */
    @Override
    protected Object envelopeSelfMessage(Object message) {
        // Keep the token typed as F so the Serializable bound is preserved for the envelope.
        final F fencingToken = this.rpcEndpoint.getFencingToken();
        return new LocalFencedMessage<>(fencingToken, message);
    }
}

