/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.kafka010;

import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
import org.apache.kafka.clients.admin.CreateDelegationTokenResult;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.security.token.delegation.DelegationToken;
import org.apache.kafka.common.security.token.delegation.TokenInformation;
import org.apache.spark.SparkConf;
import org.apache.spark.deploy.SparkHadoopUtil$;
import org.apache.spark.internal.Logging;
import org.apache.spark.internal.config.ConfigEntry;
import org.apache.spark.internal.config.package$;
import org.apache.spark.kafka010.KafkaRedactionUtil$;
import org.apache.spark.kafka010.KafkaTokenClusterConf;
import org.apache.spark.kafka010.KafkaTokenSparkConf$;
import org.apache.spark.kafka010.KafkaTokenUtil;
import org.apache.spark.util.SecurityUtils$;
import org.apache.spark.util.Utils$;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.TraversableLike;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.MapLike;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.control.NonFatal$;

public final class KafkaTokenUtil$
implements Logging {
    public static KafkaTokenUtil$ MODULE$;
    private final Text TOKEN_KIND;
    private final String TOKEN_SERVICE_PREFIX;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    // Eagerly creates the singleton; the private constructor of this class
    // stores the new instance in MODULE$.
    static {
        new KafkaTokenUtil$();
    }

    /**
     * Returns the logger name, as computed by {@code Logging.logName$}.
     */
    public String logName() {
        return Logging.logName$(this);
    }

    /**
     * Returns the SLF4J logger for this object, as provided by {@code Logging.log$}.
     */
    public Logger log() {
        return Logging.log$(this);
    }

    /**
     * Forwards a lazily supplied message to {@code Logging.logInfo$}.
     *
     * @param msg supplier of the message text
     */
    public void logInfo(Function0<String> msg) {
        Logging.logInfo$(this, msg);
    }

    /**
     * Forwards a lazily supplied message to {@code Logging.logDebug$}.
     *
     * @param msg supplier of the message text
     */
    public void logDebug(Function0<String> msg) {
        Logging.logDebug$(this, msg);
    }

    /**
     * Forwards a lazily supplied message to {@code Logging.logTrace$}.
     *
     * @param msg supplier of the message text
     */
    public void logTrace(Function0<String> msg) {
        Logging.logTrace$(this, msg);
    }

    /**
     * Forwards a lazily supplied message to {@code Logging.logWarning$}.
     *
     * @param msg supplier of the message text
     */
    public void logWarning(Function0<String> msg) {
        Logging.logWarning$(this, msg);
    }

    /**
     * Forwards a lazily supplied message to {@code Logging.logError$}.
     *
     * @param msg supplier of the message text
     */
    public void logError(Function0<String> msg) {
        Logging.logError$(this, msg);
    }

    /**
     * Forwards a lazily supplied message and a throwable to {@code Logging.logInfo$}.
     *
     * @param msg       supplier of the message text
     * @param throwable throwable passed along with the message
     */
    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.logInfo$(this, msg, throwable);
    }

    /**
     * Forwards a lazily supplied message and a throwable to {@code Logging.logDebug$}.
     *
     * @param msg       supplier of the message text
     * @param throwable throwable passed along with the message
     */
    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.logDebug$(this, msg, throwable);
    }

    /**
     * Forwards a lazily supplied message and a throwable to {@code Logging.logTrace$}.
     *
     * @param msg       supplier of the message text
     * @param throwable throwable passed along with the message
     */
    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.logTrace$(this, msg, throwable);
    }

    /**
     * Forwards a lazily supplied message and a throwable to {@code Logging.logWarning$}.
     *
     * @param msg       supplier of the message text
     * @param throwable throwable passed along with the message
     */
    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.logWarning$(this, msg, throwable);
    }

    /**
     * Forwards a lazily supplied message and a throwable to {@code Logging.logError$}.
     *
     * @param msg       supplier of the message text
     * @param throwable throwable passed along with the message
     */
    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.logError$(this, msg, throwable);
    }

    /**
     * Returns the result of {@code Logging.isTraceEnabled$} for this object.
     */
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    /**
     * Delegates to the single-flag {@code Logging.initializeLogIfNecessary$}.
     *
     * @param isInterpreter flag passed through unchanged
     */
    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.initializeLogIfNecessary$(this, isInterpreter);
    }

    /**
     * Delegates to the two-flag {@code Logging.initializeLogIfNecessary$}.
     *
     * @param isInterpreter flag passed through unchanged
     * @param silent        flag passed through unchanged
     * @return whatever the delegate returns
     */
    public boolean initializeLogIfNecessary(boolean isInterpreter, boolean silent) {
        return Logging.initializeLogIfNecessary$(this, isInterpreter, silent);
    }

    /**
     * Returns the value supplied by {@code Logging.initializeLogIfNecessary$default$2$}
     * (by its name, the default for the second argument of
     * {@code initializeLogIfNecessary}).
     */
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    /**
     * Delegates to {@code Logging.initializeForcefully$}.
     *
     * @param isInterpreter flag passed through unchanged
     * @param silent        flag passed through unchanged
     */
    public void initializeForcefully(boolean isInterpreter, boolean silent) {
        Logging.initializeForcefully$(this, isInterpreter, silent);
    }

    /**
     * Accessor for the transient logger field backing the {@code Logging} interface.
     *
     * @return the current value of the field; may be {@code null} if never assigned
     */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return org$apache$spark$internal$Logging$$log_;
    }

    /**
     * Mutator for the transient logger field backing the {@code Logging} interface.
     *
     * @param x$1 the logger to store
     */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        org$apache$spark$internal$Logging$$log_ = x$1;
    }

    /**
     * Returns the token kind constant; the constructor sets it to
     * {@code "KAFKA_DELEGATION_TOKEN"}.
     */
    public Text TOKEN_KIND() {
        return TOKEN_KIND;
    }

    /**
     * Returns the prefix shared by all token service names; the constructor sets it to
     * {@code "kafka.server.delegation.token"}.
     */
    private String TOKEN_SERVICE_PREFIX() {
        return TOKEN_SERVICE_PREFIX;
    }

    /**
     * Builds the token service name for a cluster: the service prefix, a dot,
     * then the given identifier.
     *
     * @param identifier cluster identifier to append after the prefix
     * @return the service name wrapped in a {@link Text}
     */
    public Text getTokenService(String identifier) {
        String serviceName = this.TOKEN_SERVICE_PREFIX() + "." + identifier;
        return new Text(serviceName);
    }

    /**
     * Recovers the cluster identifier from a token service name by removing the
     * service prefix followed by a dot.
     *
     * <p>Uses {@link String#replace(CharSequence, CharSequence)}, so every occurrence
     * of the dotted prefix in the service name is removed, not only a leading one.
     *
     * @param service token service name
     * @return the service name with the dotted prefix removed
     */
    private String getClusterIdentifier(Text service) {
        String serviceName = service.toString();
        String dottedPrefix = this.TOKEN_SERVICE_PREFIX() + ".";
        return serviceName.replace(dottedPrefix, "");
    }

    /**
     * Requests a new delegation token from Kafka and converts it to a Hadoop token.
     *
     * <p>Steps: reject proxy users via {@link #checkProxyUser()}, create an
     * {@code AdminClient} from {@link #createAdminClientProperties}, ask it for a
     * delegation token, log the token details at debug level, and wrap the result.
     * The admin client is closed before returning, whether or not the request succeeds.
     *
     * @param sparkConf   Spark configuration used to build the admin client properties
     * @param clusterConf configuration of the target Kafka cluster
     * @return a pair of the Hadoop token (identifier = token id bytes, password =
     *         Base64 HMAC bytes, kind = {@code TOKEN_KIND()}, service derived from the
     *         cluster identifier) and the token's expiry timestamp boxed as a Long
     */
    public Tuple2<Token<KafkaTokenUtil.KafkaDelegationTokenIdentifier>, Object> obtainToken(SparkConf sparkConf, KafkaTokenClusterConf clusterConf) {
        this.checkProxyUser();
        // try-with-resources: the admin client used to be left open after the token was obtained.
        try (AdminClient adminClient = AdminClient.create((Properties)this.createAdminClientProperties(sparkConf, clusterConf))) {
            CreateDelegationTokenOptions createDelegationTokenOptions = new CreateDelegationTokenOptions();
            CreateDelegationTokenResult createResult = adminClient.createDelegationToken(createDelegationTokenOptions);
            // Blocks until the broker answers.
            DelegationToken token = (DelegationToken)createResult.delegationToken().get();
            this.printToken(token);
            return new Tuple2((Object)new Token(token.tokenInfo().tokenId().getBytes(), token.hmacAsBase64String().getBytes(), this.TOKEN_KIND(), this.getTokenService(clusterConf.identifier())), (Object)BoxesRunTime.boxToLong((long)token.tokenInfo().expiryTimestamp()));
        }
    }

    /**
     * Fails with the {@code require} precondition error when the current Hadoop user
     * is a proxy user, because obtaining a delegation token is not supported for them.
     */
    public void checkProxyUser() {
        boolean isProxy = SparkHadoopUtil$.MODULE$.get().isProxyUser(UserGroupInformation.getCurrentUser());
        Predef$.MODULE$.require(!isProxy, (Function0 & Serializable & scala.Serializable)() -> "Obtaining delegation token for proxy user is not yet supported.");
    }

    /**
     * Assembles the properties used to create the Kafka admin client that obtains
     * delegation tokens.
     *
     * <ol>
     *   <li>Sets {@code bootstrap.servers} and {@code security.protocol} from the
     *       cluster configuration.</li>
     *   <li>Depending on the protocol: SASL_SSL adds trust store settings; SSL adds
     *       trust store and key store settings and logs a warning; SASL_PLAINTEXT only
     *       logs a warning; anything else throws {@link MatchError}.</li>
     *   <li>Unless a JVM-global JAAS configuration is detected, sets
     *       {@code sasl.mechanism=GSSAPI} and a {@code sasl.jaas.config} built from the
     *       keytab (when the keytab entry is present in {@code sparkConf}) or from the
     *       ticket cache.</li>
     *   <li>Applies the user-specified Kafka parameters last, so they override the
     *       values set above.</li>
     * </ol>
     *
     * @param sparkConf   Spark configuration (consulted for keytab and principal)
     * @param clusterConf configuration of the target Kafka cluster
     * @return the populated properties
     * @throws MatchError if the security protocol is not one of the three handled values
     */
    public Properties createAdminClientProperties(SparkConf sparkConf, KafkaTokenClusterConf clusterConf) {
        Properties adminClientProperties = new Properties();
        adminClientProperties.put("bootstrap.servers", clusterConf.authBootstrapServers());
        adminClientProperties.put("security.protocol", clusterConf.securityProtocol());

        // Protocol-specific transport settings.
        String protocol = clusterConf.securityProtocol();
        if (java.util.Objects.equals(SecurityProtocol.SASL_SSL.name, protocol)) {
            this.setTrustStoreProperties(clusterConf, adminClientProperties);
        } else if (java.util.Objects.equals(SecurityProtocol.SSL.name, protocol)) {
            this.setTrustStoreProperties(clusterConf, adminClientProperties);
            this.setKeyStoreProperties(clusterConf, adminClientProperties);
            this.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Obtaining kafka delegation token with SSL protocol. Please configure 2-way authentication on the broker side.");
        } else if (java.util.Objects.equals(SecurityProtocol.SASL_PLAINTEXT.name, protocol)) {
            this.logWarning((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Obtaining kafka delegation token through plain communication channel. Please consider the security impact.");
        } else {
            throw new MatchError((Object)protocol);
        }

        // Login settings: prefer a JVM-wide JAAS configuration when one is present.
        if (this.isGlobalJaasConfigurationProvided()) {
            this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "JVM global security configuration detected, using it for login.");
        } else {
            adminClientProperties.put("sasl.mechanism", "GSSAPI");
            String jaasParams;
            if (sparkConf.contains((ConfigEntry)package$.MODULE$.KEYTAB())) {
                this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Keytab detected, using it for login.");
                String keyTab = (String)((Option)sparkConf.get((ConfigEntry)package$.MODULE$.KEYTAB())).get();
                String principal = (String)((Option)sparkConf.get((ConfigEntry)package$.MODULE$.PRINCIPAL())).get();
                jaasParams = this.getKeytabJaasParams(keyTab, principal, clusterConf.kerberosServiceName());
            } else {
                this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Using ticket cache for login.");
                jaasParams = this.getTicketCacheJaasParams(clusterConf);
            }
            adminClientProperties.put("sasl.jaas.config", jaasParams);
        }

        // User-specified parameters are applied last and therefore win.
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(44).append("AdminClient params before specified params: ").append(KafkaRedactionUtil$.MODULE$.redactParams((Seq<Tuple2<String, Object>>)((MapLike)JavaConverters$.MODULE$.propertiesAsScalaMapConverter(adminClientProperties).asScala()).toSeq())).toString());
        clusterConf.specifiedKafkaParams().foreach((Function1 & Serializable & scala.Serializable)param -> adminClientProperties.setProperty((String)param._1(), (String)param._2()));
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(43).append("AdminClient params after specified params: ").append(KafkaRedactionUtil$.MODULE$.redactParams((Seq<Tuple2<String, Object>>)((MapLike)JavaConverters$.MODULE$.propertiesAsScalaMapConverter(adminClientProperties).asScala()).toSeq())).toString());
        return adminClientProperties;
    }

    /**
     * Probes for a JVM-wide JAAS configuration by calling
     * {@code JaasContext.loadClientContext} with an empty configuration map.
     *
     * @return {@code true} if the call completes normally; {@code false} if it throws
     *         something that {@code NonFatal} matches. Throwables that {@code NonFatal}
     *         does not match are rethrown.
     */
    public boolean isGlobalJaasConfigurationProvided() {
        try {
            JaasContext.loadClientContext(Collections.emptyMap());
            return true;
        }
        catch (Throwable failure) {
            // An empty unapply result means the throwable is fatal: propagate it.
            if (NonFatal$.MODULE$.unapply(failure).isEmpty()) {
                throw failure;
            }
            return false;
        }
    }

    /**
     * Copies the trust store location and password from the cluster configuration into
     * {@code properties}; each is set only when the corresponding option is defined.
     *
     * @param clusterConf source of the optional trust store settings
     * @param properties  target properties, mutated in place
     */
    private void setTrustStoreProperties(KafkaTokenClusterConf clusterConf, Properties properties) {
        Function1 putLocation = (Function1 & Serializable & scala.Serializable)truststoreLocation -> properties.put("ssl.truststore.location", truststoreLocation);
        Function1 putPassword = (Function1 & Serializable & scala.Serializable)truststorePassword -> properties.put("ssl.truststore.password", truststorePassword);
        clusterConf.trustStoreLocation().foreach(putLocation);
        clusterConf.trustStorePassword().foreach(putPassword);
    }

    /**
     * Copies the key store location, key store password and key password from the
     * cluster configuration into {@code properties}; each is set only when the
     * corresponding option is defined.
     *
     * @param clusterConf source of the optional key store settings
     * @param properties  target properties, mutated in place
     */
    private void setKeyStoreProperties(KafkaTokenClusterConf clusterConf, Properties properties) {
        Function1 putLocation = (Function1 & Serializable & scala.Serializable)keystoreLocation -> properties.put("ssl.keystore.location", keystoreLocation);
        Function1 putStorePassword = (Function1 & Serializable & scala.Serializable)keystorePassword -> properties.put("ssl.keystore.password", keystorePassword);
        Function1 putKeyPassword = (Function1 & Serializable & scala.Serializable)keyPassword -> properties.put("ssl.key.password", keyPassword);
        clusterConf.keyStoreLocation().foreach(putLocation);
        clusterConf.keyStorePassword().foreach(putStorePassword);
        clusterConf.keyPassword().foreach(putKeyPassword);
    }

    /**
     * Builds the single-line JAAS configuration string for a Kerberos keytab login.
     *
     * <p>The string names the Krb5 login module reported by {@code SecurityUtils},
     * followed by {@code debug}, {@code useKeyTab=true}, {@code serviceName},
     * {@code keyTab} and {@code principal}. The multi-line template is margin-stripped,
     * its newlines are removed, and the result is trimmed so it carries no trailing
     * whitespace, matching the other JAAS builders in this class.
     *
     * @param keyTab              path of the keytab file
     * @param principal           Kerberos principal
     * @param kerberosServiceName Kerberos service name of the Kafka brokers
     * @return the JAAS configuration string
     */
    public String getKeytabJaasParams(String keyTab, String principal, String kerberosServiceName) {
        String template = "\n      |" + SecurityUtils$.MODULE$.getKrb5LoginModuleName()
            + " required\n      | debug=" + SecurityUtils$.MODULE$.isGlobalKrbDebugEnabled()
            + "\n      | useKeyTab=true\n      | serviceName=\"" + kerberosServiceName
            + "\"\n      | keyTab=\"" + keyTab
            + "\"\n      | principal=\"" + principal
            + "\";\n      ";
        // trim() removes the indentation left over from the template's last line.
        String params = new StringOps(Predef$.MODULE$.augmentString(template)).stripMargin().replace("\n", "").trim();
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(24).append("Krb keytab JAAS params: ").append(params).toString());
        return params;
    }

    /*
     * WARNING - void declaration
     */
    private String getTicketCacheJaasParams(KafkaTokenClusterConf clusterConf) {
        void var2_2;
        String params = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(91).append("\n      |").append(SecurityUtils$.MODULE$.getKrb5LoginModuleName()).append(" required\n      | debug=").append(SecurityUtils$.MODULE$.isGlobalKrbDebugEnabled()).append("\n      | useTicketCache=true\n      | serviceName=\"").append(clusterConf.kerberosServiceName()).append("\";\n      ").toString())).stripMargin().replace("\n", "").trim();
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(30).append("Krb ticket cache JAAS params: ").append(params).toString());
        return var2_2;
    }

    /**
     * Logs the details of a delegation token at debug level as a two-line table:
     * a header row and one data row. The HMAC is never logged; the redaction
     * placeholder from {@code Utils} is printed in its place. Does nothing when
     * debug logging is disabled.
     *
     * @param token the delegation token to describe
     */
    private void printToken(DelegationToken token) {
        if (!this.log().isDebugEnabled()) {
            return;
        }
        // A single layout for both rows keeps the columns aligned; the header and the
        // data row previously used different widths for the HMAC column.
        String rowLayout = "%-15s %-30s %-15s %-25s %-15s %-15s %-15s";
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString(rowLayout)).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"TOKENID", "HMAC", "OWNER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE"})));
        TokenInformation tokenInfo = token.tokenInfo();
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringOps(Predef$.MODULE$.augmentString(rowLayout)).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{tokenInfo.tokenId(), Utils$.MODULE$.REDACTION_REPLACEMENT_TEXT(), tokenInfo.owner(), tokenInfo.renewersAsString(), dateFormat.format(BoxesRunTime.boxToLong((long)tokenInfo.issueTimestamp())), dateFormat.format(BoxesRunTime.boxToLong((long)tokenInfo.expiryTimestamp())), dateFormat.format(BoxesRunTime.boxToLong((long)tokenInfo.maxTimestamp()))})));
    }

    /**
     * Finds the cluster configuration whose delegation token applies to the given
     * bootstrap servers.
     *
     * <p>Takes all tokens from the current user's credentials, keeps those whose
     * service name starts with the Kafka token service prefix, resolves each to its
     * cluster configuration, and keeps the configurations whose target-servers regex
     * matches at least one of the bootstrap servers.
     *
     * @param sparkConf        Spark configuration used to look up cluster configurations
     * @param bootStrapServers bootstrap servers string, split by {@code Utils.stringToSeq}
     * @return the single matching configuration, or an empty option if none matches;
     *         the {@code require} precondition fails if more than one matches
     */
    public Option<KafkaTokenClusterConf> findMatchingTokenClusterConfig(SparkConf sparkConf, String bootStrapServers) {
        Iterable tokens = (Iterable)JavaConverters$.MODULE$.collectionAsScalaIterableConverter(UserGroupInformation.getCurrentUser().getCredentials().getAllTokens()).asScala();
        // Step 1: only Kafka delegation tokens.
        TraversableLike kafkaTokens = (TraversableLike)tokens.filter((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToBoolean((boolean)KafkaTokenUtil$.$anonfun$findMatchingTokenClusterConfig$1(x$1)));
        // Step 2: token -> cluster configuration.
        TraversableLike candidateConfigs = (TraversableLike)kafkaTokens.map((Function1 & Serializable & scala.Serializable)token -> KafkaTokenSparkConf$.MODULE$.getClusterConfig(sparkConf, MODULE$.getClusterIdentifier(token.getService())), Iterable$.MODULE$.canBuildFrom());
        // Step 3: configurations targeting the requested servers.
        Iterable clusterConfigs = (Iterable)candidateConfigs.filter((Function1 & Serializable & scala.Serializable)clusterConfig -> BoxesRunTime.boxToBoolean((boolean)KafkaTokenUtil$.$anonfun$findMatchingTokenClusterConfig$3(bootStrapServers, clusterConfig)));
        Predef$.MODULE$.require(clusterConfigs.size() <= 1, (Function0 & Serializable & scala.Serializable)() -> "More than one delegation token matches the following bootstrap servers: " + bootStrapServers + ".");
        return clusterConfigs.headOption();
    }

    /**
     * Builds the single-line SCRAM JAAS configuration string that authenticates with
     * the delegation token stored for the given cluster.
     *
     * <p>The token is looked up in the current user's credentials by its service name.
     * Its identifier becomes the {@code username} and its password the {@code password}
     * of the returned string. Only a redacted form of the string is logged.
     *
     * @param clusterConf cluster configuration supplying the identifier and service name
     * @return the JAAS configuration string (contains the token password)
     */
    public String getTokenJaasParams(KafkaTokenClusterConf clusterConf) {
        Token token = UserGroupInformation.getCurrentUser().getCredentials().getToken(this.getTokenService(clusterConf.identifier()));
        Predef$.MODULE$.require(token != null, (Function0 & Serializable & scala.Serializable)() -> "Token for identifier " + clusterConf.identifier() + " must exist");
        String username = new String(token.getIdentifier());
        String password = new String(token.getPassword());
        String loginModuleName = ScramLoginModule.class.getName();
        String template = "\n      |" + loginModuleName
            + " required\n      | tokenauth=true\n      | serviceName=\"" + clusterConf.kerberosServiceName()
            + "\"\n      | username=\"" + username
            + "\"\n      | password=\"" + password
            + "\";\n      ";
        String params = new StringOps(Predef$.MODULE$.augmentString(template)).stripMargin().replace("\n", "").trim();
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Scram JAAS params: " + KafkaRedactionUtil$.MODULE$.redactJaasParam(params));
        return params;
    }

    /**
     * Decides whether a connector's JAAS configuration is out of date with respect to
     * the current delegation token.
     *
     * @param params        connector parameters; only {@code sasl.jaas.config} is read
     * @param clusterConfig cluster configuration for the connector, if any
     * @return {@code true} only when a cluster configuration is defined, the params
     *         contain {@code sasl.jaas.config}, and that value differs from the string
     *         produced by {@link #getTokenJaasParams}; {@code false} otherwise
     */
    public boolean needTokenUpdate(Map<String, Object> params, Option<KafkaTokenClusterConf> clusterConfig) {
        if (!clusterConfig.isDefined() || !params.containsKey("sasl.jaas.config")) {
            return false;
        }
        this.logDebug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Delegation token used by connector, checking if uses the latest token.");
        String connectorJaasParams = (String)params.get("sasl.jaas.config");
        String latestJaasParams = this.getTokenJaasParams((KafkaTokenClusterConf)clusterConfig.get());
        return !java.util.Objects.equals(latestJaasParams, connectorJaasParams);
    }

    /**
     * Lambda body: tells whether a token's service name starts with the Kafka
     * token service prefix.
     *
     * @param x$1 token to inspect
     * @return {@code true} if the service name starts with the prefix
     */
    public static final /* synthetic */ boolean $anonfun$findMatchingTokenClusterConfig$1(Token x$1) {
        String serviceName = x$1.getService().toString();
        return serviceName.startsWith(MODULE$.TOKEN_SERVICE_PREFIX());
    }

    /**
     * Lambda body: tells whether the whole server string matches the pattern.
     *
     * @param pattern$1 compiled target-servers pattern
     * @param x$2       one bootstrap server entry
     * @return {@code true} if the entire string matches
     */
    public static final /* synthetic */ boolean $anonfun$findMatchingTokenClusterConfig$4(Pattern pattern$1, String x$2) {
        final boolean fullMatch = pattern$1.matcher(x$2).matches();
        return fullMatch;
    }

    /**
     * Lambda body: tells whether a cluster configuration targets any of the given
     * bootstrap servers.
     *
     * @param bootStrapServers$1 bootstrap servers string, split by {@code Utils.stringToSeq}
     * @param clusterConfig      configuration whose target-servers regex is compiled and applied
     * @return {@code true} if at least one server entry fully matches the regex
     */
    public static final /* synthetic */ boolean $anonfun$findMatchingTokenClusterConfig$3(String bootStrapServers$1, KafkaTokenClusterConf clusterConfig) {
        // Compile once, then reuse for every server entry.
        Pattern targetServers = Pattern.compile(clusterConfig.targetServersRegex());
        Function1 matchesTarget = (Function1 & Serializable & scala.Serializable)x$2 -> BoxesRunTime.boxToBoolean((boolean)KafkaTokenUtil$.$anonfun$findMatchingTokenClusterConfig$4(targetServers, x$2));
        return Utils$.MODULE$.stringToSeq(bootStrapServers$1).exists(matchesTarget);
    }

    /**
     * Private singleton constructor: publishes the instance through {@code MODULE$},
     * runs the {@code Logging} initializer, then sets the token constants.
     */
    private KafkaTokenUtil$() {
        // Publish first, so MODULE$ is already set while the rest of the constructor runs.
        MODULE$ = this;
        Logging.$init$(this);
        TOKEN_KIND = new Text("KAFKA_DELEGATION_TOKEN");
        TOKEN_SERVICE_PREFIX = "kafka.server.delegation.token";
    }
}

