/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc;

import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.util.Preconditions;

/**
 * An {@link AkkaRpcService} that lets callers register gateways under an address so that
 * {@code connect} calls for that address are answered from an in-memory registry instead of
 * being forwarded to the superclass.
 *
 * <p>Addresses without a registered gateway fall through to the {@code connect}
 * implementations of {@link AkkaRpcService}.
 */
public class TestingRpcService extends AkkaRpcService {

    /** Gateways registered by address; consulted first by both {@code connect} overloads. */
    private final ConcurrentHashMap<String, RpcGateway> registeredConnections = new ConcurrentHashMap<>();

    /** Creates a testing RPC service backed by a default (empty) {@link Configuration}. */
    public TestingRpcService() {
        this(new Configuration());
    }

    /**
     * Creates a testing RPC service on a local actor system built from the given configuration.
     *
     * <p>{@code Time.seconds(10)} is handed to the superclass constructor
     * (NOTE(review): presumably the RPC timeout - confirm against {@code AkkaRpcService}).
     *
     * @param configuration configuration used to create the local actor system
     */
    public TestingRpcService(Configuration configuration) {
        super(AkkaUtils.createLocalActorSystem(configuration), Time.seconds(10L));
    }

    /** Stops the underlying service and then drops every registered gateway. */
    public void stopService() {
        super.stopService();
        this.registeredConnections.clear();
    }

    /**
     * Registers a gateway that subsequent {@code connect} calls for {@code address} will return.
     *
     * @param address address under which the gateway is registered; must not be null
     * @param gateway gateway to hand out for that address; must not be null
     * @throws IllegalStateException if another gateway is already registered under {@code address}
     */
    public void registerGateway(String address, RpcGateway gateway) {
        Preconditions.checkNotNull(address);
        Preconditions.checkNotNull(gateway);
        // putIfAbsent keeps the existing mapping, so a duplicate registration never overwrites it.
        if (this.registeredConnections.putIfAbsent(address, gateway) != null) {
            throw new IllegalStateException("a gateway is already registered under " + address);
        }
    }

    /**
     * Returns the gateway registered under {@code address} if there is one, otherwise delegates
     * to the superclass.
     *
     * @param address address to connect to
     * @param clazz expected gateway type
     * @param <C> expected gateway type
     * @return a completed future holding the registered gateway, an exceptionally completed
     *     future if the registered gateway is not an instance of {@code clazz}, or the result of
     *     the superclass {@code connect} when nothing is registered under {@code address}
     */
    public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz) {
        RpcGateway gateway = this.registeredConnections.get(address);
        if (gateway == null) {
            return super.connect(address, clazz);
        }
        return TestingRpcService.resolveRegistered(gateway, address, clazz);
    }

    /**
     * Fenced variant of {@link #connect(String, Class)}. The fencing token is only used when the
     * call is delegated to the superclass; a registered gateway is returned without consulting it.
     *
     * @param address address to connect to
     * @param fencingToken fencing token forwarded to the superclass when no gateway is registered
     * @param clazz expected gateway type
     * @param <F> fencing token type
     * @param <C> expected gateway type
     * @return a completed future holding the registered gateway, an exceptionally completed
     *     future if the registered gateway is not an instance of {@code clazz}, or the result of
     *     the superclass {@code connect} when nothing is registered under {@code address}
     */
    public <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(String address, F fencingToken, Class<C> clazz) {
        RpcGateway gateway = this.registeredConnections.get(address);
        if (gateway == null) {
            return super.connect(address, fencingToken, clazz);
        }
        return TestingRpcService.resolveRegistered(gateway, address, clazz);
    }

    /** Removes every registered gateway without stopping the service. */
    public void clearGateways() {
        this.registeredConnections.clear();
    }

    /**
     * Wraps a registered gateway in a future typed as {@code C}, or fails the future when the
     * gateway is not an instance of the requested class.
     */
    private static <C> CompletableFuture<C> resolveRegistered(RpcGateway gateway, String address, Class<C> clazz) {
        if (clazz.isInstance(gateway)) {
            // Class.cast gives a checked, correctly typed reference, so no unchecked cast is needed.
            return CompletableFuture.completedFuture(clazz.cast(gateway));
        }
        return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz));
    }
}

