package reactor.rx.broadcast;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.dispatch.SynchronousDispatcher;
import reactor.core.queue.CompletableQueue;
import reactor.core.support.Assert;
import reactor.core.support.Exceptions;
import reactor.fn.Consumer;
import reactor.rx.action.Action;
import reactor.rx.subscription.PushSubscription;
import reactor.rx.subscription.ReactiveSubscription;

/* loaded from: input_file:reactor/rx/broadcast/Broadcaster.class */
public class Broadcaster<O> extends Action<O, O> {
    public static final Subscription HOT_SUBSCRIPTION = new PushSubscription(null, null) { // from class: reactor.rx.broadcast.Broadcaster.1
        @Override // reactor.rx.subscription.PushSubscription, org.reactivestreams.Subscription
        public void request(long j) {
        }

        @Override // reactor.rx.subscription.PushSubscription, org.reactivestreams.Subscription
        public void cancel() {
        }
    };
    protected final Dispatcher dispatcher;
    protected final Environment environment;

    public static <T> Broadcaster<T> create() {
        return new Broadcaster<>(null, SynchronousDispatcher.INSTANCE, Long.MAX_VALUE);
    }

    public static <T> Broadcaster<T> create(Environment environment) {
        return create(environment, environment.getDefaultDispatcher());
    }

    public static <T> Broadcaster<T> create(Dispatcher dispatcher) {
        return create(null, dispatcher);
    }

    public static <T> Broadcaster<T> create(Environment environment, Dispatcher dispatcher) {
        Assert.state(dispatcher.supportsOrdering(), "Dispatcher provided doesn't support event ordering.  For concurrent consume, refer to Stream#partition/groupBy() method and assign individual single dispatchers");
        return new Broadcaster<>(environment, dispatcher, Action.evaluateCapacity(dispatcher.backlogSize()));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Broadcaster(Environment environment, Dispatcher dispatcher, long j) {
        super(j);
        this.dispatcher = dispatcher;
        this.environment = environment;
        this.upstreamSubscription = (PushSubscription) HOT_SUBSCRIPTION;
    }

    @Override // reactor.rx.Stream
    public final Dispatcher getDispatcher() {
        return this.dispatcher;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // reactor.rx.action.Action
    public void doNext(O o) {
        broadcastNext(o);
    }

    @Override // reactor.rx.action.Action, org.reactivestreams.Subscriber
    public void onNext(O o) {
        if (o == null) {
            throw new NullPointerException("Spec 2.13: Signal cannot be null");
        }
        if (this.dispatcher.inContext()) {
            super.onNext(o);
        } else {
            this.dispatcher.dispatch(o, this, null);
        }
    }

    @Override // reactor.rx.action.Action, org.reactivestreams.Subscriber
    public void onSubscribe(Subscription subscription) {
        if (this.upstreamSubscription != HOT_SUBSCRIPTION) {
            super.onSubscribe(subscription);
            return;
        }
        this.upstreamSubscription = null;
        super.onSubscribe(subscription);
        PushSubscription<O> pushSubscription = this.downstreamSubscription;
        if (pushSubscription == null || pushSubscription.pendingRequestSignals() <= 0) {
            return;
        }
        subscription.request(pushSubscription.pendingRequestSignals());
    }

    @Override // reactor.rx.action.Action, org.reactivestreams.Subscriber
    public void onError(Throwable th) {
        if (th == null) {
            throw new NullPointerException("Spec 2.13: Signal cannot be null");
        }
        if (this.dispatcher.inContext()) {
            super.onError(th);
        } else {
            this.dispatcher.dispatch(th, new Consumer<Throwable>() { // from class: reactor.rx.broadcast.Broadcaster.2
                @Override // reactor.fn.Consumer
                public void accept(Throwable th2) {
                    Broadcaster.super.doError(th2);
                }
            }, null);
        }
    }

    @Override // reactor.rx.action.Action, org.reactivestreams.Subscriber
    public void onComplete() {
        if (this.dispatcher.inContext()) {
            super.onComplete();
        } else {
            this.dispatcher.dispatch(null, new Consumer<Void>() { // from class: reactor.rx.broadcast.Broadcaster.3
                @Override // reactor.fn.Consumer
                public void accept(Void r3) {
                    Broadcaster.super.onComplete();
                }
            }, null);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // reactor.rx.action.Action
    public PushSubscription<O> createSubscription(Subscriber<? super O> subscriber, CompletableQueue<O> completableQueue) {
        return completableQueue != null ? new ReactiveSubscription<O>(this, subscriber, completableQueue) { // from class: reactor.rx.broadcast.Broadcaster.4
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // reactor.rx.subscription.PushSubscription
            public void onRequest(long j) {
                if (Broadcaster.this.upstreamSubscription != null) {
                    super.onRequest(j);
                    Broadcaster.this.requestUpstream(Broadcaster.this.capacity, this.buffer.isComplete(), j);
                }
            }
        } : super.createSubscription(subscriber, (CompletableQueue) null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // reactor.rx.action.Action
    public PushSubscription<O> createSubscription(Subscriber<? super O> subscriber, boolean z) {
        if (z) {
            return super.createSubscription((Subscriber) subscriber, true);
        }
        return super.createSubscription(subscriber, (this.dispatcher == SynchronousDispatcher.INSTANCE || this.upstreamSubscription == null || this.upstreamSubscription.hasPublisher()) ? false : true);
    }

    @Override // reactor.rx.action.Action
    protected void subscribeWithSubscription(Subscriber<? super O> subscriber, PushSubscription<O> pushSubscription) {
        try {
            if (addSubscription(pushSubscription)) {
                subscriber.onSubscribe(pushSubscription);
            } else {
                subscriber.onError(new IllegalStateException("The subscription cannot be linked to this Stream"));
            }
        } catch (Exception e) {
            Exceptions.throwIfFatal(e);
            subscriber.onError(e);
        }
    }

    @Override // reactor.rx.action.Action, reactor.rx.action.Control
    public void cancel() {
        if (this.upstreamSubscription != HOT_SUBSCRIPTION) {
            super.cancel();
        }
    }

    @Override // reactor.rx.action.Action, reactor.core.support.Recyclable
    public void recycle() {
        if (HOT_SUBSCRIPTION != this.upstreamSubscription) {
            this.upstreamSubscription = null;
        } else {
            this.downstreamSubscription = null;
        }
    }

    @Override // reactor.rx.action.Action, reactor.rx.Stream
    public Broadcaster<O> capacity(long j) {
        super.capacity(j);
        return this;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // reactor.rx.action.Action
    public void requestUpstream(long j, boolean z, long j2) {
        if (this.upstreamSubscription != null && this.upstreamSubscription != HOT_SUBSCRIPTION && !z) {
            requestMore(j2);
            return;
        }
        PushSubscription<O> pushSubscription = this.downstreamSubscription;
        if (pushSubscription == null || pushSubscription.pendingRequestSignals() != 0) {
            return;
        }
        pushSubscription.updatePendingRequests(j2);
    }
}
