/*
 * Decompiled with CFR 0.152.
 */
package kafka.common;

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import kafka.common.InterBrokerSendThread$;
import kafka.common.RequestAndCompletionHandler;
import kafka.common.UnsentRequests;
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.internals.FatalExitError;
import org.apache.kafka.common.utils.Time;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;

@ScalaSignature(bytes="\u0006\u0001\u0005%c!B\u0001\u0003\u0003\u00039!!F%oi\u0016\u0014(I]8lKJ\u001cVM\u001c3UQJ,\u0017\r\u001a\u0006\u0003\u0007\u0011\taaY8n[>t'\"A\u0003\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001\u0001\u0003\t\u0003\u00131i\u0011A\u0003\u0006\u0003\u0017\u0011\tQ!\u001e;jYNL!!\u0004\u0006\u0003%MCW\u000f\u001e3po:\f'\r\\3UQJ,\u0017\r\u001a\u0005\n\u001f\u0001\u0011\t\u0011)A\u0005!i\tAA\\1nKB\u0011\u0011c\u0006\b\u0003%Ui\u0011a\u0005\u0006\u0002)\u0005)1oY1mC&\u0011acE\u0001\u0007!J,G-\u001a4\n\u0005aI\"AB*ue&twM\u0003\u0002\u0017'%\u0011q\u0002\u0004\u0005\t9\u0001\u0011\t\u0011)A\u0005;\u0005ia.\u001a;x_J\\7\t\\5f]R\u0004\"A\b\u0014\u000e\u0003}Q!\u0001I\u0011\u0002\u000f\rd\u0017.\u001a8ug*\u0011QA\t\u0006\u0003G\u0011\na!\u00199bG\",'\"A\u0013\u0002\u0007=\u0014x-\u0003\u0002(?\tia*\u001a;x_J\\7\t\\5f]RD\u0001\"\u000b\u0001\u0003\u0002\u0003\u0006IAK\u0001\u0005i&lW\r\u0005\u0002,]5\tAF\u0003\u0002\f[)\u00111!I\u0005\u0003_1\u0012A\u0001V5nK\"I\u0011\u0007\u0001B\u0001B\u0003%!'N\u0001\u0010SNLe\u000e^3seV\u0004H/\u001b2mKB\u0011!cM\u0005\u0003iM\u0011qAQ8pY\u0016\fg.\u0003\u00022\u0019!)q\u0007\u0001C\u0001q\u00051A(\u001b8jiz\"R!O\u001e={y\u0002\"A\u000f\u0001\u000e\u0003\tAQa\u0004\u001cA\u0002AAQ\u0001\b\u001cA\u0002uAQ!\u000b\u001cA\u0002)Bq!\r\u001c\u0011\u0002\u0003\u0007!\u0007C\u0003A\u0001\u0019\u0005\u0011)\u0001\thK:,'/\u0019;f%\u0016\fX/Z:ugR\t!\tE\u0002D\u0017:s!\u0001R%\u000f\u0005\u0015CU\"\u0001$\u000b\u0005\u001d3\u0011A\u0002\u001fs_>$h(C\u0001\u0015\u0013\tQ5#A\u0004qC\u000e\\\u0017mZ3\n\u00051k%\u0001C%uKJ\f'\r\\3\u000b\u0005)\u001b\u0002C\u0001\u001eP\u0013\t\u0001&AA\u000eSKF,Xm\u001d;B]\u0012\u001cu.\u001c9mKRLwN\u001c%b]\u0012dWM\u001d\u0005\u0006%\u00021\taU\u0001\u0011e\u0016\fX/Z:u)&lWm\\;u\u001bN,\u0012\u0001\u0016\t\u0003%UK!AV\n\u0003\u0007%sG\u000fC\u0004Y\u0001\t\u0007I\u0011B-\u0002\u001dUt7/\u001a8u%\u0016\fX/Z:ugV\t!\f\u0005\u0002;7&\u0011AL\u0001\u0002\u000f+:\u001cXM\u001c;SKF,Xm\u001d;t\u0011\u0019q\u0006\u0001)A\u00055\u0006yQO\\:f]R\u0014V-];fgR\u001c\b\u0005C\u0003a\u0001\u0011\u0005\u0011-A\tiCN,fn]3oiJ+\u0017/^3tiN,\u0012A\r\u0005\u0006G\u0002!\t\u0005Z\u0001\tg\",H\u000fZ8x]R\tQ\r\u0005\u0002\u0013M&\u0011qm\u0005\u0002\u0005+:LG\u000fC\u0003j\u0001\u0011\u0005C-\u0001\u0004e_^{'o\u001b\u0005\u0006W\u0002!I\u0001\\\u0001\rg\u0016tGMU3rk\u0016\u001cHo\u001d\u000b\u0003[B\u0004\"A\u00058\n\u0005=\u001c\"\u0001\u0002'p]\u001eDQ!\u001d6A\u00025\f1A\\8x\u0011\u0015\u0019\b\u0001\"\u0003u\u0003A\u0019\u0007.Z2l\t&\u001c8m\u001c8oK\u000e$8\u000f\u0006\u0002fk\")\u0011O\u001da\u0001[\")q\u000f\u0001C\u0005q\u0006\u0019b-Y5m\u000bb\u0004\u0018N]3e%\u0016\fX/Z:ugR\u0011Q-\u001f\u0005\u0006cZ\u0004\r!\u001c\u0005\u0006w\u0002!\t\u0001`\u0001\u0017G>l\u0007\u000f\\3uK^KG\u000f\u001b#jg\u000e|gN\\3diR1Q-`A\u0003\u0003\u000fAQA >A\u0002}\fqA]3rk\u0016\u001cH\u000fE\u0002\u001f\u0003\u0003I1!a\u0001 \u00055\u0019E.[3oiJ+\u0017/^3ti\")\u0011O\u001fa\u0001[\"9\u0011\u0011\u0002>A\u0002\u0005-\u0011aF1vi\",g\u000e^5dCRLwN\\#yG\u0016\u0004H/[8o!\u0011\ti!a\u0005\u000e\u0005\u0005=!bAA\t[\u00051QM\u001d:peNLA!!\u0006\u0002\u0010\t9\u0012)\u001e;iK:$\u0018nY1uS>tW\t_2faRLwN\u001c\u0005\u0007\u00033\u0001A\u0011\u00013\u0002\r]\f7.Z;q\u000f%\tiBAA\u0001\u0012\u0003\ty\"A\u000bJ]R,'O\u0011:pW\u0016\u00148+\u001a8e)\"\u0014X-\u00193\u0011\u0007i\n\tC\u0002\u0005\u0002\u0005\u0005\u0005\t\u0012AA\u0012'\u0011\t\t#!\n\u0011\u0007I\t9#C\u0002\u0002*M\u0011a!\u00118z%\u00164\u0007bB\u001c\u0002\"\u0011\u0005\u0011Q\u0006\u000b\u0003\u0003?A!\"!\r\u0002\"E\u0005I\u0011AA\u001a\u0003m!C.Z:tS:LG\u000fJ4sK\u0006$XM\u001d\u0013eK\u001a\fW\u000f\u001c;%iU\u0011\u0011Q\u0007\u0016\u0004e\u0005]2FAA\u001d!\u0011\tY$!\u0012\u000e\u0005\u0005u\"\u0002BA \u0003\u0003\n\u0011\"\u001e8dQ\u0016\u001c7.\u001a3\u000b\u0007\u0005\r3#\u0001\u0006b]:|G/\u0019;j_:LA!a\u0012\u0002>\t\tRO\\2iK\u000e\\W\r\u001a,be&\fgnY3")
public abstract class InterBrokerSendThread
extends ShutdownableThread {
    public final NetworkClient kafka$common$InterBrokerSendThread$$networkClient;
    private final Time time;
    private final UnsentRequests kafka$common$InterBrokerSendThread$$unsentRequests;

    public static boolean $lessinit$greater$default$4() {
        return InterBrokerSendThread$.MODULE$.$lessinit$greater$default$4();
    }

    public abstract Iterable<RequestAndCompletionHandler> generateRequests();

    public abstract int requestTimeoutMs();

    public UnsentRequests kafka$common$InterBrokerSendThread$$unsentRequests() {
        return this.kafka$common$InterBrokerSendThread$$unsentRequests;
    }

    public boolean hasUnsentRequests() {
        return this.kafka$common$InterBrokerSendThread$$unsentRequests().iterator().hasNext();
    }

    @Override
    public void shutdown() {
        this.initiateShutdown();
        this.kafka$common$InterBrokerSendThread$$networkClient.wakeup();
        this.awaitShutdown();
    }

    @Override
    public void doWork() {
        LongRef now = LongRef.create((long)this.time.milliseconds());
        this.generateRequests().foreach((Function1)new Serializable(this, now){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ InterBrokerSendThread $outer;
            private final LongRef now$1;

            public final void apply(RequestAndCompletionHandler request) {
                RequestCompletionHandler completionHandler = request.handler();
                this.$outer.kafka$common$InterBrokerSendThread$$unsentRequests().put(request.destination(), this.$outer.kafka$common$InterBrokerSendThread$$networkClient.newClientRequest(request.destination().idString(), request.request(), this.now$1.elem, true, this.$outer.requestTimeoutMs(), completionHandler));
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.now$1 = now$1;
            }
        });
        try {
            long timeout = this.sendRequests(now.elem);
            this.kafka$common$InterBrokerSendThread$$networkClient.poll(timeout, now.elem);
            now.elem = this.time.milliseconds();
            this.checkDisconnects(now.elem);
            this.failExpiredRequests(now.elem);
            this.kafka$common$InterBrokerSendThread$$unsentRequests().clean();
            return;
        }
        catch (Throwable throwable) {
            this.error((Function0<String>)new Serializable(this){
                public static final long serialVersionUID = 0L;

                public final String apply() {
                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"unhandled exception caught in InterBrokerSendThread"})).s((Seq)Nil$.MODULE$);
                }
            }, (Function0<Throwable>)new Serializable(this, throwable){
                public static final long serialVersionUID = 0L;
                private final Throwable t$1;

                public final Throwable apply() {
                    return this.t$1;
                }
                {
                    this.t$1 = t$1;
                }
            });
            throw new FatalExitError();
        }
        catch (FatalExitError fatalExitError) {
            throw fatalExitError;
        }
    }

    private long sendRequests(long now) {
        LongRef pollTimeout = LongRef.create((long)Long.MAX_VALUE);
        ((IterableLike)JavaConverters$.MODULE$.asScalaSetConverter(this.kafka$common$InterBrokerSendThread$$unsentRequests().nodes()).asScala()).foreach((Function1)new Serializable(this, now, pollTimeout){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ InterBrokerSendThread $outer;
            private final long now$2;
            private final LongRef pollTimeout$1;

            public final void apply(Node node2) {
                Iterator<ClientRequest> requestIterator = this.$outer.kafka$common$InterBrokerSendThread$$unsentRequests().requestIterator(node2);
                while (requestIterator.hasNext()) {
                    ClientRequest request = requestIterator.next();
                    if (this.$outer.kafka$common$InterBrokerSendThread$$networkClient.ready(node2, this.now$2)) {
                        this.$outer.kafka$common$InterBrokerSendThread$$networkClient.send(request, this.now$2);
                        requestIterator.remove();
                        continue;
                    }
                    this.pollTimeout$1.elem = Math.min(this.pollTimeout$1.elem, this.$outer.kafka$common$InterBrokerSendThread$$networkClient.connectionDelay(node2, this.now$2));
                }
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.now$2 = now$2;
                this.pollTimeout$1 = pollTimeout$1;
            }
        });
        return pollTimeout.elem;
    }

    private void checkDisconnects(long now) {
        Iterator<Map.Entry<Node, ArrayDeque<ClientRequest>>> iterator2 = this.kafka$common$InterBrokerSendThread$$unsentRequests().iterator();
        while (iterator2.hasNext()) {
            Map.Entry<Node, ArrayDeque<ClientRequest>> entry2 = iterator2.next();
            Tuple2 tuple2 = new Tuple2((Object)entry2.getKey(), entry2.getValue());
            if (tuple2 != null) {
                Tuple2 tuple22;
                Node node2 = (Node)tuple2._1();
                ArrayDeque requests = (ArrayDeque)tuple2._2();
                Tuple2 tuple23 = tuple22 = new Tuple2((Object)node2, (Object)requests);
                Node node3 = (Node)tuple23._1();
                ArrayDeque requests2 = (ArrayDeque)tuple23._2();
                if (requests2.isEmpty() || !this.kafka$common$InterBrokerSendThread$$networkClient.connectionFailed(node3)) continue;
                iterator2.remove();
                ((IterableLike)JavaConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)requests2).asScala()).foreach((Function1)new Serializable(this, now, node3){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ InterBrokerSendThread $outer;
                    private final long now$3;
                    private final Node node$1;

                    public final void apply(ClientRequest request) {
                        AuthenticationException authenticationException = this.$outer.kafka$common$InterBrokerSendThread$$networkClient.authenticationException(this.node$1);
                        if (authenticationException != null) {
                            this.$outer.error((Function0<String>)new Serializable(this, request){
                                public static final long serialVersionUID = 0L;
                                private final ClientRequest request$1;

                                public final String apply() {
                                    return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Failed to send the following request due to authentication error: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{this.request$1}));
                                }
                                {
                                    this.request$1 = request$1;
                                }
                            });
                        }
                        this.$outer.completeWithDisconnect(request, this.now$3, authenticationException);
                    }
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.now$3 = now$3;
                        this.node$1 = node$1;
                    }
                });
                continue;
            }
            throw new MatchError((Object)tuple2);
        }
    }

    private void failExpiredRequests(long now) {
        Collection<ClientRequest> timedOutRequests = this.kafka$common$InterBrokerSendThread$$unsentRequests().removeAllTimedOut(now);
        ((IterableLike)JavaConverters$.MODULE$.collectionAsScalaIterableConverter(timedOutRequests).asScala()).foreach((Function1)new Serializable(this, now){
            public static final long serialVersionUID = 0L;
            private final /* synthetic */ InterBrokerSendThread $outer;
            private final long now$4;

            public final void apply(ClientRequest request) {
                this.$outer.debug((Function0<String>)new Serializable(this, request){
                    public static final long serialVersionUID = 0L;
                    private final ClientRequest request$2;

                    public final String apply() {
                        return new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Failed to send the following request after ", " ms: ", ""})).s((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.request$2.requestTimeoutMs()), this.request$2}));
                    }
                    {
                        this.request$2 = request$2;
                    }
                });
                this.$outer.completeWithDisconnect(request, this.now$4, null);
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.now$4 = now$4;
            }
        });
    }

    public void completeWithDisconnect(ClientRequest request, long now, AuthenticationException authenticationException) {
        RequestCompletionHandler handler = request.callback();
        handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().latestAllowedVersion()), handler, request.destination(), now, now, true, null, authenticationException, null));
    }

    public void wakeup() {
        this.kafka$common$InterBrokerSendThread$$networkClient.wakeup();
    }

    public InterBrokerSendThread(String name, NetworkClient networkClient, Time time, boolean isInterruptible) {
        this.kafka$common$InterBrokerSendThread$$networkClient = networkClient;
        this.time = time;
        super(name, isInterruptible);
        this.kafka$common$InterBrokerSendThread$$unsentRequests = new UnsentRequests();
    }
}

