package _ss_com.streamsets.datacollector.event.handler.remote;

import _ss_com.com.google.common.annotations.VisibleForTesting;
import _ss_com.fasterxml.jackson.core.JsonProcessingException;
import _ss_com.streamsets.datacollector.blobstore.BlobStoreTask;
import _ss_com.streamsets.datacollector.callback.CallbackInfo;
import _ss_com.streamsets.datacollector.callback.CallbackObjectType;
import _ss_com.streamsets.datacollector.config.PipelineConfiguration;
import _ss_com.streamsets.datacollector.config.RuleDefinitions;
import _ss_com.streamsets.datacollector.config.dto.ValidationStatus;
import _ss_com.streamsets.datacollector.event.dto.AckEvent;
import _ss_com.streamsets.datacollector.event.dto.AckEventStatus;
import _ss_com.streamsets.datacollector.event.dto.EventType;
import _ss_com.streamsets.datacollector.event.dto.WorkerInfo;
import _ss_com.streamsets.datacollector.event.handler.DataCollector;
import _ss_com.streamsets.datacollector.execution.Manager;
import _ss_com.streamsets.datacollector.execution.PipelineState;
import _ss_com.streamsets.datacollector.execution.PipelineStateStore;
import _ss_com.streamsets.datacollector.execution.PipelineStatus;
import _ss_com.streamsets.datacollector.execution.PreviewOutput;
import _ss_com.streamsets.datacollector.execution.PreviewStatus;
import _ss_com.streamsets.datacollector.execution.Previewer;
import _ss_com.streamsets.datacollector.execution.Runner;
import _ss_com.streamsets.datacollector.json.ObjectMapperFactory;
import _ss_com.streamsets.datacollector.main.RuntimeInfo;
import _ss_com.streamsets.datacollector.restapi.bean.SourceOffsetJson;
import _ss_com.streamsets.datacollector.runner.production.OffsetFileUtil;
import _ss_com.streamsets.datacollector.runner.production.SourceOffset;
import _ss_com.streamsets.datacollector.security.GroupsInScope;
import _ss_com.streamsets.datacollector.stagelibrary.StageLibraryTask;
import _ss_com.streamsets.datacollector.store.AclStoreTask;
import _ss_com.streamsets.datacollector.store.PipelineStoreException;
import _ss_com.streamsets.datacollector.store.PipelineStoreTask;
import _ss_com.streamsets.datacollector.util.Configuration;
import _ss_com.streamsets.datacollector.util.ContainerError;
import _ss_com.streamsets.datacollector.util.LogUtil;
import _ss_com.streamsets.datacollector.util.PipelineException;
import _ss_com.streamsets.datacollector.validation.Issues;
import _ss_com.streamsets.datacollector.validation.PipelineConfigurationValidator;
import _ss_com.streamsets.lib.security.acl.dto.Acl;
import _ss_com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService;
import _ss_com.streamsets.pipeline.lib.log.LogConstants;
import _ss_com.streamsets.pipeline.lib.util.ExceptionUtils;
import _ss_org.apache.commons.lang3.tuple.Pair;
import com.streamsets.pipeline.api.ExecutionMode;
import com.streamsets.pipeline.api.StageException;
import com.streamsets.pipeline.api.impl.Utils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import javax.inject.Inject;
import javax.inject.Named;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:_ss_com/streamsets/datacollector/event/handler/remote/RemoteDataCollector.class */
public class RemoteDataCollector implements DataCollector {
    public static final String IS_REMOTE_PIPELINE = "IS_REMOTE_PIPELINE";
    private static final String NAME_AND_REV_SEPARATOR = "::";
    private static final Logger LOG = LoggerFactory.getLogger(RemoteDataCollector.class);
    private final Configuration configuration;
    private final Manager manager;
    private final PipelineStoreTask pipelineStore;
    private final List<String> validatorIdList = new ArrayList();
    private final PipelineStateStore pipelineStateStore;
    private final RemoteStateEventListener stateEventListener;
    private final AclStoreTask aclStoreTask;
    private final AclCacheHelper aclCacheHelper;
    private final RuntimeInfo runtimeInfo;
    private final StageLibraryTask stageLibrary;
    private final BlobStoreTask blobStoreTask;
    private final SafeScheduledExecutorService eventHandlerExecutor;

    /* loaded from: input_file:_ss_com/streamsets/datacollector/event/handler/remote/RemoteDataCollector$StopAndDeleteCallable.class */
    static class StopAndDeleteCallable implements Callable<AckEvent> {
        private final RemoteDataCollector remoteDataCollector;
        private final String pipelineName;
        private final String rev;
        private final String user;
        private final long forceStopMillis;

        public StopAndDeleteCallable(RemoteDataCollector remoteDataCollector, String str, String str2, String str3, long j) {
            this.remoteDataCollector = remoteDataCollector;
            this.pipelineName = str2;
            this.rev = str3;
            this.user = str;
            this.forceStopMillis = j;
        }

        private boolean waitForInactiveState(PipelineStateStore pipelineStateStore, long j) throws PipelineStoreException {
            long currentTimeMillis = System.currentTimeMillis();
            PipelineState state = pipelineStateStore.getState(this.pipelineName, this.rev);
            while (state.getStatus().isActive() && System.currentTimeMillis() - currentTimeMillis < j) {
                try {
                    Thread.sleep(1000L);
                    state = pipelineStateStore.getState(this.pipelineName, this.rev);
                } catch (InterruptedException e) {
                    throw new IllegalStateException("Interrupted while waiting for pipeline to stop " + e, e);
                }
            }
            return pipelineStateStore.getState(this.pipelineName, this.rev).getStatus().isActive();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public AckEvent call() {
            long currentTimeMillis = System.currentTimeMillis();
            AckEventStatus ackEventStatus = AckEventStatus.SUCCESS;
            String str = null;
            try {
            } catch (Exception e) {
                ackEventStatus = AckEventStatus.ERROR;
                str = Utils.format("Remote event type {} encountered error {}", new Object[]{EventType.STOP_DELETE_PIPELINE, e});
            }
            if (!this.remoteDataCollector.pipelineStore.hasPipeline(this.pipelineName)) {
                RemoteDataCollector.LOG.warn("Pipeline {}:{} is already deleted", this.pipelineName, this.rev);
                return new AckEvent(ackEventStatus, null);
            }
            PipelineStateStore pipelineStateStore = this.remoteDataCollector.pipelineStateStore;
            Manager manager = this.remoteDataCollector.manager;
            PipelineState state = pipelineStateStore.getState(this.pipelineName, this.rev);
            if (state.getStatus().equals(PipelineStatus.STOPPING)) {
                throw new RuntimeException("Pipeline is already being stopped by another invocation of stopJob");
            }
            if (state.getStatus().isActive()) {
                try {
                    manager.getRunner(this.pipelineName, this.rev).stop(this.user);
                } catch (Exception e2) {
                    RemoteDataCollector.LOG.warn("Error while stopping the pipeline {}", e2, e2);
                }
            }
            if (waitForInactiveState(pipelineStateStore, this.forceStopMillis)) {
                try {
                    manager.getRunner(this.pipelineName, this.rev).forceQuit(this.user);
                } catch (Exception e3) {
                    RemoteDataCollector.LOG.warn("Cannot issue force quit on pipeline {}", this.pipelineName);
                }
            }
            if (waitForInactiveState(pipelineStateStore, 10000L)) {
                pipelineStateStore.saveState(this.user, this.pipelineName, this.rev, PipelineStatus.STOPPED, "Stopping pipeline forcefully as we are performing a delete afterwards", state.getAttributes(), state.getExecutionMode(), state.getMetrics(), state.getRetryAttempt(), state.getNextRetryTimeStamp());
            }
            this.remoteDataCollector.delete(this.pipelineName, this.rev);
            RemoteDataCollector.LOG.info("Time in secs to stop and delete pipeline {} is {}", this.pipelineName, Long.valueOf((System.currentTimeMillis() - currentTimeMillis) / 1000));
            return new AckEvent(ackEventStatus, str);
        }
    }

    @Inject
    public RemoteDataCollector(Configuration configuration, Manager manager, PipelineStoreTask pipelineStoreTask, PipelineStateStore pipelineStateStore, AclStoreTask aclStoreTask, RemoteStateEventListener remoteStateEventListener, RuntimeInfo runtimeInfo, AclCacheHelper aclCacheHelper, StageLibraryTask stageLibraryTask, BlobStoreTask blobStoreTask, @Named("eventHandlerExecutor") SafeScheduledExecutorService safeScheduledExecutorService) {
        this.configuration = configuration;
        this.manager = manager;
        this.pipelineStore = pipelineStoreTask;
        this.pipelineStateStore = pipelineStateStore;
        this.stateEventListener = remoteStateEventListener;
        this.runtimeInfo = runtimeInfo;
        this.aclStoreTask = aclStoreTask;
        this.aclCacheHelper = aclCacheHelper;
        this.stageLibrary = stageLibraryTask;
        this.blobStoreTask = blobStoreTask;
        this.eventHandlerExecutor = safeScheduledExecutorService;
    }

    public void init() {
        this.stateEventListener.init();
        this.manager.addStateEventListener(this.stateEventListener);
        this.pipelineStore.registerStateListener(this.stateEventListener);
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public void start(Runner.StartPipelineContext startPipelineContext, String str, String str2) throws PipelineException, StageException {
        try {
            GroupsInScope.executeIgnoreGroups(() -> {
                PipelineState state = this.pipelineStateStore.getState(str, str2);
                if (state.getStatus().isActive()) {
                    LOG.warn("Pipeline {}:{} is already in active state {}", new Object[]{state.getPipelineId(), state.getRev(), state.getStatus()});
                    return null;
                }
                MDC.put(LogConstants.USER, startPipelineContext.getUser());
                LogUtil.injectPipelineInMDC(this.pipelineStore.getInfo(str).getTitle(), str);
                this.manager.getRunner(str, str2).start(startPipelineContext);
                return null;
            });
        } catch (Exception e) {
            LOG.warn(Utils.format("Error while starting pipeline: {} is {}", new Object[]{str, e}), e);
            if (e.getCause() != null) {
                ExceptionUtils.throwUndeclared(e.getCause());
            } else {
                ExceptionUtils.throwUndeclared(e);
            }
        } finally {
            MDC.clear();
        }
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public void stop(String str, String str2, String str3) throws PipelineException {
        this.manager.getRunner(str2, str3).stop(str);
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public void delete(String str, String str2) throws PipelineException {
        this.pipelineStore.delete(str);
        this.pipelineStore.deleteRules(str);
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public void deleteHistory(String str, String str2, String str3) throws PipelineException {
        this.manager.getRunner(str2, str3).deleteHistory();
    }

    @VisibleForTesting
    boolean pipelineStateExists(String str, String str2) throws PipelineException {
        try {
            this.pipelineStateStore.getState(str, str2);
            return true;
        } catch (PipelineStoreException e) {
            if (e.getErrorCode().getCode().equals(ContainerError.CONTAINER_0209.name())) {
                return false;
            }
            throw e;
        }
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public void savePipeline(String str, String str2, String str3, String str4, SourceOffset sourceOffset, PipelineConfiguration pipelineConfiguration, RuleDefinitions ruleDefinitions, Acl acl) throws PipelineException {
        if (!this.pipelineStore.hasPipeline(str2) && pipelineStateExists(str2, str3)) {
            LOG.warn("Deleting state file for pipeline {} as pipeline is deleted", str2);
            this.pipelineStateStore.delete(str2, str3);
        }
        pipelineConfiguration.setUuid(this.pipelineStore.create(str, str2, str2, str4, true, false).getUuid());
        this.pipelineStore.save(str, str2, str3, str4, new PipelineConfigurationValidator(this.stageLibrary, str2, pipelineConfiguration).validate());
        this.pipelineStore.storeRules(str2, str3, ruleDefinitions, false);
        if (acl != null) {
            this.aclStoreTask.saveAcl(str2, acl);
        }
        LOG.info("Offset for remote pipeline '{}:{}' is {}", new Object[]{str2, str3, sourceOffset});
        if (sourceOffset != null) {
            OffsetFileUtil.saveSourceOffset(this.runtimeInfo, str2, str3, sourceOffset);
        }
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public void savePipelineRules(String str, String str2, RuleDefinitions ruleDefinitions) throws PipelineException {
        this.pipelineStore.getInfo(str);
        ruleDefinitions.setUuid(this.pipelineStore.retrieveRules(str, str2).getUuid());
        this.pipelineStore.storeRules(str, str2, ruleDefinitions, false);
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public void resetOffset(String str, String str2, String str3) throws PipelineException {
        this.manager.getRunner(str2, str3).resetOffset(str);
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public void validateConfigs(String str, String str2, String str3) throws PipelineException {
        Previewer createPreviewer = this.manager.createPreviewer(str, str2, str3);
        createPreviewer.validateConfigs(1000L);
        this.validatorIdList.add(createPreviewer.getId());
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public Future<AckEvent> stopAndDelete(String str, String str2, String str3, long j) throws PipelineException, StageException {
        LOG.info("Pipeline will be stopped and deleted, force timeout is {}", Long.valueOf(j));
        return this.eventHandlerExecutor.submit(new StopAndDeleteCallable(this, str, str2, str3, j));
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public List<PipelineAndValidationStatus> getRemotePipelinesWithChanges() throws PipelineException {
        String str;
        ArrayList arrayList = new ArrayList();
        for (Pair<PipelineState, Map<String, String>> pair : this.stateEventListener.getPipelineStateEvents()) {
            PipelineState left = pair.getLeft();
            Map<String, String> right = pair.getRight();
            String pipelineId = left.getPipelineId();
            String rev = left.getRev();
            boolean z = left.getExecutionMode() != ExecutionMode.STANDALONE;
            List<WorkerInfo> arrayList2 = new ArrayList();
            int i = 0;
            if (this.pipelineStore.hasPipeline(pipelineId)) {
                str = this.pipelineStore.getInfo(pipelineId).getTitle();
                Runner runner = this.manager.getRunner(pipelineId, rev);
                if (z) {
                    arrayList2 = getWorkers(runner.getSlaveCallbackList(CallbackObjectType.METRICS));
                }
                i = runner.getRunnerCount();
            } else {
                str = null;
            }
            arrayList.add(new PipelineAndValidationStatus(pipelineId, str, rev, left.getTimeStamp(), true, left.getStatus(), left.getMessage(), arrayList2, z, getSourceOffset(pipelineId, right), null, i));
        }
        return arrayList;
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public void syncAcl(Acl acl) throws PipelineException {
        if (acl == null) {
            return;
        }
        if (this.pipelineStore.hasPipeline(acl.getResourceId())) {
            this.aclStoreTask.saveAcl(acl.getResourceId(), acl);
        } else {
            LOG.warn(ContainerError.CONTAINER_0200.getMessage(), acl.getResourceId());
        }
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public void blobStore(String str, String str2, long j, String str3) throws StageException {
        this.blobStoreTask.store(str, str2, j, str3);
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public void blobDelete(String str, String str2) throws StageException {
        LOG.debug("Deleting all blob objects for namespace={} and id={}", str, str2);
        this.blobStoreTask.deleteAllVersions(str, str2);
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public void blobDelete(String str, String str2, long j) throws StageException {
        this.blobStoreTask.delete(str, str2, j);
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public void storeConfiguration(Map<String, String> map) throws IOException {
        RuntimeInfo.storeControlHubConfigs(this.runtimeInfo, map);
        this.configuration.set(map);
    }

    private List<WorkerInfo> getWorkers(Collection<CallbackInfo> collection) {
        ArrayList arrayList = new ArrayList();
        for (CallbackInfo callbackInfo : collection) {
            WorkerInfo workerInfo = new WorkerInfo();
            workerInfo.setWorkerURL(callbackInfo.getSdcURL());
            workerInfo.setWorkerId(callbackInfo.getSlaveSdcId());
            arrayList.add(workerInfo);
        }
        return arrayList;
    }

    private String getOffset(String str, String str2) {
        return OffsetFileUtil.getSourceOffset(this.runtimeInfo, str, str2);
    }

    @Override // _ss_com.streamsets.datacollector.event.handler.DataCollector
    public Collection<PipelineAndValidationStatus> getPipelines() throws IOException, PipelineException {
        List<PipelineState> pipelines = this.manager.getPipelines();
        HashMap hashMap = new HashMap();
        HashSet hashSet = new HashSet();
        for (PipelineState pipelineState : pipelines) {
            String pipelineId = pipelineState.getPipelineId();
            String title = this.pipelineStore.getInfo(pipelineId).getTitle();
            String rev = pipelineState.getRev();
            boolean z = this.manager.isRemotePipeline(pipelineId, rev);
            if (z || this.manager.isPipelineActive(pipelineId, rev)) {
                ArrayList arrayList = new ArrayList();
                boolean z2 = pipelineState.getExecutionMode() != ExecutionMode.STANDALONE;
                Runner runner = this.manager.getRunner(pipelineId, rev);
                if (z2) {
                    for (CallbackInfo callbackInfo : runner.getSlaveCallbackList(CallbackObjectType.METRICS)) {
                        WorkerInfo workerInfo = new WorkerInfo();
                        workerInfo.setWorkerURL(callbackInfo.getSdcURL());
                        workerInfo.setWorkerId(callbackInfo.getSlaveSdcId());
                        arrayList.add(workerInfo);
                    }
                }
                Acl acl = null;
                if (!z) {
                    hashSet.add(pipelineId);
                    acl = this.aclCacheHelper.getAcl(pipelineId);
                }
                hashMap.put(getNameAndRevString(pipelineId, rev), new PipelineAndValidationStatus(pipelineId, title, rev, pipelineState.getTimeStamp(), z, pipelineState.getStatus(), pipelineState.getMessage(), arrayList, z2, z ? getOffset(pipelineId, rev) : null, acl, runner.getRunnerCount()));
            }
        }
        this.aclCacheHelper.removeIfAbsent(hashSet);
        setValidationStatus(hashMap);
        return hashMap.values();
    }

    private void setValidationStatus(Map<String, PipelineAndValidationStatus> map) {
        ArrayList arrayList = new ArrayList();
        for (String str : this.validatorIdList) {
            Previewer previewer = this.manager.getPreviewer(str);
            if (previewer != null) {
                ValidationStatus validationStatus = null;
                Issues issues = null;
                String str2 = null;
                if (previewer != null) {
                    PreviewStatus status = previewer.getStatus();
                    switch (status) {
                        case INVALID:
                            validationStatus = ValidationStatus.INVALID;
                            break;
                        case TIMING_OUT:
                        case TIMED_OUT:
                            validationStatus = ValidationStatus.TIMED_OUT;
                            break;
                        case VALID:
                            validationStatus = ValidationStatus.VALID;
                            break;
                        case VALIDATING:
                            validationStatus = ValidationStatus.VALIDATING;
                            break;
                        case VALIDATION_ERROR:
                            validationStatus = ValidationStatus.VALIDATION_ERROR;
                            break;
                        default:
                            LOG.warn(Utils.format("Unrecognized validation state: '{}'", new Object[]{status}));
                            break;
                    }
                    if (!status.isActive()) {
                        PreviewOutput output = previewer.getOutput();
                        issues = output.getIssues();
                        str2 = output.getMessage();
                        arrayList.add(str);
                    }
                } else {
                    LOG.warn(Utils.format("Previewer is null for id: '{}'", new Object[]{str}));
                }
                PipelineAndValidationStatus pipelineAndValidationStatus = map.get(getNameAndRevString(previewer.getName(), previewer.getRev()));
                if (pipelineAndValidationStatus == null) {
                    LOG.warn("Preview pipeline: '{}'::'{}' doesn't exist", previewer.getName(), previewer.getRev());
                } else {
                    pipelineAndValidationStatus.setValidationStatus(validationStatus);
                    pipelineAndValidationStatus.setIssues(issues);
                    pipelineAndValidationStatus.setMessage(str2);
                }
            }
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            this.validatorIdList.remove((String) it.next());
        }
    }

    private String getNameAndRevString(String str, String str2) {
        return str + NAME_AND_REV_SEPARATOR + str2;
    }

    @VisibleForTesting
    List<String> getValidatorList() {
        return this.validatorIdList;
    }

    private String getSourceOffset(String str, Map<String, String> map) {
        try {
            return ObjectMapperFactory.get().writeValueAsString(new SourceOffsetJson(new SourceOffset(2, map)));
        } catch (JsonProcessingException e) {
            throw new IllegalStateException(Utils.format("Failed to fetch source offset for pipeline: {} due to error: {}", new Object[]{str, e.toString()}), e);
        }
    }
}
