package _ss_com.streamsets.datacollector.event.handler.remote;

import _ss_com.com.google.common.annotations.VisibleForTesting;
import _ss_com.com.google.common.base.Splitter;
import _ss_com.com.google.common.base.Stopwatch;
import _ss_com.com.google.common.base.Strings;
import _ss_com.com.google.common.collect.Lists;
import _ss_com.fasterxml.jackson.core.JsonProcessingException;
import _ss_com.fasterxml.jackson.core.type.TypeReference;
import _ss_com.streamsets.datacollector.config.StageDefinition;
import _ss_com.streamsets.datacollector.config.dto.PipelineConfigAndRules;
import _ss_com.streamsets.datacollector.event.binding.MessagingJsonToFromDto;
import _ss_com.streamsets.datacollector.event.client.api.EventClient;
import _ss_com.streamsets.datacollector.event.client.api.EventException;
import _ss_com.streamsets.datacollector.event.dto.AckEvent;
import _ss_com.streamsets.datacollector.event.dto.AckEventStatus;
import _ss_com.streamsets.datacollector.event.dto.BlobDeleteEvent;
import _ss_com.streamsets.datacollector.event.dto.BlobDeleteVersionEvent;
import _ss_com.streamsets.datacollector.event.dto.BlobStoreEvent;
import _ss_com.streamsets.datacollector.event.dto.ClientEvent;
import _ss_com.streamsets.datacollector.event.dto.DisconnectedSsoCredentialsEvent;
import _ss_com.streamsets.datacollector.event.dto.Event;
import _ss_com.streamsets.datacollector.event.dto.EventType;
import _ss_com.streamsets.datacollector.event.dto.PingFrequencyAdjustmentEvent;
import _ss_com.streamsets.datacollector.event.dto.PipelineBaseEvent;
import _ss_com.streamsets.datacollector.event.dto.PipelineSaveEvent;
import _ss_com.streamsets.datacollector.event.dto.PipelineSaveRulesEvent;
import _ss_com.streamsets.datacollector.event.dto.PipelineStartEvent;
import _ss_com.streamsets.datacollector.event.dto.PipelineStatusEvent;
import _ss_com.streamsets.datacollector.event.dto.PipelineStatusEvents;
import _ss_com.streamsets.datacollector.event.dto.PipelineStopAndDeleteEvent;
import _ss_com.streamsets.datacollector.event.dto.SDCBuildInfo;
import _ss_com.streamsets.datacollector.event.dto.SDCInfoEvent;
import _ss_com.streamsets.datacollector.event.dto.SDCProcessMetricsEvent;
import _ss_com.streamsets.datacollector.event.dto.SaveConfigurationEvent;
import _ss_com.streamsets.datacollector.event.dto.ServerEvent;
import _ss_com.streamsets.datacollector.event.dto.StageInfo;
import _ss_com.streamsets.datacollector.event.dto.SyncAclEvent;
import _ss_com.streamsets.datacollector.event.handler.DataCollector;
import _ss_com.streamsets.datacollector.event.handler.EventHandlerTask;
import _ss_com.streamsets.datacollector.event.json.ServerEventJson;
import _ss_com.streamsets.datacollector.execution.StartPipelineContextBuilder;
import _ss_com.streamsets.datacollector.io.DataStore;
import _ss_com.streamsets.datacollector.json.ObjectMapperFactory;
import _ss_com.streamsets.datacollector.main.DataCollectorBuildInfo;
import _ss_com.streamsets.datacollector.main.RuntimeInfo;
import _ss_com.streamsets.datacollector.restapi.bean.BeanHelper;
import _ss_com.streamsets.datacollector.restapi.bean.PipelineConfigurationJson;
import _ss_com.streamsets.datacollector.restapi.bean.RuleDefinitionsJson;
import _ss_com.streamsets.datacollector.restapi.bean.SourceOffsetJson;
import _ss_com.streamsets.datacollector.runner.production.SourceOffset;
import _ss_com.streamsets.datacollector.runner.production.SourceOffsetUpgrader;
import _ss_com.streamsets.datacollector.stagelibrary.StageLibraryTask;
import _ss_com.streamsets.datacollector.task.AbstractTask;
import _ss_com.streamsets.datacollector.util.Configuration;
import _ss_com.streamsets.datacollector.util.DisconnectedSecurityUtils;
import _ss_com.streamsets.lib.security.http.DisconnectedSSOManager;
import _ss_com.streamsets.lib.security.http.SSOConstants;
import _ss_com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService;
import com.streamsets.pipeline.api.impl.Utils;
import com.sun.management.OperatingSystemMXBean;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.jetbrains.annotations.Nullable;
import org.ldaptive.pool.PoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:_ss_com/streamsets/datacollector/event/handler/remote/RemoteEventHandlerTask.class */
public class RemoteEventHandlerTask extends AbstractTask implements EventHandlerTask {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteEventHandlerTask.class);
    // Ping interval default and lower bound; presumably milliseconds, matching how the ping delay is scheduled.
    private static final long DEFAULT_PING_FREQUENCY = 5000;
    private static final long SYSTEM_LIMIT_MIN_PING_FREQUENCY = 5000;
    // Default and lower bound for the interval between full "status of all pipelines" reports;
    // the effective interval is compared against elapsed milliseconds.
    private static final long DEFAULT_STATUS_EVENTS_INTERVAL = 60000;
    private static final long SYSTEM_LIMIT_MIN_STATUS_EVENTS_INTERVAL = 30000;
    // Common prefix of the configuration keys declared below.
    private static final String REMOTE_CONTROL = "dpm.remote.control.";
    // Configuration key holding the comma-separated labels reported in the startup event.
    public static final String REMOTE_JOB_LABELS = "dpm.remote.control.job.labels";
    private static final String REMOTE_URL_PING_INTERVAL = "dpm.remote.control.ping.frequency";
    private static final String REMOTE_URL_SEND_ALL_STATUS_EVENTS_INTERVAL = "dpm.remote.control.status.events.interval";
    private static final String DEFAULT_REMOTE_JOB_LABELS = "all";
    // Configuration key and default for the single destination of job/status/ack events.
    private static final String REMOTE_CONTROL_EVENTS_RECIPIENT = "dpm.remote.control.events.recipient";
    private static final String DEFAULT_REMOTE_CONTROL_EVENTS_RECIPIENT = "jobrunner-app";
    // Configuration key and default (comma-separated) for the destinations of process metrics events.
    private static final String REMOTE_CONTROL_PROCESS_EVENTS_RECIPIENTS = "dpm.remote.control.process.events.recipients";
    private static final String DEFAULT_REMOTE_CONTROL_PROCESS_EVENTS_RECIPIENTS = "jobrunner-app,timeseries-app";
    // NOTE(review): not referenced in the visible code of this file; public, so presumably used elsewhere.
    public static final String OFFSET = "offset";
    // Offset protocol version this class produces; older versions carry a plain string offset.
    public static final int OFFSET_PROTOCOL_VERSION = 2;
    private final RemoteDataCollector remoteDataCollector;
    // Client used to submit outgoing events and receive server events in one call.
    private final EventClient eventSenderReceiver;
    private final MessagingJsonToFromDto jsonToFromDto;
    // Executor on which the polling callable is submitted and re-scheduled.
    private final SafeScheduledExecutorService executorService;
    private final StageLibraryTask stageLibrary;
    private final RuntimeInfo runtimeInfo;
    // Destinations for job/status/ack events.
    private final List<String> appDestinationList;
    // Destinations for process metrics events.
    private final List<String> processAppDestinationList;
    private final List<String> labelList;
    // HTTP headers sent with every submit call.
    private final Map<String, String> requestHeader;
    // Initial delay between polling cycles, in milliseconds.
    private final long defaultPingFrequency;
    // Measures the time since the last successful submit to decide when a full status report is due.
    private final Stopwatch stopWatch;
    private final long sendAllStatusEventsInterval;
    // Backing store for the disconnected-mode SSO credentials file.
    private final DataStore dataStore;

    @VisibleForTesting
    /* loaded from: input_file:_ss_com/streamsets/datacollector/event/handler/remote/RemoteEventHandlerTask$EventHandlerCallable.class */
    static class EventHandlerCallable implements Callable<Void> {
        private final DataCollector remoteDataCollector;
        private final EventClient eventClient;
        private final MessagingJsonToFromDto jsonToFromDto;
        // Executor used to schedule the next polling cycle.
        private final SafeScheduledExecutorService executorService;
        private final Map<String, String> requestHeader;
        // Destinations for pipeline status and ack events.
        private final List<String> jobEventDestinationList;
        // Destinations for process metrics events.
        private final List<String> processAppDestinationList;
        // Shared across cycles; running means a submit has succeeded since the last full status report.
        private final Stopwatch stopWatch;
        // Elapsed milliseconds after which a full status report is sent again.
        private final long waitBetweenSendingStatusEvents;
        private final DataStore disconnectedCredentialsDataStore;
        // Acks produced while handling the previous response; sent with the next submit.
        private List<ClientEvent> ackEventList;
        // Per-pipeline status events not yet delivered; cleared after a successful submit.
        private List<ClientEvent> remoteEventList;
        // Startup report; set to null once a submit has succeeded.
        private ClientEvent sdcInfoEvent;
        // Delay before the next cycle in milliseconds; can be changed by a PING_FREQUENCY_ADJUSTMENT event.
        private long delay;
        // Server events whose ack is produced asynchronously, keyed by the originating event.
        private Map<ServerEvent, Future<AckEvent>> eventToAckEventFuture;
        private RuntimeInfo runtimeInfo;

        /**
         * Captures everything one polling cycle needs. The same collaborators and mutable
         * collections are handed on to the callable created for the following cycle.
         *
         * @param dataCollector data collector that executes the received events
         * @param eventClient client used to submit events and receive the server's events
         * @param messagingJsonToFromDto converter between event DTOs and their JSON form
         * @param list ack events to send with the next submit
         * @param list2 per-pipeline status events waiting to be sent
         * @param clientEvent startup report event, or {@code null} once it has been delivered
         * @param safeScheduledExecutorService executor used to schedule the next cycle
         * @param j delay before the next cycle, in milliseconds
         * @param list3 destinations for pipeline status and ack events
         * @param list4 destinations for process metrics events
         * @param map request headers sent with each submit
         * @param stopwatch stopwatch deciding when a full status report is due
         * @param j2 elapsed milliseconds after which a full status report is sent
         * @param dataStore store for disconnected-mode SSO credentials
         * @param map2 server events whose acks are still being produced asynchronously
         * @param runtimeInfo runtime information; supplies the id used in metrics events
         */
        public EventHandlerCallable(
                DataCollector dataCollector,
                EventClient eventClient,
                MessagingJsonToFromDto messagingJsonToFromDto,
                List<ClientEvent> list,
                List<ClientEvent> list2,
                ClientEvent clientEvent,
                SafeScheduledExecutorService safeScheduledExecutorService,
                long j,
                List<String> list3,
                List<String> list4,
                Map<String, String> map,
                Stopwatch stopwatch,
                long j2,
                DataStore dataStore,
                Map<ServerEvent, Future<AckEvent>> map2,
                RuntimeInfo runtimeInfo) {
            // Assignments follow the parameter order.
            this.remoteDataCollector = dataCollector;
            this.eventClient = eventClient;
            this.jsonToFromDto = messagingJsonToFromDto;
            this.ackEventList = list;
            this.remoteEventList = list2;
            this.sdcInfoEvent = clientEvent;
            this.executorService = safeScheduledExecutorService;
            this.delay = j;
            this.jobEventDestinationList = list3;
            this.processAppDestinationList = list4;
            this.requestHeader = map;
            this.stopWatch = stopwatch;
            this.waitBetweenSendingStatusEvents = j2;
            this.disconnectedCredentialsDataStore = dataStore;
            this.eventToAckEventFuture = map2;
            this.runtimeInfo = runtimeInfo;
        }

        /**
         * Runs one send/receive cycle and then schedules the next one.
         *
         * <p>An {@link Exception} from the cycle is logged and swallowed so that polling keeps going.
         * The next cycle is scheduled exactly once, from the {@code finally} block, whether the cycle
         * succeeded, failed with an exception, or failed with an error that propagates to the caller.
         * The follow-up callable receives the current state, including a {@code delay} that may have
         * been changed by the events handled in this cycle.
         *
         * @return always {@code null}
         */
        @Override // java.util.concurrent.Callable
        public Void call() {
            try {
                callRemoteControl();
            } catch (Exception e) {
                // Only log: a failed cycle must not stop the polling loop.
                RemoteEventHandlerTask.LOG.warn("Cannot connect to send/receive events: {}", e.toString());
                RemoteEventHandlerTask.LOG.trace("Entire error message", e);
            } finally {
                this.executorService.schedule(
                        new EventHandlerCallable(
                                this.remoteDataCollector,
                                this.eventClient,
                                this.jsonToFromDto,
                                this.ackEventList,
                                this.remoteEventList,
                                this.sdcInfoEvent,
                                this.executorService,
                                this.delay,
                                this.jobEventDestinationList,
                                this.processAppDestinationList,
                                this.requestHeader,
                                this.stopWatch,
                                this.waitBetweenSendingStatusEvents,
                                this.disconnectedCredentialsDataStore,
                                this.eventToAckEventFuture,
                                this.runtimeInfo),
                        this.delay,
                        TimeUnit.MILLISECONDS);
            }
            return null;
        }

        /**
         * Exposes the current delay between polling cycles for tests.
         *
         * @return the delay in milliseconds that the next cycle is scheduled with
         */
        @VisibleForTesting
        long getDelay() {
            return delay;
        }

        /**
         * Exposes the ack events queued for the next submit, for tests.
         *
         * @return the list held by this callable (not a copy)
         */
        @VisibleForTesting
        List<ClientEvent> getAckEventList() {
            return ackEventList;
        }

        /**
         * Builds the status event describing one pipeline.
         *
         * @param messagingJsonToFromDto serializer used for the pipeline's validation issues
         * @param pipelineAndValidationStatus status snapshot to convert
         * @return a status event populated from the snapshot
         * @throws JsonProcessingException if the issues cannot be serialized
         */
        private PipelineStatusEvent createPipelineStatusEvent(MessagingJsonToFromDto messagingJsonToFromDto, PipelineAndValidationStatus pipelineAndValidationStatus) throws JsonProcessingException {
            final PipelineAndValidationStatus status = pipelineAndValidationStatus;
            return new PipelineStatusEvent(
                    status.getName(),
                    status.getTitle(),
                    status.getRev(),
                    status.getTimeStamp(),
                    status.isRemote(),
                    status.getPipelineStatus(),
                    status.getMessage(),
                    status.getWorkerInfos(),
                    status.getValidationStatus(),
                    // Issues travel in serialized form.
                    messagingJsonToFromDto.serialize(BeanHelper.wrapIssues(status.getIssues())),
                    status.isClusterMode(),
                    status.getOffset(),
                    // Offset protocol version; same value as OFFSET_PROTOCOL_VERSION.
                    2,
                    status.getAcl(),
                    status.getRunnerCount());
        }

        /**
         * Performs one exchange with the remote control endpoint.
         *
         * <p>Outgoing events are assembled from: the acks of the previous cycle, acks whose
         * asynchronous futures have completed, the startup report (until delivered), and either a full
         * status report of all pipelines (when the stopwatch is not running or the configured interval
         * has elapsed) or the status of remote pipelines that changed.
         *
         * <p>On a successful submit, the delivered per-pipeline events and already-acked asynchronous
         * events are dropped, the stopwatch is started if needed, every received server event is
         * handled, and the resulting acks are kept for the next cycle. On {@link EventException} or
         * {@link IOException} the state is left untouched so the same events are retried next cycle.
         * Failures while building status events are logged and do not prevent the submit.
         */
        @VisibleForTesting
        void callRemoteControl() {
            List<ClientEvent> outgoingEvents = new ArrayList<>(this.ackEventList);
            outgoingEvents.addAll(getQueuedAckEvents());
            if (this.sdcInfoEvent != null) {
                outgoingEvents.add(this.sdcInfoEvent);
            }
            try {
                if (!this.stopWatch.isRunning() || this.stopWatch.elapsed(TimeUnit.MILLISECONDS) > this.waitBetweenSendingStatusEvents) {
                    List<PipelineStatusEvent> allStatuses = new ArrayList<>();
                    RemoteEventHandlerTask.LOG.debug("Sending status of all pipelines");
                    for (PipelineAndValidationStatus status : this.remoteDataCollector.getPipelines()) {
                        allStatuses.add(createPipelineStatusEvent(this.jsonToFromDto, status));
                    }
                    this.stopWatch.reset();
                    PipelineStatusEvents pipelineStatusEvents = new PipelineStatusEvents();
                    pipelineStatusEvents.setPipelineStatusEventList(allStatuses);
                    outgoingEvents.add(new ClientEvent(UUID.randomUUID().toString(), this.jobEventDestinationList, false, false, EventType.STATUS_MULTIPLE_PIPELINES, pipelineStatusEvents, null));
                    // The full report covers every pipeline, so queued per-pipeline events are dropped.
                    this.remoteEventList.clear();
                    outgoingEvents.add(new ClientEvent(UUID.randomUUID().toString(), this.processAppDestinationList, false, false, EventType.SDC_PROCESS_METRICS_EVENT, getSdcMetricsEvent(), null));
                } else {
                    for (PipelineAndValidationStatus status : this.remoteDataCollector.getRemotePipelinesWithChanges()) {
                        // Build the status event once and reuse it for both the queued event and the log line.
                        PipelineStatusEvent statusEvent = createPipelineStatusEvent(this.jsonToFromDto, status);
                        this.remoteEventList.add(new ClientEvent(UUID.randomUUID().toString(), this.jobEventDestinationList, false, false, EventType.STATUS_PIPELINE, statusEvent, null));
                        RemoteEventHandlerTask.LOG.info(Utils.format("Sending event for remote pipeline: '{}' in status: '{}'", new Object[]{statusEvent.getName(), statusEvent.getPipelineStatus()}));
                    }
                }
            } catch (Exception e) {
                RemoteEventHandlerTask.LOG.warn(Utils.format("Error while creating/serializing pipeline status event: '{}'", new Object[]{e}), e);
            }
            outgoingEvents.addAll(this.remoteEventList);
            try {
                List<ServerEventJson> receivedEvents = this.eventClient.submit("", new HashMap<>(), this.requestHeader, false, this.jsonToFromDto.toJson(outgoingEvents));
                // Delivered: the queued per-pipeline events must not be sent again.
                this.remoteEventList.clear();
                if (!this.eventToAckEventFuture.isEmpty()) {
                    // An asynchronous event whose ack was part of this submit needs no further tracking.
                    Set<String> sentEventIds = outgoingEvents.stream()
                            .map(ClientEvent::getEventId)
                            .collect(Collectors.toSet());
                    Set<ServerEvent> ackedEvents = this.eventToAckEventFuture.keySet().stream()
                            .filter(serverEvent -> sentEventIds.contains(serverEvent.getEventId()))
                            .collect(Collectors.toSet());
                    RemoteEventHandlerTask.LOG.info("Removing already acked events {}", ackedEvents);
                    this.eventToAckEventFuture.keySet().removeAll(ackedEvents);
                }
                if (!this.stopWatch.isRunning()) {
                    this.stopWatch.start();
                }
                List<ClientEvent> nextAcks = new ArrayList<>();
                for (ServerEventJson receivedEvent : receivedEvents) {
                    ClientEvent ack = handlePipelineEvent(receivedEvent);
                    if (ack != null) {
                        nextAcks.add(ack);
                    }
                }
                this.ackEventList = nextAcks;
                // The startup report has been delivered; do not send it again.
                this.sdcInfoEvent = null;
            } catch (EventException | IOException e2) {
                RemoteEventHandlerTask.LOG.warn("Error while sending/receiving events to server:  " + e2, e2);
            }
        }

        /**
         * Takes a snapshot of this process' resource usage.
         *
         * @return an event carrying the current time, this data collector's id, the process CPU load
         *     scaled by 100, and the JVM's total minus free memory
         */
        private SDCProcessMetricsEvent getSdcMetricsEvent() {
            OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
            Runtime jvm = Runtime.getRuntime();
            SDCProcessMetricsEvent metrics = new SDCProcessMetricsEvent();
            metrics.setTimestamp(System.currentTimeMillis());
            metrics.setSdcId(this.runtimeInfo.getId());
            // The bean reports load as a fraction; scale it by 100.
            double cpuLoadPercent = osBean.getProcessCpuLoad() * 100.0d;
            metrics.setCpuLoad(cpuLoadPercent);
            long usedMemory = jvm.totalMemory() - jvm.freeMemory();
            metrics.setUsedMemory(usedMemory);
            return metrics;
        }

        /**
         * Dispatches a server event to the matching data collector operation.
         *
         * <p>Any exception raised while handling is caught, logged and turned into an error message,
         * so this method does not propagate exceptions from the handlers.
         *
         * @param serverEvent event received from the server
         * @return {@code null} when the event was handled, otherwise a description of the failure
         *     (unrecognized event type or exception during handling)
         */
        private String handleServerEvent(ServerEvent serverEvent) {
            String errorMessage = null;
            try {
                Event payload = serverEvent.getEvent();
                EventType type = serverEvent.getEventType();
                RemoteEventHandlerTask.LOG.info(Utils.format("Handling event: '{}' ", new Object[]{serverEvent}));
                switch (type) {
                    case PING_FREQUENCY_ADJUSTMENT: {
                        PingFrequencyAdjustmentEvent pingEvent = (PingFrequencyAdjustmentEvent) payload;
                        // Takes effect when the next cycle is scheduled.
                        this.delay = pingEvent.getPingFrequency();
                        break;
                    }
                    case SAVE_PIPELINE: {
                        PipelineSaveEvent saveEvent = (PipelineSaveEvent) payload;
                        PipelineConfigAndRules configAndRules = saveEvent.getPipelineConfigurationAndRules();
                        SourceOffset sourceOffset = getSourceOffset(saveEvent);
                        PipelineConfigurationJson configJson = (PipelineConfigurationJson) this.jsonToFromDto.deserialize(
                                configAndRules.getPipelineConfig(), new TypeReference<PipelineConfigurationJson>() {
                                });
                        this.remoteDataCollector.savePipeline(
                                saveEvent.getUser(),
                                saveEvent.getName(),
                                saveEvent.getRev(),
                                saveEvent.getDescription(),
                                sourceOffset,
                                BeanHelper.unwrapPipelineConfiguration(configJson),
                                BeanHelper.unwrapRuleDefinitions((RuleDefinitionsJson) this.jsonToFromDto.deserialize(
                                        configAndRules.getPipelineRules(), new TypeReference<RuleDefinitionsJson>() {
                                        })),
                                saveEvent.getAcl());
                        break;
                    }
                    case SAVE_RULES_PIPELINE: {
                        PipelineSaveRulesEvent saveRulesEvent = (PipelineSaveRulesEvent) payload;
                        this.remoteDataCollector.savePipelineRules(
                                saveRulesEvent.getName(),
                                saveRulesEvent.getRev(),
                                BeanHelper.unwrapRuleDefinitions((RuleDefinitionsJson) this.jsonToFromDto.deserialize(
                                        saveRulesEvent.getRuleDefinitions(), new TypeReference<RuleDefinitionsJson>() {
                                        })));
                        break;
                    }
                    case START_PIPELINE: {
                        PipelineStartEvent startEvent = (PipelineStartEvent) payload;
                        this.remoteDataCollector.start(
                                new StartPipelineContextBuilder(startEvent.getUser())
                                        .withInterceptorConfigurations(startEvent.getInterceptorConfiguration())
                                        .build(),
                                startEvent.getName(),
                                startEvent.getRev());
                        break;
                    }
                    case STOP_PIPELINE: {
                        PipelineBaseEvent stopEvent = (PipelineBaseEvent) payload;
                        this.remoteDataCollector.stop(stopEvent.getUser(), stopEvent.getName(), stopEvent.getRev());
                        break;
                    }
                    case VALIDATE_PIPELINE: {
                        PipelineBaseEvent validateEvent = (PipelineBaseEvent) payload;
                        this.remoteDataCollector.validateConfigs(validateEvent.getUser(), validateEvent.getName(), validateEvent.getRev());
                        break;
                    }
                    case RESET_OFFSET_PIPELINE: {
                        PipelineBaseEvent resetEvent = (PipelineBaseEvent) payload;
                        this.remoteDataCollector.resetOffset(resetEvent.getUser(), resetEvent.getName(), resetEvent.getRev());
                        break;
                    }
                    case DELETE_HISTORY_PIPELINE: {
                        PipelineBaseEvent deleteHistoryEvent = (PipelineBaseEvent) payload;
                        this.remoteDataCollector.deleteHistory(deleteHistoryEvent.getUser(), deleteHistoryEvent.getName(), deleteHistoryEvent.getRev());
                        break;
                    }
                    case DELETE_PIPELINE: {
                        PipelineBaseEvent deleteEvent = (PipelineBaseEvent) payload;
                        this.remoteDataCollector.delete(deleteEvent.getName(), deleteEvent.getRev());
                        break;
                    }
                    case STOP_DELETE_PIPELINE: {
                        PipelineStopAndDeleteEvent stopDeleteEvent = (PipelineStopAndDeleteEvent) payload;
                        Future<AckEvent> pendingAck = this.remoteDataCollector.stopAndDelete(
                                stopDeleteEvent.getUser(),
                                stopDeleteEvent.getName(),
                                stopDeleteEvent.getRev(),
                                stopDeleteEvent.getForceTimeoutMillis());
                        // The ack is produced asynchronously; remember the future under its event.
                        this.eventToAckEventFuture.put(serverEvent, pendingAck);
                        break;
                    }
                    case BLOB_STORE: {
                        BlobStoreEvent blobStoreEvent = (BlobStoreEvent) payload;
                        this.remoteDataCollector.blobStore(blobStoreEvent.getNamespace(), blobStoreEvent.getId(), blobStoreEvent.getVersion(), blobStoreEvent.getContent());
                        break;
                    }
                    case BLOB_DELETE: {
                        BlobDeleteEvent blobDeleteEvent = (BlobDeleteEvent) payload;
                        this.remoteDataCollector.blobDelete(blobDeleteEvent.getNamespace(), blobDeleteEvent.getId());
                        break;
                    }
                    case BLOB_DELETE_VERSION: {
                        BlobDeleteVersionEvent blobDeleteVersionEvent = (BlobDeleteVersionEvent) payload;
                        this.remoteDataCollector.blobDelete(blobDeleteVersionEvent.getNamespace(), blobDeleteVersionEvent.getId(), blobDeleteVersionEvent.getVersion());
                        break;
                    }
                    case SAVE_CONFIGURATION: {
                        SaveConfigurationEvent saveConfigurationEvent = (SaveConfigurationEvent) payload;
                        this.remoteDataCollector.storeConfiguration(saveConfigurationEvent.getConfiguration());
                        break;
                    }
                    case SYNC_ACL: {
                        SyncAclEvent syncAclEvent = (SyncAclEvent) payload;
                        this.remoteDataCollector.syncAcl(syncAclEvent.getAcl());
                        break;
                    }
                    case SSO_DISCONNECTED_MODE_CREDENTIALS: {
                        DisconnectedSecurityUtils.writeDisconnectedCredentials(this.disconnectedCredentialsDataStore, (DisconnectedSsoCredentialsEvent) payload);
                        break;
                    }
                    default: {
                        errorMessage = Utils.format("Unrecognized event: '{}'", new Object[]{type});
                        RemoteEventHandlerTask.LOG.warn(errorMessage);
                        break;
                    }
                }
            } catch (Exception e) {
                errorMessage = Utils.format("Remote event type: '{}' encountered exception '{}'", new Object[]{serverEvent.getEventType(), e.getMessage()});
                RemoteEventHandlerTask.LOG.warn(errorMessage, e);
            }
            return errorMessage;
        }

        /**
         * Converts the offset carried by a save event into a {@link SourceOffset} and runs it through
         * {@link SourceOffsetUpgrader#upgrade}.
         *
         * <p>Events using a protocol version older than {@link #OFFSET_PROTOCOL_VERSION} carry the
         * offset as a plain string. Newer events carry a JSON-encoded {@link SourceOffsetJson}; a
         * missing offset then becomes an empty offset map at the current protocol version.
         *
         * @param pipelineSaveEvent event holding the offset and its protocol version
         * @return the source offset after the upgrade call
         * @throws IOException if the JSON offset cannot be parsed
         */
        @Nullable
        private SourceOffset getSourceOffset(PipelineSaveEvent pipelineSaveEvent) throws IOException {
            String offset = pipelineSaveEvent.getOffset();
            SourceOffset sourceOffset;
            if (pipelineSaveEvent.getOffsetProtocolVersion() < RemoteEventHandlerTask.OFFSET_PROTOCOL_VERSION) {
                sourceOffset = new SourceOffset();
                sourceOffset.setOffset(offset);
            } else if (offset == null) {
                sourceOffset = new SourceOffset(RemoteEventHandlerTask.OFFSET_PROTOCOL_VERSION, Collections.emptyMap());
            } else {
                sourceOffset = BeanHelper.unwrapSourceOffset((SourceOffsetJson) ObjectMapperFactory.get().readValue(offset, SourceOffsetJson.class));
            }
            // upgrade is invoked statically, so no upgrader instance is needed.
            SourceOffsetUpgrader.upgrade(sourceOffset);
            return sourceOffset;
        }

        /**
         * Collects ack events for asynchronously handled server events whose futures have completed.
         *
         * <p>Entries are not removed here. A future that completed with a failure yields an
         * {@link AckEventStatus#ERROR} ack carrying the failure description.
         *
         * @return one ack client event per completed future; empty when none has completed
         */
        private List<ClientEvent> getQueuedAckEvents() {
            List<ClientEvent> completedAcks = new ArrayList<>();
            for (Map.Entry<ServerEvent, Future<AckEvent>> pending : this.eventToAckEventFuture.entrySet()) {
                Future<AckEvent> ackFuture = pending.getValue();
                if (!ackFuture.isDone()) {
                    continue;
                }
                ServerEvent serverEvent = pending.getKey();
                AckEvent ackEvent;
                try {
                    ackEvent = ackFuture.get();
                } catch (Exception e) {
                    String failure = Utils.format("Error while trying to get an ack event for eventType {}, eventId: {}, error is {} ", new Object[]{serverEvent.getEventType(), serverEvent.getEventId(), e});
                    RemoteEventHandlerTask.LOG.warn(failure, e);
                    ackEvent = new AckEvent(AckEventStatus.ERROR, failure);
                }
                completedAcks.add(new ClientEvent(serverEvent.getEventId(), this.jobEventDestinationList, false, true, EventType.ACK_EVENT, ackEvent, null));
            }
            return completedAcks;
        }

        /**
         * Handles one event received from the server and builds its acknowledgement.
         *
         * <p>An event whose id is already tracked as asynchronously in progress is skipped. A
         * {@code STOP_DELETE_PIPELINE} event is acknowledged later through its future, so no ack is
         * returned for it here.
         *
         * @param serverEventJson JSON form of the received event
         * @return the ack event to send back, or {@code null} when the event is skipped, is
         *     acknowledged asynchronously, or does not require an ack
         */
        @VisibleForTesting
        ClientEvent handlePipelineEvent(ServerEventJson serverEventJson) {
            boolean alreadyInProgress = this.eventToAckEventFuture.keySet().stream()
                    .map(ServerEvent::getEventId)
                    .collect(Collectors.toSet())
                    .contains(serverEventJson.getEventId());
            if (alreadyInProgress) {
                RemoteEventHandlerTask.LOG.debug("Not processing event {} of type {} as its already being processed", serverEventJson.getEventId(), serverEventJson.getEventTypeId());
                return null;
            }
            AckEventStatus ackEventStatus;
            String ackEventMessage;
            try {
                ServerEvent serverEvent = this.jsonToFromDto.asDto(serverEventJson);
                if (serverEvent != null) {
                    ackEventMessage = handleServerEvent(serverEvent);
                    if (serverEvent.getEventType() == EventType.STOP_DELETE_PIPELINE) {
                        // Its ack is delivered once the stop-and-delete future completes.
                        return null;
                    }
                    ackEventStatus = ackEventMessage == null ? AckEventStatus.SUCCESS : AckEventStatus.ERROR;
                } else {
                    ackEventStatus = AckEventStatus.IGNORE;
                    ackEventMessage = Utils.format("Cannot understand remote event code {}", new Object[]{serverEventJson.getEventTypeId()});
                    RemoteEventHandlerTask.LOG.warn(ackEventMessage);
                }
            } catch (IOException e) {
                // Only the JSON-to-DTO conversion can fail here, so no event type is known yet.
                ackEventStatus = AckEventStatus.ERROR;
                ackEventMessage = Utils.format("Can't parse event JSON", new Object[]{serverEventJson});
                RemoteEventHandlerTask.LOG.warn(ackEventMessage, e);
            }
            if (serverEventJson.isRequiresAck()) {
                return new ClientEvent(serverEventJson.getEventId(), this.jobEventDestinationList, false, true, EventType.ACK_EVENT, new AckEvent(ackEventStatus, ackEventMessage), null);
            }
            return null;
        }
    }

    /**
     * Wires up the remote event handler from its collaborators and configuration.
     *
     * <p>The ping frequency and the full-status interval are read from the configuration and
     * clamped to their system minimums. The disconnected SSO credentials store is probed once; if
     * probing fails, the file is deleted so that a corrupt file does not linger. Finally the remote
     * data collector is initialized.
     *
     * @param remoteDataCollector data collector that executes remote events
     * @param eventClient client used to exchange events with the server
     * @param safeScheduledExecutorService executor running the polling cycles
     * @param stageLibraryTask stage library reported in the startup event
     * @param runtimeInfo runtime information (id, auth token, data directory, ...)
     * @param configuration source of the {@code dpm.remote.control.*} settings
     * @throws RuntimeException if the credentials file is unreadable and cannot be deleted
     */
    public RemoteEventHandlerTask(RemoteDataCollector remoteDataCollector, EventClient eventClient, SafeScheduledExecutorService safeScheduledExecutorService, StageLibraryTask stageLibraryTask, RuntimeInfo runtimeInfo, Configuration configuration) {
        super("REMOTE_EVENT_HANDLER");
        this.remoteDataCollector = remoteDataCollector;
        this.jsonToFromDto = MessagingJsonToFromDto.INSTANCE;
        this.executorService = safeScheduledExecutorService;
        this.eventSenderReceiver = eventClient;
        this.stageLibrary = stageLibraryTask;
        this.runtimeInfo = runtimeInfo;
        this.appDestinationList = Arrays.asList(configuration.get(REMOTE_CONTROL_EVENTS_RECIPIENT, DEFAULT_REMOTE_CONTROL_EVENTS_RECIPIENT));
        this.processAppDestinationList = Lists.newArrayList(Splitter.on(",").omitEmptyStrings().split(configuration.get(REMOTE_CONTROL_PROCESS_EVENTS_RECIPIENTS, DEFAULT_REMOTE_CONTROL_PROCESS_EVENTS_RECIPIENTS)));
        this.labelList = Lists.newArrayList(Splitter.on(",").omitEmptyStrings().split(configuration.get(REMOTE_JOB_LABELS, DEFAULT_REMOTE_JOB_LABELS)));
        // Use this class' own defaults and floors rather than constants of unrelated libraries.
        this.defaultPingFrequency = Math.max(configuration.get(REMOTE_URL_PING_INTERVAL, DEFAULT_PING_FREQUENCY), SYSTEM_LIMIT_MIN_PING_FREQUENCY);
        this.sendAllStatusEventsInterval = Math.max(configuration.get(REMOTE_URL_SEND_ALL_STATUS_EVENTS_INTERVAL, DEFAULT_STATUS_EVENTS_INTERVAL), SYSTEM_LIMIT_MIN_STATUS_EVENTS_INTERVAL);
        this.requestHeader = new HashMap<>();
        this.requestHeader.put("X-Requested-By", "SDC");
        this.requestHeader.put(SSOConstants.X_APP_AUTH_TOKEN, runtimeInfo.getAppAuthToken());
        this.requestHeader.put(SSOConstants.X_APP_COMPONENT_ID, this.runtimeInfo.getId());
        this.stopWatch = Stopwatch.createUnstarted();
        this.dataStore = new DataStore(new File(runtimeInfo.getDataDir(), DisconnectedSSOManager.DISCONNECTED_SSO_AUTHENTICATION_FILE));
        try {
            // Called for its side effect; the constructor treats an IOException as an unrecoverable file.
            this.dataStore.exists();
        } catch (IOException e) {
            LOG.warn("Could not recover disconnected credentials file '{}': {}", this.dataStore.getFile(), e.toString(), e);
            try {
                this.dataStore.delete();
            } catch (IOException e2) {
                // Report the delete failure itself and keep the recovery failure attached to it.
                e2.addSuppressed(e);
                throw new RuntimeException(Utils.format("Could not clear invalid disconected credentials file '{}': {}", new Object[]{this.dataStore.getFile(), e2.toString()}), e2);
            }
        }
        remoteDataCollector.init();
    }

    /**
     * Exposes the data store backing the disconnected SSO credentials file, for tests.
     *
     * @return the data store created by the constructor
     */
    @VisibleForTesting
    DataStore getDisconnectedSsoCredentialsDataStore() {
        return dataStore;
    }

    /**
     * Starts the polling loop by submitting the first cycle to the executor. The first cycle
     * carries the startup report and starts with empty event lists and no pending ack futures.
     */
    @Override // _ss_com.streamsets.datacollector.task.AbstractTask
    public void runTask() {
        List<ClientEvent> initialAckEvents = new ArrayList<>();
        List<ClientEvent> initialRemoteEvents = new ArrayList<>();
        ClientEvent startupReport = getStartupReportEvent();
        Map<ServerEvent, Future<AckEvent>> pendingAckFutures = new LinkedHashMap<>();
        EventHandlerCallable firstCycle = new EventHandlerCallable(
                this.remoteDataCollector,
                this.eventSenderReceiver,
                this.jsonToFromDto,
                initialAckEvents,
                initialRemoteEvents,
                startupReport,
                this.executorService,
                this.defaultPingFrequency,
                this.appDestinationList,
                this.processAppDestinationList,
                this.requestHeader,
                this.stopWatch,
                this.sendAllStatusEventsInterval,
                this.dataStore,
                pendingAckFutures,
                this.runtimeInfo);
        this.executorService.submit(firstCycle);
    }

    /**
     * Builds the startup report sent with the first successful submit.
     *
     * @return a client event describing this data collector: id, base URL, Java runtime version,
     *     available stages, build information, labels, deployment id and total JVM memory
     */
    private ClientEvent getStartupReportEvent() {
        List<StageInfo> stageInfos = new ArrayList<>();
        for (StageDefinition stage : this.stageLibrary.getStages()) {
            stageInfos.add(new StageInfo(stage.getName(), stage.getVersion(), stage.getLibrary()));
        }
        DataCollectorBuildInfo build = new DataCollectorBuildInfo();
        SDCBuildInfo buildInfo = new SDCBuildInfo(
                build.getVersion(),
                build.getBuiltBy(),
                build.getBuiltDate(),
                build.getBuiltRepoSha(),
                build.getSourceMd5Checksum());
        SDCInfoEvent infoEvent = new SDCInfoEvent(
                this.runtimeInfo.getId(),
                this.runtimeInfo.getBaseHttpUrl(),
                System.getProperty("java.runtime.version"),
                stageInfos,
                buildInfo,
                this.labelList,
                // Offset protocol version; same value as OFFSET_PROTOCOL_VERSION.
                2,
                Strings.emptyToNull(this.runtimeInfo.getDeploymentId()),
                Runtime.getRuntime().totalMemory());
        return new ClientEvent(UUID.randomUUID().toString(), this.appDestinationList, false, false, EventType.SDC_INFO_EVENT, infoEvent, null);
    }

    /**
     * Stops the event handler by invoking {@code shutdownNow()} on the executor that runs the
     * polling cycles.
     */
    @Override // _ss_com.streamsets.datacollector.task.AbstractTask
    public void stopTask() {
        executorService.shutdownNow();
    }
}
