/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.errors.TopologyBuilderException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsKafkaClient;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamPartitionAssignor
implements PartitionAssignor,
Configurable {
    private static final Logger log = LoggerFactory.getLogger(StreamPartitionAssignor.class);
    private Time time = Time.SYSTEM;
    private static final int UNKNOWN = -1;
    public static final int NOT_AVAILABLE = -2;
    private static final Comparator<TopicPartition> PARTITION_COMPARATOR = new Comparator<TopicPartition>(){

        @Override
        public int compare(TopicPartition p1, TopicPartition p2) {
            int result = p1.topic().compareTo(p2.topic());
            if (result != 0) {
                return result;
            }
            return p1.partition() < p2.partition() ? -1 : (p1.partition() > p2.partition() ? 1 : 0);
        }
    };
    private StreamThread streamThread;
    private String userEndPoint;
    private int numStandbyReplicas;
    private Cluster metadataWithInternalTopics;
    private Map<HostInfo, Set<TopicPartition>> partitionsByHostState;
    private Map<TaskId, Set<TopicPartition>> standbyTasks;
    private Map<TaskId, Set<TopicPartition>> activeTasks;
    private InternalTopicManager internalTopicManager;
    private CopartitionedTopicsValidator copartitionedTopicsValidator;

    void time(Time time) {
        this.time = time;
    }

    public void configure(Map<String, ?> configs) {
        this.numStandbyReplicas = (Integer)configs.get("num.standby.replicas");
        Object o = configs.get("__stream.thread.instance__");
        if (o == null) {
            KafkaException ex = new KafkaException("StreamThread is not specified");
            log.error(ex.getMessage(), (Throwable)ex);
            throw ex;
        }
        if (!(o instanceof StreamThread)) {
            KafkaException ex = new KafkaException(String.format("%s is not an instance of %s", o.getClass().getName(), StreamThread.class.getName()));
            log.error(ex.getMessage(), (Throwable)ex);
            throw ex;
        }
        this.streamThread = (StreamThread)o;
        this.streamThread.setPartitionAssignor(this);
        String userEndPoint = (String)configs.get("application.server");
        if (userEndPoint != null && !userEndPoint.isEmpty()) {
            try {
                String host = Utils.getHost((String)userEndPoint);
                Integer port = Utils.getPort((String)userEndPoint);
                if (host == null || port == null) {
                    throw new ConfigException(String.format("stream-thread [%s] Config %s isn't in the correct format. Expected a host:port pair but received %s", this.streamThread.getName(), "application.server", userEndPoint));
                }
            }
            catch (NumberFormatException nfe) {
                throw new ConfigException(String.format("stream-thread [%s] Invalid port supplied in %s for config %s", this.streamThread.getName(), userEndPoint, "application.server"));
            }
            this.userEndPoint = userEndPoint;
        }
        this.internalTopicManager = new InternalTopicManager(new StreamsKafkaClient(this.streamThread.config), configs.containsKey("replication.factor") ? (Integer)configs.get("replication.factor") : 1, configs.containsKey("windowstore.changelog.additional.retention.ms") ? (Long)configs.get("windowstore.changelog.additional.retention.ms") : InternalTopicManager.WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT, this.time);
        this.copartitionedTopicsValidator = new CopartitionedTopicsValidator(this.streamThread.getName());
    }

    public String name() {
        return "stream";
    }

    public PartitionAssignor.Subscription subscription(Set<String> topics) {
        Set<TaskId> previousActiveTasks = this.streamThread.prevActiveTasks();
        Set<TaskId> standbyTasks = this.streamThread.cachedTasks();
        standbyTasks.removeAll(previousActiveTasks);
        SubscriptionInfo data = new SubscriptionInfo(this.streamThread.processId, previousActiveTasks, standbyTasks, this.userEndPoint);
        if (this.streamThread.builder.sourceTopicPattern() != null && !this.streamThread.builder.subscriptionUpdates().getUpdates().equals(topics)) {
            this.updateSubscribedTopics(topics);
        }
        return new PartitionAssignor.Subscription(new ArrayList<String>(topics), data.encode());
    }

    private void updateSubscribedTopics(Set<String> topics) {
        SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
        log.debug("stream-thread [{}] found {} topics possibly matching regex", (Object)this.streamThread.getName(), topics);
        subscriptionUpdates.updateTopics(topics);
        this.streamThread.builder.updateSubscriptions(subscriptionUpdates, this.streamThread.getName());
    }

    /*
     * WARNING - void declaration
     */
    public Map<String, PartitionAssignor.Assignment> assign(Cluster metadata, Map<String, PartitionAssignor.Subscription> subscriptions) {
        boolean numPartitionsNeeded;
        HashMap<UUID, ClientMetadata> clientsMetadata = new HashMap<UUID, ClientMetadata>();
        for (Map.Entry<String, PartitionAssignor.Subscription> entry : subscriptions.entrySet()) {
            void var9_10;
            String consumerId = entry.getKey();
            PartitionAssignor.Subscription subscription = entry.getValue();
            SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
            ClientMetadata clientMetadata = (ClientMetadata)clientsMetadata.get(info.processId);
            if (clientMetadata == null) {
                ClientMetadata clientMetadata2 = new ClientMetadata(info.userEndPoint);
                clientsMetadata.put(info.processId, clientMetadata2);
            }
            var9_10.addConsumer(consumerId, info);
        }
        log.debug("stream-thread [{}] Constructed client metadata {} from the member subscriptions.", (Object)this.streamThread.getName(), clientsMetadata);
        Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = this.streamThread.builder.topicGroups();
        HashMap<String, InternalTopicMetadata> repartitionTopicMetadata = new HashMap<String, InternalTopicMetadata>();
        for (TopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
            for (InternalTopicConfig internalTopicConfig : topicsInfo.repartitionSourceTopics.values()) {
                repartitionTopicMetadata.put(internalTopicConfig.name(), new InternalTopicMetadata(internalTopicConfig));
            }
        }
        do {
            numPartitionsNeeded = false;
            for (TopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) {
                for (String topicName : topicsInfo.repartitionSourceTopics.keySet()) {
                    int numPartitions = ((InternalTopicMetadata)repartitionTopicMetadata.get((Object)topicName)).numPartitions;
                    if (numPartitions != -1) continue;
                    for (TopologyBuilder.TopicsInfo otherTopicsInfo : topicGroups.values()) {
                        Set<String> otherSinkTopics = otherTopicsInfo.sinkTopics;
                        if (!otherSinkTopics.contains(topicName)) continue;
                        for (String string : otherTopicsInfo.sourceTopics) {
                            Integer numPartitionsCandidate;
                            if (repartitionTopicMetadata.containsKey(string)) {
                                numPartitionsCandidate = ((InternalTopicMetadata)repartitionTopicMetadata.get((Object)string)).numPartitions;
                            } else {
                                numPartitionsCandidate = metadata.partitionCountForTopic(string);
                                if (numPartitionsCandidate == null) {
                                    ((InternalTopicMetadata)repartitionTopicMetadata.get((Object)topicName)).numPartitions = -2;
                                }
                            }
                            if (numPartitionsCandidate == null || numPartitionsCandidate <= numPartitions) continue;
                            numPartitions = numPartitionsCandidate;
                        }
                    }
                    if (numPartitions == -1) {
                        numPartitionsNeeded = true;
                        continue;
                    }
                    ((InternalTopicMetadata)repartitionTopicMetadata.get((Object)topicName)).numPartitions = numPartitions;
                }
            }
        } while (numPartitionsNeeded);
        HashMap<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = new HashMap<TopicPartition, PartitionInfo>();
        for (Map.Entry entry : repartitionTopicMetadata.entrySet()) {
            String topic = (String)entry.getKey();
            Integer numPartitions = ((InternalTopicMetadata)entry.getValue()).numPartitions;
            for (int partition = 0; partition < numPartitions; ++partition) {
                allRepartitionTopicPartitions.put(new TopicPartition(topic, partition), new PartitionInfo(topic, partition, null, new Node[0], new Node[0]));
            }
        }
        this.ensureCopartitioning(this.streamThread.builder.copartitionGroups(), repartitionTopicMetadata, metadata);
        this.prepareTopic(repartitionTopicMetadata);
        this.metadataWithInternalTopics = metadata.withPartitions(allRepartitionTopicPartitions);
        log.debug("stream-thread [{}] Created repartition topics {} from the parsed topology.", (Object)this.streamThread.getName(), allRepartitionTopicPartitions.values());
        HashSet<String> allSourceTopics = new HashSet<String>();
        HashMap<Integer, Set<String>> hashMap = new HashMap<Integer, Set<String>>();
        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
            allSourceTopics.addAll(entry.getValue().sourceTopics);
            hashMap.put(entry.getKey(), entry.getValue().sourceTopics);
        }
        Map<TaskId, Set<TopicPartition>> partitionsForTask = this.streamThread.partitionGrouper.partitionGroups(hashMap, this.metadataWithInternalTopics);
        HashSet<TopicPartition> allAssignedPartitions = new HashSet<TopicPartition>();
        HashMap<Integer, HashSet<TaskId>> tasksByTopicGroup = new HashMap<Integer, HashSet<TaskId>>();
        for (Map.Entry<TaskId, Set<TopicPartition>> entry : partitionsForTask.entrySet()) {
            Set<TopicPartition> partitions = entry.getValue();
            for (TopicPartition partition : partitions) {
                if (!allAssignedPartitions.contains(partition)) continue;
                log.warn("stream-thread [{}] Partition {} is assigned to more than one tasks: {}", new Object[]{this.streamThread.getName(), partition, partitionsForTask});
            }
            allAssignedPartitions.addAll(partitions);
            TaskId taskId = entry.getKey();
            HashSet<TaskId> ids = (HashSet<TaskId>)tasksByTopicGroup.get(taskId.topicGroupId);
            if (ids == null) {
                ids = new HashSet<TaskId>();
                tasksByTopicGroup.put(taskId.topicGroupId, ids);
            }
            ids.add(taskId);
        }
        for (String topic : allSourceTopics) {
            List partitionInfoList = this.metadataWithInternalTopics.partitionsForTopic(topic);
            if (!partitionInfoList.isEmpty()) {
                for (PartitionInfo partitionInfo : partitionInfoList) {
                    TopicPartition partition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                    if (allAssignedPartitions.contains(partition)) continue;
                    log.warn("stream-thread [{}] Partition {} is not assigned to any tasks: {}", new Object[]{this.streamThread.getName(), partition, partitionsForTask});
                }
                continue;
            }
            log.warn("stream-thread [{}] No partitions found for topic {}", (Object)this.streamThread.getName(), (Object)topic);
        }
        HashMap<String, InternalTopicMetadata> changelogTopicMetadata = new HashMap<String, InternalTopicMetadata>();
        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
            int n = entry.getKey();
            Map<String, InternalTopicConfig> stateChangelogTopics = entry.getValue().stateChangelogTopics;
            for (InternalTopicConfig topicConfig : stateChangelogTopics.values()) {
                int numPartitions = -1;
                if (tasksByTopicGroup.get(n) != null) {
                    for (TaskId task : (Set)tasksByTopicGroup.get(n)) {
                        if (numPartitions >= task.partition + 1) continue;
                        numPartitions = task.partition + 1;
                    }
                    InternalTopicMetadata topicMetadata = new InternalTopicMetadata(topicConfig);
                    topicMetadata.numPartitions = numPartitions;
                    changelogTopicMetadata.put(topicConfig.name(), topicMetadata);
                    continue;
                }
                log.debug("stream-thread [{}] No tasks found for topic group {}", (Object)this.streamThread.getName(), (Object)n);
            }
        }
        this.prepareTopic(changelogTopicMetadata);
        log.debug("stream-thread [{}] Created state changelog topics {} from the parsed topology.", (Object)this.streamThread.getName(), changelogTopicMetadata);
        HashMap states = new HashMap();
        for (Map.Entry entry : clientsMetadata.entrySet()) {
            states.put(entry.getKey(), ((ClientMetadata)entry.getValue()).state);
        }
        log.debug("stream-thread [{}] Assigning tasks {} to clients {} with number of replicas {}", new Object[]{this.streamThread.getName(), partitionsForTask.keySet(), states, this.numStandbyReplicas});
        StickyTaskAssignor taskAssignor = new StickyTaskAssignor(states, partitionsForTask.keySet());
        taskAssignor.assign(this.numStandbyReplicas);
        log.info("stream-thread [{}] Assigned tasks to clients as {}.", (Object)this.streamThread.getName(), states);
        this.partitionsByHostState = new HashMap<HostInfo, Set<TopicPartition>>();
        for (Map.Entry entry : clientsMetadata.entrySet()) {
            HostInfo hostInfo = ((ClientMetadata)entry.getValue()).hostInfo;
            if (hostInfo == null) continue;
            HashSet topicPartitions = new HashSet();
            ClientState state = ((ClientMetadata)entry.getValue()).state;
            for (TaskId id : state.activeTasks()) {
                topicPartitions.addAll(partitionsForTask.get(id));
            }
            this.partitionsByHostState.put(hostInfo, topicPartitions);
        }
        HashMap<String, PartitionAssignor.Assignment> hashMap2 = new HashMap<String, PartitionAssignor.Assignment>();
        for (Map.Entry entry : clientsMetadata.entrySet()) {
            Set<String> consumers = ((ClientMetadata)entry.getValue()).consumers;
            ClientState state = ((ClientMetadata)entry.getValue()).state;
            ArrayList<TaskId> taskIds = new ArrayList<TaskId>(state.assignedTaskCount());
            int numActiveTasks = state.activeTaskCount();
            taskIds.addAll(state.activeTasks());
            taskIds.addAll(state.standbyTasks());
            int numConsumers = consumers.size();
            int i = 0;
            for (String consumer : consumers) {
                HashMap<TaskId, Set<TopicPartition>> standby = new HashMap<TaskId, Set<TopicPartition>>();
                ArrayList<AssignedPartition> assignedPartitions = new ArrayList<AssignedPartition>();
                int numTaskIds = taskIds.size();
                for (int j = i; j < numTaskIds; j += numConsumers) {
                    TaskId taskId = (TaskId)taskIds.get(j);
                    if (j < numActiveTasks) {
                        for (TopicPartition topicPartition : partitionsForTask.get(taskId)) {
                            assignedPartitions.add(new AssignedPartition(taskId, topicPartition));
                        }
                        continue;
                    }
                    HashSet standbyPartitions = (HashSet)standby.get(taskId);
                    if (standbyPartitions == null) {
                        standbyPartitions = new HashSet();
                        standby.put(taskId, standbyPartitions);
                    }
                    standbyPartitions.addAll(partitionsForTask.get(taskId));
                }
                Collections.sort(assignedPartitions);
                ArrayList<TaskId> active = new ArrayList<TaskId>();
                ArrayList<TopicPartition> activePartitions = new ArrayList<TopicPartition>();
                for (AssignedPartition assignedPartition : assignedPartitions) {
                    active.add(assignedPartition.taskId);
                    activePartitions.add(assignedPartition.partition);
                }
                hashMap2.put(consumer, new PartitionAssignor.Assignment(activePartitions, new AssignmentInfo(active, standby, this.partitionsByHostState).encode()));
                ++i;
            }
        }
        return hashMap2;
    }

    public void onAssignment(PartitionAssignor.Assignment assignment) {
        ArrayList partitions = new ArrayList(assignment.partitions());
        Collections.sort(partitions, PARTITION_COMPARATOR);
        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
        this.standbyTasks = info.standbyTasks;
        this.activeTasks = new HashMap<TaskId, Set<TopicPartition>>();
        if (partitions.size() != info.activeTasks.size()) {
            throw new TaskAssignmentException(String.format("stream-thread [%s] Number of assigned partitions %d is not equal to the number of active taskIds %d, assignmentInfo=%s", this.streamThread.getName(), partitions.size(), info.activeTasks.size(), info.toString()));
        }
        for (int i = 0; i < partitions.size(); ++i) {
            TopicPartition partition = (TopicPartition)partitions.get(i);
            TaskId id = info.activeTasks.get(i);
            Set<TopicPartition> assignedPartitions = this.activeTasks.get(id);
            if (assignedPartitions == null) {
                assignedPartitions = new HashSet<TopicPartition>();
                this.activeTasks.put(id, assignedPartitions);
            }
            assignedPartitions.add(partition);
        }
        this.partitionsByHostState = info.partitionsByHost;
        Collection<Set<TopicPartition>> values = this.partitionsByHostState.values();
        HashMap<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<TopicPartition, PartitionInfo>();
        for (Set<TopicPartition> value : values) {
            for (TopicPartition topicPartition : value) {
                topicToPartitionInfo.put(topicPartition, new PartitionInfo(topicPartition.topic(), topicPartition.partition(), null, new Node[0], new Node[0]));
            }
        }
        this.metadataWithInternalTopics = Cluster.empty().withPartitions(topicToPartitionInfo);
        this.checkForNewTopicAssignments(assignment);
    }

    private void checkForNewTopicAssignments(PartitionAssignor.Assignment assignment) {
        if (this.streamThread.builder.sourceTopicPattern() != null) {
            HashSet<String> assignedTopics = new HashSet<String>();
            for (TopicPartition topicPartition : assignment.partitions()) {
                assignedTopics.add(topicPartition.topic());
            }
            if (!this.streamThread.builder.subscriptionUpdates().getUpdates().containsAll(assignedTopics)) {
                assignedTopics.addAll(this.streamThread.builder.subscriptionUpdates().getUpdates());
                this.updateSubscribedTopics(assignedTopics);
            }
        }
    }

    private void prepareTopic(Map<String, InternalTopicMetadata> topicPartitions) {
        log.debug("stream-thread [{}] Starting to validate internal topics in partition assignor.", (Object)this.streamThread.getName());
        HashMap<InternalTopicConfig, Integer> topicsToMakeReady = new HashMap<InternalTopicConfig, Integer>();
        HashSet<String> topicNamesToMakeReady = new HashSet<String>();
        for (InternalTopicMetadata metadata : topicPartitions.values()) {
            InternalTopicConfig topic = metadata.config;
            Integer numPartitions = metadata.numPartitions;
            if (numPartitions == -2) continue;
            if (numPartitions < 0) {
                throw new TopologyBuilderException(String.format("stream-thread [%s] Topic [%s] number of partitions not defined", this.streamThread.getName(), topic.name()));
            }
            topicsToMakeReady.put(topic, numPartitions);
            topicNamesToMakeReady.add(topic.name());
        }
        if (!topicsToMakeReady.isEmpty()) {
            this.internalTopicManager.makeReady(topicsToMakeReady);
            while (!this.allTopicsCreated(topicNamesToMakeReady, topicsToMakeReady)) {
                try {
                    Thread.sleep(50L);
                }
                catch (InterruptedException interruptedException) {}
            }
        }
        log.debug("stream-thread [{}] Completed validating internal topics in partition assignor", (Object)this.streamThread.getName());
    }

    private boolean allTopicsCreated(Set<String> topicNamesToMakeReady, Map<InternalTopicConfig, Integer> topicsToMakeReady) {
        Map<String, Integer> partitions = this.internalTopicManager.getNumPartitions(topicNamesToMakeReady);
        for (Map.Entry<InternalTopicConfig, Integer> entry : topicsToMakeReady.entrySet()) {
            Integer numPartitions = partitions.get(entry.getKey().name());
            if (numPartitions != null && numPartitions.equals(entry.getValue())) continue;
            return false;
        }
        return true;
    }

    private void ensureCopartitioning(Collection<Set<String>> copartitionGroups, Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions, Cluster metadata) {
        for (Set<String> copartitionGroup : copartitionGroups) {
            this.copartitionedTopicsValidator.validate(copartitionGroup, allRepartitionTopicsNumPartitions, metadata);
        }
    }

    Map<HostInfo, Set<TopicPartition>> getPartitionsByHostState() {
        if (this.partitionsByHostState == null) {
            return Collections.emptyMap();
        }
        return Collections.unmodifiableMap(this.partitionsByHostState);
    }

    Cluster clusterMetadata() {
        if (this.metadataWithInternalTopics == null) {
            return Cluster.empty();
        }
        return this.metadataWithInternalTopics;
    }

    Map<TaskId, Set<TopicPartition>> activeTasks() {
        if (this.activeTasks == null) {
            return Collections.emptyMap();
        }
        return Collections.unmodifiableMap(this.activeTasks);
    }

    Map<TaskId, Set<TopicPartition>> standbyTasks() {
        if (this.standbyTasks == null) {
            return Collections.emptyMap();
        }
        return Collections.unmodifiableMap(this.standbyTasks);
    }

    void setInternalTopicManager(InternalTopicManager internalTopicManager) {
        this.internalTopicManager = internalTopicManager;
    }

    public void close() {
        this.internalTopicManager.close();
    }

    static class CopartitionedTopicsValidator {
        private final String threadName;

        CopartitionedTopicsValidator(String threadName) {
            this.threadName = threadName;
        }

        void validate(Set<String> copartitionGroup, Map<String, InternalTopicMetadata> allRepartitionTopicsNumPartitions, Cluster metadata) {
            int numPartitions = -1;
            for (String string : copartitionGroup) {
                if (!allRepartitionTopicsNumPartitions.containsKey(string)) {
                    Integer partitions = metadata.partitionCountForTopic(string);
                    if (partitions == null) {
                        throw new TopologyBuilderException(String.format("stream-thread [%s] Topic not found: %s", this.threadName, string));
                    }
                    if (numPartitions == -1) {
                        numPartitions = partitions;
                        continue;
                    }
                    if (numPartitions == partitions) continue;
                    Object[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
                    Arrays.sort(topics);
                    throw new TopologyBuilderException(String.format("stream-thread [%s] Topics not co-partitioned: [%s]", this.threadName, Utils.join(Arrays.asList(topics), (String)",")));
                }
                if (allRepartitionTopicsNumPartitions.get((Object)string).numPartitions != -2) continue;
                numPartitions = -2;
                break;
            }
            if (numPartitions == -1) {
                for (Map.Entry entry : allRepartitionTopicsNumPartitions.entrySet()) {
                    int partitions;
                    if (!copartitionGroup.contains(entry.getKey()) || (partitions = ((InternalTopicMetadata)entry.getValue()).numPartitions) <= numPartitions) continue;
                    numPartitions = partitions;
                }
            }
            for (Map.Entry entry : allRepartitionTopicsNumPartitions.entrySet()) {
                if (!copartitionGroup.contains(entry.getKey())) continue;
                ((InternalTopicMetadata)entry.getValue()).numPartitions = numPartitions;
            }
        }
    }

    public static class SubscriptionUpdates {
        private final Set<String> updatedTopicSubscriptions = new HashSet<String>();

        private void updateTopics(Collection<String> topicNames) {
            this.updatedTopicSubscriptions.clear();
            this.updatedTopicSubscriptions.addAll(topicNames);
        }

        public Collection<String> getUpdates() {
            return Collections.unmodifiableSet(new HashSet<String>(this.updatedTopicSubscriptions));
        }

        public boolean hasUpdates() {
            return !this.updatedTopicSubscriptions.isEmpty();
        }

        public String toString() {
            return "SubscriptionUpdates{updatedTopicSubscriptions=" + this.updatedTopicSubscriptions + '}';
        }
    }

    static class InternalTopicMetadata {
        public final InternalTopicConfig config;
        public int numPartitions;

        InternalTopicMetadata(InternalTopicConfig config) {
            this.config = config;
            this.numPartitions = -1;
        }
    }

    private static class ClientMetadata {
        final HostInfo hostInfo;
        final Set<String> consumers;
        final ClientState state;

        ClientMetadata(String endPoint) {
            if (endPoint != null) {
                String host = Utils.getHost((String)endPoint);
                Integer port = Utils.getPort((String)endPoint);
                if (host == null || port == null) {
                    throw new ConfigException(String.format("Error parsing host address %s. Expected format host:port.", endPoint));
                }
                this.hostInfo = new HostInfo(host, port);
            } else {
                this.hostInfo = null;
            }
            this.consumers = new HashSet<String>();
            this.state = new ClientState();
        }

        void addConsumer(String consumerMemberId, SubscriptionInfo info) {
            this.consumers.add(consumerMemberId);
            this.state.addPreviousActiveTasks(info.prevTasks);
            this.state.addPreviousStandbyTasks(info.standbyTasks);
            this.state.incrementCapacity();
        }

        public String toString() {
            return "ClientMetadata{hostInfo=" + this.hostInfo + ", consumers=" + this.consumers + ", state=" + this.state + '}';
        }
    }

    private static class AssignedPartition
    implements Comparable<AssignedPartition> {
        public final TaskId taskId;
        public final TopicPartition partition;

        AssignedPartition(TaskId taskId, TopicPartition partition) {
            this.taskId = taskId;
            this.partition = partition;
        }

        @Override
        public int compareTo(AssignedPartition that) {
            return PARTITION_COMPARATOR.compare(this.partition, that.partition);
        }

        public boolean equals(Object o) {
            if (!(o instanceof AssignedPartition)) {
                return false;
            }
            AssignedPartition other = (AssignedPartition)o;
            return this.compareTo(other) == 0;
        }

        public int hashCode() {
            return this.partition.hashCode();
        }
    }
}

