/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network;

import java.io.IOException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NetworkEnvironment {
    private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class);
    private final Object lock = new Object();
    private final NetworkBufferPool networkBufferPool;
    private final ConnectionManager connectionManager;
    private final ResultPartitionManager resultPartitionManager;
    private final TaskEventDispatcher taskEventDispatcher;
    private KvStateServer kvStateServer;
    private KvStateClientProxy kvStateProxy;
    private final KvStateRegistry kvStateRegistry;
    private final IOManager.IOMode defaultIOMode;
    private final int partitionRequestInitialBackoff;
    private final int partitionRequestMaxBackoff;
    private final int networkBuffersPerChannel;
    private final int extraNetworkBuffersPerGate;
    private boolean isShutdown;

    public NetworkEnvironment(NetworkBufferPool networkBufferPool, ConnectionManager connectionManager, ResultPartitionManager resultPartitionManager, TaskEventDispatcher taskEventDispatcher, KvStateRegistry kvStateRegistry, KvStateServer kvStateServer, KvStateClientProxy kvStateClientProxy, IOManager.IOMode defaultIOMode, int partitionRequestInitialBackoff, int partitionRequestMaxBackoff, int networkBuffersPerChannel, int extraNetworkBuffersPerGate) {
        this.networkBufferPool = (NetworkBufferPool)Preconditions.checkNotNull((Object)networkBufferPool);
        this.connectionManager = (ConnectionManager)Preconditions.checkNotNull((Object)connectionManager);
        this.resultPartitionManager = (ResultPartitionManager)Preconditions.checkNotNull((Object)resultPartitionManager);
        this.taskEventDispatcher = (TaskEventDispatcher)Preconditions.checkNotNull((Object)taskEventDispatcher);
        this.kvStateRegistry = (KvStateRegistry)Preconditions.checkNotNull((Object)kvStateRegistry);
        this.kvStateServer = kvStateServer;
        this.kvStateProxy = kvStateClientProxy;
        this.defaultIOMode = defaultIOMode;
        this.partitionRequestInitialBackoff = partitionRequestInitialBackoff;
        this.partitionRequestMaxBackoff = partitionRequestMaxBackoff;
        this.isShutdown = false;
        this.networkBuffersPerChannel = networkBuffersPerChannel;
        this.extraNetworkBuffersPerGate = extraNetworkBuffersPerGate;
    }

    public ResultPartitionManager getResultPartitionManager() {
        return this.resultPartitionManager;
    }

    public TaskEventDispatcher getTaskEventDispatcher() {
        return this.taskEventDispatcher;
    }

    public ConnectionManager getConnectionManager() {
        return this.connectionManager;
    }

    public NetworkBufferPool getNetworkBufferPool() {
        return this.networkBufferPool;
    }

    public IOManager.IOMode getDefaultIOMode() {
        return this.defaultIOMode;
    }

    public int getPartitionRequestInitialBackoff() {
        return this.partitionRequestInitialBackoff;
    }

    public int getPartitionRequestMaxBackoff() {
        return this.partitionRequestMaxBackoff;
    }

    public KvStateRegistry getKvStateRegistry() {
        return this.kvStateRegistry;
    }

    public KvStateServer getKvStateServer() {
        return this.kvStateServer;
    }

    public KvStateClientProxy getKvStateProxy() {
        return this.kvStateProxy;
    }

    public TaskKvStateRegistry createKvStateTaskRegistry(JobID jobId, JobVertexID jobVertexId) {
        return this.kvStateRegistry.createTaskRegistry(jobId, jobVertexId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void registerTask(Task task) throws IOException {
        ResultPartition[] producedPartitions = task.getProducedPartitions();
        ResultPartitionWriter[] writers = task.getAllWriters();
        if (writers.length != producedPartitions.length) {
            throw new IllegalStateException("Unequal number of writers and partitions.");
        }
        Object object = this.lock;
        synchronized (object) {
            SingleInputGate[] inputGates;
            if (this.isShutdown) {
                throw new IllegalStateException("NetworkEnvironment is shut down");
            }
            for (int i = 0; i < producedPartitions.length; ++i) {
                ResultPartition partition = producedPartitions[i];
                ResultPartitionWriter writer = writers[i];
                BufferPool bufferPool = null;
                try {
                    int maxNumberOfMemorySegments = partition.getPartitionType().isBounded() ? partition.getNumberOfSubpartitions() * this.networkBuffersPerChannel + this.extraNetworkBuffersPerGate : Integer.MAX_VALUE;
                    bufferPool = this.networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(), maxNumberOfMemorySegments);
                    partition.registerBufferPool(bufferPool);
                    this.resultPartitionManager.registerResultPartition(partition);
                }
                catch (Throwable t) {
                    if (bufferPool != null) {
                        bufferPool.lazyDestroy();
                    }
                    if (t instanceof IOException) {
                        throw (IOException)t;
                    }
                    throw new IOException(t.getMessage(), t);
                }
                this.taskEventDispatcher.registerWriterForIncomingTaskEvents(writer.getPartitionId(), writer);
            }
            for (SingleInputGate gate : inputGates = task.getAllInputGates()) {
                BufferPool bufferPool = null;
                try {
                    if (gate.getConsumedPartitionType().isCreditBased()) {
                        bufferPool = this.networkBufferPool.createBufferPool(this.extraNetworkBuffersPerGate, this.extraNetworkBuffersPerGate);
                        gate.assignExclusiveSegments(this.networkBufferPool, this.networkBuffersPerChannel);
                    } else {
                        int maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ? gate.getNumberOfInputChannels() * this.networkBuffersPerChannel + this.extraNetworkBuffersPerGate : Integer.MAX_VALUE;
                        bufferPool = this.networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(), maxNumberOfMemorySegments);
                    }
                    gate.setBufferPool(bufferPool);
                }
                catch (Throwable t) {
                    if (bufferPool != null) {
                        bufferPool.lazyDestroy();
                    }
                    ExceptionUtils.rethrowIOException((Throwable)t);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void unregisterTask(Task task) {
        LOG.debug("Unregister task {} from network environment (state: {}).", (Object)task.getTaskInfo().getTaskNameWithSubtasks(), (Object)task.getExecutionState());
        ExecutionAttemptID executionId = task.getExecutionId();
        Object object = this.lock;
        synchronized (object) {
            SingleInputGate[] inputGates;
            ResultPartition[] partitions;
            ResultPartitionWriter[] writers;
            if (this.isShutdown) {
                return;
            }
            if (task.isCanceledOrFailed()) {
                this.resultPartitionManager.releasePartitionsProducedBy(executionId, task.getFailureCause());
            }
            if ((writers = task.getAllWriters()) != null) {
                for (ResultPartitionWriter writer : writers) {
                    this.taskEventDispatcher.unregisterWriter(writer);
                }
            }
            if ((partitions = task.getProducedPartitions()) != null) {
                for (ResultPartition partition : partitions) {
                    partition.destroyBufferPool();
                }
            }
            if ((inputGates = task.getAllInputGates()) != null) {
                for (SingleInputGate gate : inputGates) {
                    try {
                        if (gate == null) continue;
                        gate.releaseAllResources();
                    }
                    catch (IOException e) {
                        LOG.error("Error during release of reader resources: " + e.getMessage(), (Throwable)e);
                    }
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start() throws IOException {
        Object object = this.lock;
        synchronized (object) {
            Preconditions.checkState((!this.isShutdown ? 1 : 0) != 0, (Object)"The NetworkEnvironment has already been shut down.");
            LOG.info("Starting the network environment and its components.");
            try {
                LOG.debug("Starting network connection manager");
                this.connectionManager.start(this.resultPartitionManager, this.taskEventDispatcher);
            }
            catch (IOException t) {
                throw new IOException("Failed to instantiate network connection manager.", t);
            }
            if (this.kvStateServer != null) {
                try {
                    this.kvStateServer.start();
                }
                catch (Throwable ie) {
                    this.kvStateServer.shutdown();
                    this.kvStateServer = null;
                    throw new IOException("Failed to start the Queryable State Data Server.", ie);
                }
            }
            if (this.kvStateProxy != null) {
                try {
                    this.kvStateProxy.start();
                }
                catch (Throwable ie) {
                    this.kvStateProxy.shutdown();
                    this.kvStateProxy = null;
                    throw new IOException("Failed to start the Queryable State Client Proxy.", ie);
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void shutdown() {
        Object object = this.lock;
        synchronized (object) {
            if (this.isShutdown) {
                return;
            }
            LOG.info("Shutting down the network environment and its components.");
            if (this.kvStateProxy != null) {
                try {
                    LOG.debug("Shutting down Queryable State Client Proxy.");
                    this.kvStateProxy.shutdown();
                }
                catch (Throwable t) {
                    LOG.warn("Cannot shut down Queryable State Client Proxy.", t);
                }
            }
            if (this.kvStateServer != null) {
                try {
                    LOG.debug("Shutting down Queryable State Data Server.");
                    this.kvStateServer.shutdown();
                }
                catch (Throwable t) {
                    LOG.warn("Cannot shut down Queryable State Data Server.", t);
                }
            }
            try {
                LOG.debug("Shutting down network connection manager");
                this.connectionManager.shutdown();
            }
            catch (Throwable t) {
                LOG.warn("Cannot shut down the network connection manager.", t);
            }
            try {
                LOG.debug("Shutting down intermediate result partition manager");
                this.resultPartitionManager.shutdown();
            }
            catch (Throwable t) {
                LOG.warn("Cannot shut down the result partition manager.", t);
            }
            this.taskEventDispatcher.clearAll();
            this.networkBufferPool.destroyAllBufferPools();
            try {
                this.networkBufferPool.destroy();
            }
            catch (Throwable t) {
                LOG.warn("Network buffer pool did not shut down properly.", t);
            }
            this.isShutdown = true;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean isShutdown() {
        Object object = this.lock;
        synchronized (object) {
            return this.isShutdown;
        }
    }
}

