/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.blob;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServerConnection;
import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.BlobUtils;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.blob.TransientBlobCleanupTask;
import org.apache.flink.runtime.blob.TransientBlobKey;
import org.apache.flink.runtime.blob.TransientBlobService;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BlobServer
extends Thread
implements BlobService,
BlobWriter,
PermanentBlobService,
TransientBlobService {
    /** Logger used by the server; also handed to the cleanup task and the shutdown hook. */
    private static final Logger LOG = LoggerFactory.getLogger(BlobServer.class);

    /** Counter that makes the names of staging files unique (see {@code createTemporaryFilename}). */
    private final AtomicLong tempFileCounter = new AtomicLong(0L);

    /** Socket on which the listener thread accepts incoming connections. */
    private final ServerSocket serverSocket;

    /** SSL context used to create the server socket, or {@code null} when SSL is disabled. */
    private final SSLContext serverSSLContext;

    /** Configuration this server was created with; also passed to clients made by {@code createClient}. */
    private final Configuration blobServiceConfiguration;

    /** Flipped to {@code true} exactly once, by the first call to {@code close()}. */
    private final AtomicBoolean shutdownRequested = new AtomicBoolean();

    /** Root directory under which this server keeps its local BLOB files. */
    private final File storageDir;

    /** Blob store consulted for permanent BLOBs that are missing locally. */
    private final BlobStore blobStore;

    /** Connections currently being served; every access synchronizes on the set itself. */
    private final Set<BlobServerConnection> activeConnections = new HashSet<BlobServerConnection>();

    /** Upper bound on the size of {@link #activeConnections} before the listener waits. */
    private final int maxConnections;

    /** Lock guarding access to the files below {@link #storageDir}. */
    private final ReadWriteLock readWriteLock;

    /** JVM shutdown hook that closes this server; {@code close()} tolerates it being {@code null}. */
    private final Thread shutdownHook;

    /** Expiry timestamps (epoch milliseconds) of transient BLOBs, keyed by (job ID, BLOB key). */
    private final ConcurrentHashMap<Tuple2<JobID, TransientBlobKey>, Long> blobExpiryTimes =
            new ConcurrentHashMap<Tuple2<JobID, TransientBlobKey>, Long>();

    /** Period of the cleanup task and time-to-live added on each transient BLOB access, in milliseconds. */
    private final long cleanupInterval;

    /** Daemon timer that periodically runs the transient BLOB cleanup task. */
    private final Timer cleanupTimer;

    /**
     * Creates a BLOB server, prepares its local storage directory and binds it to the first
     * free port of the configured port range. The listener thread is configured (name, daemon
     * flag) but not started here.
     *
     * <p>The periodic cleanup timer and the JVM shutdown hook are only set up after the server
     * socket has been allocated, so a failing constructor does not leave a timer thread or a
     * hook pointing at a half-constructed server behind.
     *
     * @param config configuration providing storage directory, connection limits, cleanup
     *     interval, SSL settings and port range
     * @param blobStore blob store used for permanent BLOBs
     * @throws IOException if the SSL context cannot be created or no port in the configured
     *     range can be bound (storage directory initialization may presumably fail with it too)
     */
    public BlobServer(Configuration config, BlobStore blobStore) throws IOException {
        this.blobServiceConfiguration = Preconditions.checkNotNull(config);
        this.blobStore = Preconditions.checkNotNull(blobStore);
        this.readWriteLock = new ReentrantReadWriteLock();

        // local storage
        String storageDirectory = config.getString(BlobServerOptions.STORAGE_DIRECTORY);
        this.storageDir = BlobUtils.initLocalStorageDirectory(storageDirectory);
        LOG.info("Created BLOB server storage directory {}", this.storageDir);

        // connection limits; invalid values fall back to the option defaults
        int configuredMaxConnections = config.getInteger(BlobServerOptions.FETCH_CONCURRENT);
        if (configuredMaxConnections >= 1) {
            this.maxConnections = configuredMaxConnections;
        } else {
            LOG.warn("Invalid value for maximum connections in BLOB server: {}. Using default value of {}", configuredMaxConnections, BlobServerOptions.FETCH_CONCURRENT.defaultValue());
            this.maxConnections = BlobServerOptions.FETCH_CONCURRENT.defaultValue();
        }
        int backlog = config.getInteger(BlobServerOptions.FETCH_BACKLOG);
        if (backlog < 1) {
            LOG.warn("Invalid value for BLOB connection backlog: {}. Using default value of {}", backlog, BlobServerOptions.FETCH_BACKLOG.defaultValue());
            backlog = BlobServerOptions.FETCH_BACKLOG.defaultValue();
        }

        // SSL context (null means plain sockets)
        if (config.getBoolean(BlobServerOptions.SSL_ENABLED)) {
            try {
                this.serverSSLContext = SSLUtils.createSSLServerContext(config);
            } catch (Exception e) {
                throw new IOException("Failed to initialize SSLContext for the blob server", e);
            }
        } else {
            this.serverSSLContext = null;
        }

        // server socket on the first usable port of the configured range
        String serverPortRange = config.getString(BlobServerOptions.PORT);
        Iterator<Integer> ports = NetUtils.getPortRangeFromString(serverPortRange);
        final int finalBacklog = backlog;
        ServerSocket socketAttempt = NetUtils.createSocketFromPorts(ports, new NetUtils.SocketFactory() {

            @Override
            public ServerSocket createSocket(int port) throws IOException {
                if (BlobServer.this.serverSSLContext == null) {
                    return new ServerSocket(port, finalBacklog);
                }
                LOG.info("Enabling ssl for the blob server");
                return BlobServer.this.serverSSLContext.getServerSocketFactory().createServerSocket(port, finalBacklog);
            }
        });
        if (socketAttempt == null) {
            throw new IOException("Unable to allocate socket for blob server in specified port range: " + serverPortRange);
        }
        SSLUtils.setSSLVerAndCipherSuites(socketAttempt, config);
        this.serverSocket = socketAttempt;

        // Background helpers are started last: everything that can fail has succeeded by now.
        this.cleanupTimer = new Timer(true);
        // the configured value is multiplied by 1000 (presumably seconds -> milliseconds)
        this.cleanupInterval = config.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000L;
        this.cleanupTimer.schedule(new TransientBlobCleanupTask(this.blobExpiryTimes, this.readWriteLock.writeLock(), this.storageDir, LOG), this.cleanupInterval, this.cleanupInterval);
        this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);

        this.setName("BLOB Server listener at " + this.getPort());
        this.setDaemon(true);
        if (LOG.isInfoEnabled()) {
            // report the effective limit, not the (possibly rejected) configured one
            LOG.info("Started BLOB server at {}:{} - max concurrent requests: {} - max backlog: {}", new Object[]{this.serverSocket.getInetAddress().getHostAddress(), this.getPort(), this.maxConnections, backlog});
        }
    }

    /**
     * Resolves where the given BLOB is (or would be) stored below this server's storage
     * directory, by delegating to {@code BlobUtils.getStorageLocation}.
     *
     * @param jobId ID of the job the BLOB belongs to, or {@code null}
     * @param key key of the BLOB
     * @return the file location for the BLOB
     * @throws IOException if the delegate fails to resolve the location
     */
    @VisibleForTesting
    public File getStorageLocation(@Nullable JobID jobId, BlobKey key) throws IOException {
        final File location = BlobUtils.getStorageLocation(storageDir, jobId, key);
        return location;
    }

    /**
     * Builds a fresh staging file handle inside the incoming directory. The name is
     * {@code temp-} followed by the next value of the temp-file counter, zero-padded to
     * eight digits. Only the {@link File} object is created here, not the file itself.
     *
     * @return a file handle with a name not handed out before by this server instance
     * @throws IOException if the incoming directory cannot be obtained
     */
    File createTemporaryFilename() throws IOException {
        // resolve the directory first so the counter is only consumed when that succeeded
        final File incomingDir = BlobUtils.getIncomingDirectory(storageDir);
        final long sequenceNumber = tempFileCounter.getAndIncrement();
        final String fileName = String.format("temp-%08d", sequenceNumber);
        return new File(incomingDir, fileName);
    }

    /**
     * Exposes the lock that guards this server's local BLOB files.
     *
     * @return the read/write lock created in the constructor
     */
    ReadWriteLock getReadWriteLock() {
        return readWriteLock;
    }

    /**
     * Listener loop: accepts connections on the server socket until shutdown is requested.
     *
     * <p>Each accepted connection is registered in the active-connection set (waiting in
     * two-second slices while the set is at its limit) and then started. If registration or
     * start fails, the connection is closed and removed again before the failure propagates.
     * Any failure outside of a requested shutdown is logged and triggers {@link #close()}.
     */
    @Override
    public void run() {
        try {
            while (!shutdownRequested.get()) {
                final BlobServerConnection connection = new BlobServerConnection(serverSocket.accept(), this);
                boolean handedOver = false;
                try {
                    synchronized (activeConnections) {
                        while (activeConnections.size() >= maxConnections) {
                            activeConnections.wait(2000L);
                        }
                        activeConnections.add(connection);
                    }
                    connection.start();
                    handedOver = true;
                } finally {
                    // only clean up when the connection never got running
                    if (!handedOver) {
                        connection.close();
                        synchronized (activeConnections) {
                            activeConnections.remove(connection);
                        }
                    }
                }
            }
        } catch (Throwable t) {
            // failures during a requested shutdown are expected (closed socket, interrupt)
            if (shutdownRequested.get()) {
                return;
            }
            LOG.error("BLOB server stopped working. Shutting down", t);
            try {
                close();
            } catch (Throwable closeThrowable) {
                LOG.error("Could not properly close the BlobServer.", closeThrowable);
            }
        }
    }

    /**
     * Shuts the BLOB server down. The cleanup timer is cancelled on every call; the rest runs
     * only for the first caller: close the server socket, interrupt and join the listener
     * thread, close all active connections, delete the storage directory and unregister the
     * JVM shutdown hook (unless this method is running inside that hook).
     *
     * @throws IOException if closing the server socket or deleting the storage directory
     *     failed; the shutdown sequence is still completed before it is rethrown
     */
    @Override
    public void close() throws IOException {
        cleanupTimer.cancel();

        if (!shutdownRequested.compareAndSet(false, true)) {
            // somebody else already performed (or is performing) the shutdown
            return;
        }

        Exception failure = null;
        try {
            serverSocket.close();
        } catch (IOException ioe) {
            failure = ioe;
        }

        // wake up the listener thread and wait for it to finish
        interrupt();
        try {
            join();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            LOG.debug("Error while waiting for this thread to die.", interrupted);
        }

        synchronized (activeConnections) {
            for (BlobServerConnection connection : activeConnections) {
                LOG.debug("Shutting down connection {}.", connection.getName());
                connection.close();
            }
            activeConnections.clear();
        }

        try {
            FileUtils.deleteDirectory(storageDir);
        } catch (IOException e) {
            failure = (Exception) ExceptionUtils.firstOrSuppressed(e, failure);
        }

        final boolean calledFromHook = shutdownHook == Thread.currentThread();
        if (shutdownHook != null && !calledFromHook) {
            try {
                Runtime.getRuntime().removeShutdownHook(shutdownHook);
            } catch (IllegalStateException ignored) {
                // the JVM is already shutting down; hooks can no longer be removed
            } catch (Throwable t) {
                LOG.warn("Exception while unregistering BLOB server's cleanup shutdown hook.", t);
            }
        }

        if (LOG.isInfoEnabled()) {
            LOG.info("Stopped BLOB server at {}:{}", serverSocket.getInetAddress().getHostAddress(), getPort());
        }
        ExceptionUtils.tryRethrowIOException(failure);
    }

    /**
     * Creates a client that talks to this server, using the server socket's address and port
     * and the server's own configuration.
     *
     * @return a new BLOB client for this server
     * @throws IOException if the client cannot be created
     */
    protected BlobClient createClient() throws IOException {
        final InetSocketAddress serverAddress = new InetSocketAddress(serverSocket.getInetAddress(), getPort());
        return new BlobClient(serverAddress, blobServiceConfiguration);
    }

    /**
     * Retrieves the local file of a transient BLOB that is not associated with a job.
     *
     * @param key key of the BLOB
     * @return the local file holding the BLOB content
     * @throws IOException if the BLOB cannot be found or accessed
     */
    @Override
    public File getFile(TransientBlobKey key) throws IOException {
        final JobID noJob = null;
        return getFileInternal(noJob, key);
    }

    /**
     * Retrieves the local file of a transient BLOB belonging to the given job.
     *
     * @param jobId ID of the job the BLOB belongs to; must not be {@code null}
     * @param key key of the BLOB
     * @return the local file holding the BLOB content
     * @throws IOException if the BLOB cannot be found or accessed
     */
    @Override
    public File getFile(JobID jobId, TransientBlobKey key) throws IOException {
        Preconditions.checkNotNull(jobId);
        final File blobFile = getFileInternal(jobId, key);
        return blobFile;
    }

    /**
     * Retrieves the local file of a permanent BLOB belonging to the given job. If the file is
     * missing locally, the internal lookup tries to fetch it from the blob store.
     *
     * @param jobId ID of the job the BLOB belongs to; must not be {@code null}
     * @param key key of the BLOB
     * @return the local file holding the BLOB content
     * @throws IOException if the BLOB cannot be found or accessed
     */
    @Override
    public File getFile(JobID jobId, PermanentBlobKey key) throws IOException {
        Preconditions.checkNotNull(jobId);
        final File blobFile = getFileInternal(jobId, key);
        return blobFile;
    }

    /**
     * Resolves the storage location of a BLOB and makes sure the file is available there,
     * holding the read lock while the three-argument overload does the actual work.
     *
     * @param jobId ID of the job the BLOB belongs to, or {@code null}
     * @param blobKey key of the BLOB; must not be {@code null}
     * @return the local file holding the BLOB content
     * @throws IOException if the BLOB is not available locally and could not be fetched
     */
    private File getFileInternal(@Nullable JobID jobId, BlobKey blobKey) throws IOException {
        Preconditions.checkArgument(blobKey != null, "BLOB key cannot be null.");

        final File target = BlobUtils.getStorageLocation(storageDir, jobId, blobKey);

        readWriteLock.readLock().lock();
        try {
            getFileInternal(jobId, blobKey, target);
        } finally {
            readWriteLock.readLock().unlock();
        }
        return target;
    }

    /**
     * Ensures that {@code localFile} exists for the given BLOB.
     *
     * <p>The caller must hold the read lock; it is held again when this method returns or
     * throws. If the file exists, the expiry time of a transient BLOB is refreshed. If it is
     * missing and the key is a permanent one, the read lock is temporarily released, the BLOB
     * is downloaded from the blob store into a staging file and moved into place under the
     * write lock.
     *
     * @param jobId ID of the job the BLOB belongs to, or {@code null}
     * @param blobKey key of the BLOB
     * @param localFile location the BLOB is expected at
     * @throws IOException if the file is missing and cannot be fetched from the blob store;
     *     a {@link FileNotFoundException} if it is missing and the key is not permanent
     */
    void getFileInternal(@Nullable JobID jobId, BlobKey blobKey, File localFile) throws IOException {
        if (localFile.exists()) {
            // accessing a transient BLOB extends its lifetime by one cleanup interval
            if (blobKey instanceof TransientBlobKey) {
                final long expiry = System.currentTimeMillis() + cleanupInterval;
                blobExpiryTimes.put(Tuple2.of(jobId, (TransientBlobKey) blobKey), expiry);
            }
            return;
        }

        if (!(blobKey instanceof PermanentBlobKey)) {
            throw new FileNotFoundException("Local file " + localFile + " does not exist and failed to copy from blob store.");
        }

        // give up the read lock first; the write lock below cannot be taken while holding it
        readWriteLock.readLock().unlock();

        File stagingFile = null;
        try {
            stagingFile = createTemporaryFilename();
            blobStore.get(jobId, blobKey, stagingFile);

            readWriteLock.writeLock().lock();
            try {
                BlobUtils.moveTempFileToStore(stagingFile, jobId, blobKey, localFile, LOG, null);
            } finally {
                readWriteLock.writeLock().unlock();
            }
        } finally {
            // remove whatever is left of the staging file, on success and on failure alike
            if (stagingFile != null && !stagingFile.delete() && stagingFile.exists()) {
                LOG.warn("Could not delete the staging file {} for blob key {} and job {}.", new Object[]{stagingFile, blobKey, jobId});
            }
            // re-acquire the read lock so the caller can release it as usual
            readWriteLock.readLock().lock();
        }
    }

    /**
     * Stores the given bytes as a transient BLOB that is not associated with a job.
     *
     * @param value content of the BLOB
     * @return the key under which the BLOB was stored
     * @throws IOException if writing or storing the BLOB fails
     */
    @Override
    public TransientBlobKey putTransient(byte[] value) throws IOException {
        final BlobKey stored = putBuffer(null, value, BlobKey.BlobType.TRANSIENT_BLOB);
        return (TransientBlobKey) stored;
    }

    /**
     * Stores the given bytes as a transient BLOB of the given job.
     *
     * @param jobId ID of the job the BLOB belongs to; must not be {@code null}
     * @param value content of the BLOB
     * @return the key under which the BLOB was stored
     * @throws IOException if writing or storing the BLOB fails
     */
    @Override
    public TransientBlobKey putTransient(JobID jobId, byte[] value) throws IOException {
        Preconditions.checkNotNull(jobId);
        final BlobKey stored = putBuffer(jobId, value, BlobKey.BlobType.TRANSIENT_BLOB);
        return (TransientBlobKey) stored;
    }

    /**
     * Stores the content read from the given stream as a transient BLOB that is not
     * associated with a job.
     *
     * @param inputStream stream to read the BLOB content from
     * @return the key under which the BLOB was stored
     * @throws IOException if reading, writing or storing the BLOB fails
     */
    @Override
    public TransientBlobKey putTransient(InputStream inputStream) throws IOException {
        final BlobKey stored = putInputStream(null, inputStream, BlobKey.BlobType.TRANSIENT_BLOB);
        return (TransientBlobKey) stored;
    }

    /**
     * Stores the content read from the given stream as a transient BLOB of the given job.
     *
     * @param jobId ID of the job the BLOB belongs to; must not be {@code null}
     * @param inputStream stream to read the BLOB content from
     * @return the key under which the BLOB was stored
     * @throws IOException if reading, writing or storing the BLOB fails
     */
    @Override
    public TransientBlobKey putTransient(JobID jobId, InputStream inputStream) throws IOException {
        Preconditions.checkNotNull(jobId);
        final BlobKey stored = putInputStream(jobId, inputStream, BlobKey.BlobType.TRANSIENT_BLOB);
        return (TransientBlobKey) stored;
    }

    /**
     * Stores the given bytes as a permanent BLOB of the given job.
     *
     * @param jobId ID of the job the BLOB belongs to; must not be {@code null}
     * @param value content of the BLOB
     * @return the key under which the BLOB was stored
     * @throws IOException if writing or storing the BLOB fails
     */
    @Override
    public PermanentBlobKey putPermanent(JobID jobId, byte[] value) throws IOException {
        Preconditions.checkNotNull(jobId);
        final BlobKey stored = putBuffer(jobId, value, BlobKey.BlobType.PERMANENT_BLOB);
        return (PermanentBlobKey) stored;
    }

    /**
     * Stores the content read from the given stream as a permanent BLOB of the given job.
     *
     * @param jobId ID of the job the BLOB belongs to; must not be {@code null}
     * @param inputStream stream to read the BLOB content from
     * @return the key under which the BLOB was stored
     * @throws IOException if reading, writing or storing the BLOB fails
     */
    @Override
    public PermanentBlobKey putPermanent(JobID jobId, InputStream inputStream) throws IOException {
        Preconditions.checkNotNull(jobId);
        final BlobKey stored = putInputStream(jobId, inputStream, BlobKey.BlobType.PERMANENT_BLOB);
        return (PermanentBlobKey) stored;
    }

    /*
     * Loose catch block
     */
    private BlobKey putBuffer(@Nullable JobID jobId, byte[] value, BlobKey.BlobType blobType) throws IOException {
        BlobKey blobKey;
        Throwable throwable;
        FileOutputStream fos;
        BlobKey blobKey2;
        File incomingFile;
        block17: {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received PUT call for BLOB of job {}.", (Object)jobId);
            }
            incomingFile = this.createTemporaryFilename();
            MessageDigest md = BlobUtils.createMessageDigest();
            blobKey2 = null;
            fos = new FileOutputStream(incomingFile);
            throwable = null;
            md.update(value);
            fos.write(value);
            blobKey = blobKey2 = this.moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);
            if (incomingFile.delete() || !incomingFile.exists()) break block17;
            LOG.warn("Could not delete the staging file {} for blob key {} and job {}.", new Object[]{incomingFile, blobKey2, jobId});
        }
        return blobKey;
        {
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (fos != null) {
                    if (throwable != null) {
                        try {
                            fos.close();
                        }
                        catch (Throwable throwable3) {
                            throwable.addSuppressed(throwable3);
                        }
                    } else {
                        fos.close();
                    }
                }
            }
            {
                catch (Throwable throwable4) {
                    if (!incomingFile.delete() && incomingFile.exists()) {
                        LOG.warn("Could not delete the staging file {} for blob key {} and job {}.", new Object[]{incomingFile, blobKey2, jobId});
                    }
                    throw throwable4;
                }
            }
        }
    }

    /*
     * Loose catch block
     */
    private BlobKey putInputStream(@Nullable JobID jobId, InputStream inputStream, BlobKey.BlobType blobType) throws IOException {
        BlobKey blobKey;
        Throwable throwable;
        FileOutputStream fos;
        BlobKey blobKey2;
        File incomingFile;
        block18: {
            int bytesRead;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received PUT call for BLOB of job {}.", (Object)jobId);
            }
            incomingFile = this.createTemporaryFilename();
            MessageDigest md = BlobUtils.createMessageDigest();
            blobKey2 = null;
            fos = new FileOutputStream(incomingFile);
            throwable = null;
            byte[] buf = new byte[65536];
            while ((bytesRead = inputStream.read(buf)) != -1) {
                fos.write(buf, 0, bytesRead);
                md.update(buf, 0, bytesRead);
            }
            blobKey = blobKey2 = this.moveTempFileToStore(incomingFile, jobId, md.digest(), blobType);
            if (incomingFile.delete() || !incomingFile.exists()) break block18;
            LOG.warn("Could not delete the staging file {} for blob key {} and job {}.", new Object[]{incomingFile, blobKey2, jobId});
        }
        return blobKey;
        {
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (fos != null) {
                    if (throwable != null) {
                        try {
                            fos.close();
                        }
                        catch (Throwable throwable3) {
                            throwable.addSuppressed(throwable3);
                        }
                    } else {
                        fos.close();
                    }
                }
            }
            {
                catch (Throwable throwable4) {
                    if (!incomingFile.delete() && incomingFile.exists()) {
                        LOG.warn("Could not delete the staging file {} for blob key {} and job {}.", new Object[]{incomingFile, blobKey2, jobId});
                    }
                    throw throwable4;
                }
            }
        }
    }

    /**
     * Moves a staging file into the BLOB storage under a newly created key.
     *
     * <p>A new key is created from the digest for every attempt; if a file already exists at
     * the resulting location, another key is tried, up to ten attempts in total.
     * NOTE(review): this only makes sense if {@code BlobKey.createKey} yields a different key
     * per call (e.g. via a random component) - confirm against {@code BlobKey}.
     *
     * <p>Permanent BLOBs are additionally handed to the blob store by the move helper;
     * transient BLOBs get an expiry time of "now + cleanup interval".
     *
     * @param incomingFile staging file holding the BLOB content
     * @param jobId ID of the job the BLOB belongs to, or {@code null}
     * @param digest digest of the BLOB content
     * @param blobType whether to create a transient or a permanent BLOB
     * @return the key under which the BLOB was stored
     * @throws IOException if the move fails or no unused key was found within the attempt limit
     */
    BlobKey moveTempFileToStore(File incomingFile, @Nullable JobID jobId, byte[] digest, BlobKey.BlobType blobType) throws IOException {
        final int maxAttempts = 10;
        int attempt = 0;
        while (true) {
            BlobKey blobKey = BlobKey.createKey(blobType, digest);
            File storageFile = BlobUtils.getStorageLocation(this.storageDir, jobId, blobKey);

            // existence check and move must happen atomically with respect to other writers
            this.readWriteLock.writeLock().lock();
            try {
                if (!storageFile.exists()) {
                    BlobUtils.moveTempFileToStore(incomingFile, jobId, blobKey, storageFile, LOG, blobKey instanceof PermanentBlobKey ? this.blobStore : null);
                    if (blobKey instanceof TransientBlobKey) {
                        this.blobExpiryTimes.put(Tuple2.of(jobId, (TransientBlobKey) blobKey), System.currentTimeMillis() + this.cleanupInterval);
                    }
                    return blobKey;
                }
            } finally {
                this.readWriteLock.writeLock().unlock();
            }

            ++attempt;
            if (attempt >= maxAttempts) {
                // closing parenthesis added: the message used to end in "(last tried <path>."
                String message = "Failed to find a unique key for BLOB of job " + jobId + " (last tried " + storageFile.getAbsolutePath() + ").";
                LOG.error(message + " No retries left.");
                throw new IOException(message);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Trying to find a unique key for BLOB of job {} (retry {}, last tried {})", new Object[]{jobId, attempt, storageFile.getAbsolutePath()});
            }
        }
    }

    /**
     * Deletes the local file of a transient BLOB that is not associated with a job.
     *
     * @param key key of the BLOB to delete
     * @return {@code true} if the file is gone afterwards, {@code false} if deleting failed
     */
    @Override
    public boolean deleteFromCache(TransientBlobKey key) {
        final JobID noJob = null;
        return deleteInternal(noJob, key);
    }

    /**
     * Deletes the local file of a transient BLOB belonging to the given job.
     *
     * @param jobId ID of the job the BLOB belongs to; must not be {@code null}
     * @param key key of the BLOB to delete
     * @return {@code true} if the file is gone afterwards, {@code false} if deleting failed
     */
    @Override
    public boolean deleteFromCache(JobID jobId, TransientBlobKey key) {
        Preconditions.checkNotNull(jobId);
        final boolean deleted = deleteInternal(jobId, key);
        return deleted;
    }

    /**
     * Deletes the local file of a transient BLOB under the write lock and, if the file is gone
     * afterwards, drops the BLOB's entry from the expiry map.
     *
     * @param jobId ID of the job the BLOB belongs to, or {@code null}
     * @param key key of the BLOB to delete
     * @return {@code true} if the file no longer exists (deleted now or absent before),
     *     {@code false} if it still exists after the delete attempt
     */
    boolean deleteInternal(@Nullable JobID jobId, TransientBlobKey key) {
        final File localFile = new File(BlobUtils.getStorageLocationPath(storageDir.getAbsolutePath(), jobId, key));

        readWriteLock.writeLock().lock();
        try {
            final boolean gone = localFile.delete() || !localFile.exists();
            if (gone) {
                blobExpiryTimes.remove(Tuple2.of(jobId, key));
                return true;
            }
            LOG.warn("Failed to locally delete BLOB " + key + " at " + localFile.getAbsolutePath());
            return false;
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }

    /**
     * Removes all BLOBs of the given job: the job's local storage directory and, in any case
     * (even if the local deletion failed), the job's entries in the blob store. Both steps run
     * under the write lock.
     *
     * @param jobId ID of the job whose BLOBs are removed; must not be {@code null}
     * @return {@code true} only if both the local directory and the blob store entries were
     *     deleted successfully
     */
    public boolean cleanupJob(JobID jobId) {
        Preconditions.checkNotNull(jobId);

        final File jobDir = new File(BlobUtils.getStorageLocationPath(storageDir.getAbsolutePath(), jobId));

        readWriteLock.writeLock().lock();
        try {
            boolean deletedLocally;
            try {
                FileUtils.deleteDirectory(jobDir);
                deletedLocally = true;
            } catch (IOException e) {
                LOG.warn("Failed to locally delete BLOB storage directory at " + jobDir.getAbsolutePath(), e);
                deletedLocally = false;
            }

            // always attempted, independent of the local result
            final boolean deletedHA = blobStore.deleteAll(jobId);

            return deletedLocally && deletedHA;
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }

    /**
     * Returns the service for permanent BLOBs, which is this server itself.
     *
     * @return this instance
     */
    @Override
    public PermanentBlobService getPermanentBlobService() {
        final PermanentBlobService self = this;
        return self;
    }

    /**
     * Returns the service for transient BLOBs, which is this server itself.
     *
     * @return this instance
     */
    @Override
    public TransientBlobService getTransientBlobService() {
        final TransientBlobService self = this;
        return self;
    }

    /**
     * Reads the {@code BlobServerOptions.OFFLOAD_MINSIZE} option from this server's
     * configuration. The value is looked up on every call.
     *
     * @return the configured minimum offloading size
     */
    @Override
    public final int getMinOffloadingSize() {
        final int minSize = blobServiceConfiguration.getInteger(BlobServerOptions.OFFLOAD_MINSIZE);
        return minSize;
    }

    /**
     * Returns the local port the server socket is bound to.
     *
     * @return the port reported by the server socket
     */
    @Override
    public int getPort() {
        return serverSocket.getLocalPort();
    }

    /**
     * Exposes the live map of expiry times of transient BLOBs (not a copy).
     *
     * @return the map from (job ID, BLOB key) to expiry timestamp
     */
    @VisibleForTesting
    ConcurrentMap<Tuple2<JobID, TransientBlobKey>, Long> getBlobExpiryTimes() {
        return blobExpiryTimes;
    }

    /**
     * Tells whether shutdown of this server has been requested via {@link #close()}.
     *
     * @return {@code true} once {@code close()} has been called for the first time
     */
    public boolean isShutdown() {
        return shutdownRequested.get();
    }

    /**
     * Gives access to the socket this server accepts connections on.
     *
     * @return the server socket created in the constructor
     */
    ServerSocket getServerSocket() {
        return serverSocket;
    }

    /**
     * Removes a connection from the set of active connections and wakes up every thread
     * waiting on that set, such as the listener waiting for a free connection slot.
     *
     * @param conn the connection to unregister
     */
    void unregisterConnection(BlobServerConnection conn) {
        synchronized (activeConnections) {
            activeConnections.remove(conn);
            activeConnections.notifyAll();
        }
    }

    /**
     * Takes a snapshot of the currently active connections. The returned list is a copy made
     * while synchronizing on the connection set; later changes to the set are not reflected.
     *
     * @return a new list containing the active connections at the time of the call
     */
    List<BlobServerConnection> getCurrentActiveConnections() {
        final List<BlobServerConnection> snapshot;
        synchronized (activeConnections) {
            snapshot = new ArrayList<BlobServerConnection>(activeConnections);
        }
        return snapshot;
    }
}

