/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.producer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
import org.apache.kafka.clients.producer.internals.ProducerBatch;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.DefaultRecord;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class RecordAccumulatorTest {
    private String topic = "test";
    private int partition1 = 0;
    private int partition2 = 1;
    private int partition3 = 2;
    private Node node1 = new Node(0, "localhost", 1111);
    private Node node2 = new Node(1, "localhost", 1112);
    private TopicPartition tp1 = new TopicPartition(this.topic, this.partition1);
    private TopicPartition tp2 = new TopicPartition(this.topic, this.partition2);
    private TopicPartition tp3 = new TopicPartition(this.topic, this.partition3);
    private PartitionInfo part1 = new PartitionInfo(this.topic, this.partition1, this.node1, null, null);
    private PartitionInfo part2 = new PartitionInfo(this.topic, this.partition2, this.node1, null, null);
    private PartitionInfo part3 = new PartitionInfo(this.topic, this.partition3, this.node2, null, null);
    private MockTime time = new MockTime();
    private byte[] key = "key".getBytes();
    private byte[] value = "value".getBytes();
    private int msgSize = DefaultRecord.sizeInBytes((int)0, (long)0L, (int)this.key.length, (int)this.value.length, (Header[])Record.EMPTY_HEADERS);
    private Cluster cluster = new Cluster(null, Arrays.asList(this.node1, this.node2), Arrays.asList(this.part1, this.part2, this.part3), Collections.emptySet(), Collections.emptySet());
    private Metrics metrics = new Metrics((Time)this.time);
    private final long maxBlockTimeMs = 1000L;
    private final LogContext logContext = new LogContext();

    @AfterEach
    public void teardown() {
        this.metrics.close();
    }

    @Test
    public void testFull() throws Exception {
        long now = this.time.milliseconds();
        int batchSize = 1025;
        RecordAccumulator accum = this.createTestRecordAccumulator(batchSize + 61, 10L * (long)batchSize, CompressionType.NONE, 10);
        int appends = this.expectedNumAppends(batchSize);
        for (int i = 0; i < appends; ++i) {
            accum.append(this.tp1, 0L, this.key, this.value, Record.EMPTY_HEADERS, null, 1000L, false, this.time.milliseconds());
            Deque partitionBatches = (Deque)accum.batches().get(this.tp1);
            Assertions.assertEquals((int)1, (int)partitionBatches.size());
            ProducerBatch batch = (ProducerBatch)partitionBatches.peekFirst();
            Assertions.assertTrue((boolean)batch.isWritable());
            Assertions.assertEquals((int)0, (int)accum.ready((Cluster)this.cluster, (long)now).readyNodes.size(), (String)"No partitions should be ready.");
        }
        accum.append(this.tp1, 0L, this.key, this.value, Record.EMPTY_HEADERS, null, 1000L, false, this.time.milliseconds());
        Deque partitionBatches = (Deque)accum.batches().get(this.tp1);
        Assertions.assertEquals((int)2, (int)partitionBatches.size());
        Iterator partitionBatchesIterator = partitionBatches.iterator();
        Assertions.assertTrue((boolean)((ProducerBatch)partitionBatchesIterator.next()).isWritable());
        Assertions.assertEquals(Collections.singleton(this.node1), (Object)accum.ready((Cluster)this.cluster, (long)this.time.milliseconds()).readyNodes, (String)"Our partition's leader should be ready");
        List batches = (List)accum.drain(this.cluster, Collections.singleton(this.node1), Integer.MAX_VALUE, 0L).get(this.node1.id());
        Assertions.assertEquals((int)1, (int)batches.size());
        ProducerBatch batch = (ProducerBatch)batches.get(0);
        Iterator iter = batch.records().records().iterator();
        for (int i = 0; i < appends; ++i) {
            Record record = (Record)iter.next();
            Assertions.assertEquals((Object)ByteBuffer.wrap(this.key), (Object)record.key(), (String)"Keys should match");
            Assertions.assertEquals((Object)ByteBuffer.wrap(this.value), (Object)record.value(), (String)"Values should match");
        }
        Assertions.assertFalse((boolean)iter.hasNext(), (String)"No more records");
    }

    @Test
    public void testAppendLargeCompressed() throws Exception {
        this.testAppendLarge(CompressionType.GZIP);
    }

    @Test
    public void testAppendLargeNonCompressed() throws Exception {
        this.testAppendLarge(CompressionType.NONE);
    }

    private void testAppendLarge(CompressionType compressionType) throws Exception {
        int batchSize = 512;
        byte[] value = new byte[2 * batchSize];
        RecordAccumulator accum = this.createTestRecordAccumulator(batchSize + 61, 10240L, compressionType, 0);
        accum.append(this.tp1, 0L, this.key, value, Record.EMPTY_HEADERS, null, 1000L, false, this.time.milliseconds());
        Assertions.assertEquals(Collections.singleton(this.node1), (Object)accum.ready((Cluster)this.cluster, (long)this.time.milliseconds()).readyNodes, (String)"Our partition's leader should be ready");
        Deque batches = (Deque)accum.batches().get(this.tp1);
        Assertions.assertEquals((int)1, (int)batches.size());
        ProducerBatch producerBatch = (ProducerBatch)batches.peek();
        List recordBatches = TestUtils.toList(producerBatch.records().batches());
        Assertions.assertEquals((int)1, (int)recordBatches.size());
        MutableRecordBatch recordBatch = (MutableRecordBatch)recordBatches.get(0);
        Assertions.assertEquals((long)0L, (long)recordBatch.baseOffset());
        List records = TestUtils.toList(recordBatch);
        Assertions.assertEquals((int)1, (int)records.size());
        Record record = (Record)records.get(0);
        Assertions.assertEquals((long)0L, (long)record.offset());
        Assertions.assertEquals((Object)ByteBuffer.wrap(this.key), (Object)record.key());
        Assertions.assertEquals((Object)ByteBuffer.wrap(value), (Object)record.value());
        Assertions.assertEquals((long)0L, (long)record.timestamp());
    }

    @Test
    public void testAppendLargeOldMessageFormatCompressed() throws Exception {
        this.testAppendLargeOldMessageFormat(CompressionType.GZIP);
    }

    @Test
    public void testAppendLargeOldMessageFormatNonCompressed() throws Exception {
        this.testAppendLargeOldMessageFormat(CompressionType.NONE);
    }

    private void testAppendLargeOldMessageFormat(CompressionType compressionType) throws Exception {
        int batchSize = 512;
        byte[] value = new byte[2 * batchSize];
        ApiVersions apiVersions = new ApiVersions();
        apiVersions.update(this.node1.idString(), NodeApiVersions.create((short)ApiKeys.PRODUCE.id, (short)0, (short)2));
        RecordAccumulator accum = this.createTestRecordAccumulator(batchSize + 61, 10240L, compressionType, 0);
        accum.append(this.tp1, 0L, this.key, value, Record.EMPTY_HEADERS, null, 1000L, false, this.time.milliseconds());
        Assertions.assertEquals(Collections.singleton(this.node1), (Object)accum.ready((Cluster)this.cluster, (long)this.time.milliseconds()).readyNodes, (String)"Our partition's leader should be ready");
        Deque batches = (Deque)accum.batches().get(this.tp1);
        Assertions.assertEquals((int)1, (int)batches.size());
        ProducerBatch producerBatch = (ProducerBatch)batches.peek();
        List recordBatches = TestUtils.toList(producerBatch.records().batches());
        Assertions.assertEquals((int)1, (int)recordBatches.size());
        MutableRecordBatch recordBatch = (MutableRecordBatch)recordBatches.get(0);
        Assertions.assertEquals((long)0L, (long)recordBatch.baseOffset());
        List records = TestUtils.toList(recordBatch);
        Assertions.assertEquals((int)1, (int)records.size());
        Record record = (Record)records.get(0);
        Assertions.assertEquals((long)0L, (long)record.offset());
        Assertions.assertEquals((Object)ByteBuffer.wrap(this.key), (Object)record.key());
        Assertions.assertEquals((Object)ByteBuffer.wrap(value), (Object)record.value());
        Assertions.assertEquals((long)0L, (long)record.timestamp());
    }

    @Test
    public void testLinger() throws Exception {
        int lingerMs = 10;
        RecordAccumulator accum = this.createTestRecordAccumulator(1085, 10240L, CompressionType.NONE, lingerMs);
        accum.append(this.tp1, 0L, this.key, this.value, Record.EMPTY_HEADERS, null, 1000L, false, this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)accum.ready((Cluster)this.cluster, (long)this.time.milliseconds()).readyNodes.size(), (String)"No partitions should be ready");
        this.time.sleep(10L);
        Assertions.assertEquals(Collections.singleton(this.node1), (Object)accum.ready((Cluster)this.cluster, (long)this.time.milliseconds()).readyNodes, (String)"Our partition's leader should be ready");
        List batches = (List)accum.drain(this.cluster, Collections.singleton(this.node1), Integer.MAX_VALUE, 0L).get(this.node1.id());
        Assertions.assertEquals((int)1, (int)batches.size());
        ProducerBatch batch = (ProducerBatch)batches.get(0);
        Iterator iter = batch.records().records().iterator();
        Record record = (Record)iter.next();
        Assertions.assertEquals((Object)ByteBuffer.wrap(this.key), (Object)record.key(), (String)"Keys should match");
        Assertions.assertEquals((Object)ByteBuffer.wrap(this.value), (Object)record.value(), (String)"Values should match");
        Assertions.assertFalse((boolean)iter.hasNext(), (String)"No more records");
    }

    @Test
    public void testPartialDrain() throws Exception {
        RecordAccumulator accum = this.createTestRecordAccumulator(1085, 10240L, CompressionType.NONE, 10);
        int appends = 1024 / this.msgSize + 1;
        List<TopicPartition> partitions = Arrays.asList(this.tp1, this.tp2);
        for (TopicPartition tp : partitions) {
            for (int i = 0; i < appends; ++i) {
                accum.append(tp, 0L, this.key, this.value, Record.EMPTY_HEADERS, null, 1000L, false, this.time.milliseconds());
            }
        }
        Assertions.assertEquals(Collections.singleton(this.node1), (Object)accum.ready((Cluster)this.cluster, (long)this.time.milliseconds()).readyNodes, (String)"Partition's leader should be ready");
        List batches = (List)accum.drain(this.cluster, Collections.singleton(this.node1), 1024, 0L).get(this.node1.id());
        Assertions.assertEquals((int)1, (int)batches.size(), (String)"But due to size bound only one partition should have been retrieved");
    }

    @Test
    public void testStressfulSituation() throws Exception {
        int numThreads = 5;
        int msgs = 10000;
        int numParts = 2;
        final RecordAccumulator accum = this.createTestRecordAccumulator(1085, 10240L, CompressionType.NONE, 0);
        ArrayList<1> threads = new ArrayList<1>();
        for (int i = 0; i < 5; ++i) {
            threads.add(new Thread(){

                @Override
                public void run() {
                    for (int i = 0; i < 10000; ++i) {
                        try {
                            accum.append(new TopicPartition(RecordAccumulatorTest.this.topic, i % 2), 0L, RecordAccumulatorTest.this.key, RecordAccumulatorTest.this.value, Record.EMPTY_HEADERS, null, 1000L, false, RecordAccumulatorTest.this.time.milliseconds());
                            continue;
                        }
                        catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
        for (Thread thread : threads) {
            thread.start();
        }
        int read = 0;
        long l = this.time.milliseconds();
        while (read < 50000) {
            Set nodes = accum.ready((Cluster)this.cluster, (long)l).readyNodes;
            List list = (List)accum.drain(this.cluster, nodes, 5120, 0L).get(this.node1.id());
            if (list == null) continue;
            for (ProducerBatch batch : list) {
                for (Record record : batch.records().records()) {
                    ++read;
                }
                accum.deallocate(batch);
            }
        }
        for (Thread thread : threads) {
            thread.join();
        }
    }

    @Test
    public void testNextReadyCheckDelay() throws Exception {
        int i;
        int lingerMs = 10;
        int batchSize = 1025;
        RecordAccumulator accum = this.createTestRecordAccumulator(batchSize + 61, 10 * batchSize, CompressionType.NONE, lingerMs);
        int appends = this.expectedNumAppends(batchSize);
        for (int i2 = 0; i2 < appends; ++i2) {
            accum.append(this.tp1, 0L, this.key, this.value, Record.EMPTY_HEADERS, null, 1000L, false, this.time.milliseconds());
        }
        RecordAccumulator.ReadyCheckResult result = accum.ready(this.cluster, this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)result.readyNodes.size(), (String)"No nodes should be ready.");
        Assertions.assertEquals((long)lingerMs, (long)result.nextReadyCheckDelayMs, (String)"Next check time should be the linger time");
        this.time.sleep(lingerMs / 2);
        for (i = 0; i < appends; ++i) {
            accum.append(this.tp3, 0L, this.key, this.value, Record.EMPTY_HEADERS, null, 1000L, false, this.time.milliseconds());
        }
        result = accum.ready(this.cluster, this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)result.readyNodes.size(), (String)"No nodes should be ready.");
        Assertions.assertEquals((long)(lingerMs / 2), (long)result.nextReadyCheckDelayMs, (String)"Next check time should be defined by node1, half remaining linger time");
        for (i = 0; i < appends + 1; ++i) {
            accum.append(this.tp2, 0L, this.key, this.value, Record.EMPTY_HEADERS, null, 1000L, false, this.time.milliseconds());
        }
        result = accum.ready(this.cluster, this.time.milliseconds());
        Assertions.assertEquals(Collections.singleton(this.node1), (Object)result.readyNodes, (String)"Node1 should be ready");
        Assertions.assertTrue((result.nextReadyCheckDelayMs <= (long)lingerMs ? 1 : 0) != 0, (String)"Next check time should be defined by node2, at most linger time");
    }

    @Test
    public void testRetryBackoff() throws Exception {
        int lingerMs = 0x7FFFFFF;
        long retryBackoffMs = 0xFFFFFFFL;
        int deliveryTimeoutMs = Integer.MAX_VALUE;
        long totalSize = 10240L;
        int batchSize = 1085;
        String metricGrpName = "producer-metrics";
        RecordAccumulator accum = new RecordAccumulator(this.logContext, batchSize, CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs, this.metrics, metricGrpName, (Time)this.time, new ApiVersions(), null, new BufferPool(totalSize, batchSize, this.metrics, (Time)this.time, metricGrpName));
        long now = this.time.milliseconds();
        accum.append(this.tp1, 0L, this.key, this.value, Record.EMPTY_HEADERS, null, 1000L, false, this.time.milliseconds());
        RecordAccumulator.ReadyCheckResult result = accum.ready(this.cluster, now + (long)lingerMs + 1L);
        Assertions.assertEquals(Collections.singleton(this.node1), (Object)result.readyNodes, (String)"Node1 should be ready");
        Map batches = accum.drain(this.cluster, result.readyNodes, Integer.MAX_VALUE, now + (long)lingerMs + 1L);
        Assertions.assertEquals((int)1, (int)batches.size(), (String)"Node1 should be the only ready node.");
        Assertions.assertEquals((int)1, (int)((List)batches.get(0)).size(), (String)"Partition 0 should only have one batch drained.");
        now = this.time.milliseconds();
        accum.reenqueue((ProducerBatch)((List)batches.get(0)).get(0), now);
        accum.append(this.tp2, 0L, this.key, this.value, Record.EMPTY_HEADERS, null, 1000L, false, this.time.milliseconds());
        result = accum.ready(this.cluster, now + (long)lingerMs + 1L);
        Assertions.assertEquals(Collections.singleton(this.node1), (Object)result.readyNodes, (String)"Node1 should be ready");
        batches = accum.drain(this.cluster, result.readyNodes, Integer.MAX_VALUE, now + (long)lingerMs + 1L);
        Assertions.assertEquals((int)1, (int)batches.size(), (String)"Node1 should be the only ready node.");
        Assertions.assertEquals((int)1, (int)((List)batches.get(0)).size(), (String)"Node1 should only have one batch drained.");
        Assertions.assertEquals((Object)this.tp2, (Object)((ProducerBatch)((List)batches.get((Object)Integer.valueOf((int)0))).get((int)0)).topicPartition, (String)"Node1 should only have one batch for partition 1.");
        result = accum.ready(this.cluster, now + retryBackoffMs + 1L);
        Assertions.assertEquals(Collections.singleton(this.node1), (Object)result.readyNodes, (String)"Node1 should be ready");
        batches = accum.drain(this.cluster, result.readyNodes, Integer.MAX_VALUE, now + retryBackoffMs + 1L);
        Assertions.assertEquals((int)1, (int)batches.size(), (String)"Node1 should be the only ready node.");
        Assertions.assertEquals((int)1, (int)((List)batches.get(0)).size(), (String)"Node1 should only have one batch drained.");
        Assertions.assertEquals((Object)this.tp1, (Object)((ProducerBatch)((List)batches.get((Object)Integer.valueOf((int)0))).get((int)0)).topicPartition, (String)"Node1 should only have one batch for partition 0.");
    }

    @Test
    public void testFlush() throws Exception {
        int lingerMs = Integer.MAX_VALUE;
        RecordAccumulator accum = this.createTestRecordAccumulator(4157, 65536L, CompressionType.NONE, lingerMs);
        for (int i = 0; i < 100; ++i) {
            accum.append(new TopicPartition(this.topic, i % 3), 0L, this.key, this.value, Record.EMPTY_HEADERS, null, 1000L, false, this.time.milliseconds());
            Assertions.assertTrue((boolean)accum.hasIncomplete());
        }
        RecordAccumulator.ReadyCheckResult result = accum.ready(this.cluster, this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)result.readyNodes.size(), (String)"No nodes should be ready.");
        accum.beginFlush();
        result = accum.ready(this.cluster, this.time.milliseconds());
        Map results = accum.drain(this.cluster, result.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assertions.assertTrue((boolean)accum.hasIncomplete());
        for (List batches : results.values()) {
            for (ProducerBatch batch : batches) {
                accum.deallocate(batch);
            }
        }
        accum.awaitFlushCompletion();
        Assertions.assertFalse((boolean)accum.hasUndrained());
        Assertions.assertFalse((boolean)accum.hasIncomplete());
    }

    private void delayedInterrupt(final Thread thread, final long delayMs) {
        Thread t = new Thread(){

            @Override
            public void run() {
                Time.SYSTEM.sleep(delayMs);
                thread.interrupt();
            }
        };
        t.start();
    }

    @Test
    public void testAwaitFlushComplete() throws Exception {
        RecordAccumulator accum = this.createTestRecordAccumulator(4157, 65536L, CompressionType.NONE, Integer.MAX_VALUE);
        accum.append(new TopicPartition(this.topic, 0), 0L, this.key, this.value, Record.EMPTY_HEADERS, null, 1000L, false, this.time.milliseconds());
        accum.beginFlush();
        Assertions.assertTrue((boolean)accum.flushInProgress());
        this.delayedInterrupt(Thread.currentThread(), 1000L);
        try {
            accum.awaitFlushCompletion();
            Assertions.fail((String)"awaitFlushCompletion should throw InterruptException");
        }
        catch (InterruptedException e) {
            Assertions.assertFalse((boolean)accum.flushInProgress(), (String)"flushInProgress count should be decremented even if thread is interrupted");
        }
    }

    @Test
    public void testAbortIncompleteBatches() throws Exception {
        int lingerMs = Integer.MAX_VALUE;
        int numRecords = 100;
        final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
        RecordAccumulator accum = this.createTestRecordAccumulator(189, 65536L, CompressionType.NONE, lingerMs);
        for (int i = 0; i < numRecords; ++i) {
            class TestCallback
            implements Callback {
                TestCallback() {
                }

                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    Assertions.assertTrue((boolean)exception.getMessage().equals("Producer is closed forcefully."));
                    numExceptionReceivedInCallback.incrementAndGet();
                }
            }
            accum.append(new TopicPartition(this.topic, i % 3), 0L, this.key, this.value, null, (Callback)new TestCallback(), 1000L, false, this.time.milliseconds());
        }
        RecordAccumulator.ReadyCheckResult result = accum.ready(this.cluster, this.time.milliseconds());
        Assertions.assertFalse((boolean)result.readyNodes.isEmpty());
        Map drained = accum.drain(this.cluster, result.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assertions.assertTrue((boolean)accum.hasUndrained());
        Assertions.assertTrue((boolean)accum.hasIncomplete());
        int numDrainedRecords = 0;
        for (Map.Entry drainedEntry : drained.entrySet()) {
            for (ProducerBatch batch : (List)drainedEntry.getValue()) {
                Assertions.assertTrue((boolean)batch.isClosed());
                Assertions.assertFalse((boolean)batch.produceFuture.completed());
                numDrainedRecords += batch.recordCount;
            }
        }
        Assertions.assertTrue((numDrainedRecords > 0 && numDrainedRecords < numRecords ? 1 : 0) != 0);
        accum.abortIncompleteBatches();
        Assertions.assertEquals((int)numRecords, (int)numExceptionReceivedInCallback.get());
        Assertions.assertFalse((boolean)accum.hasUndrained());
        Assertions.assertFalse((boolean)accum.hasIncomplete());
    }

    @Test
    public void testAbortUnsentBatches() throws Exception {
        int lingerMs = Integer.MAX_VALUE;
        int numRecords = 100;
        final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
        RecordAccumulator accum = this.createTestRecordAccumulator(189, 65536L, CompressionType.NONE, lingerMs);
        final KafkaException cause = new KafkaException();
        for (int i = 0; i < numRecords; ++i) {
            class TestCallback
            implements Callback {
                TestCallback() {
                }

                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    Assertions.assertEquals((Object)((Object)cause), (Object)exception);
                    numExceptionReceivedInCallback.incrementAndGet();
                }
            }
            accum.append(new TopicPartition(this.topic, i % 3), 0L, this.key, this.value, null, (Callback)new TestCallback(), 1000L, false, this.time.milliseconds());
        }
        RecordAccumulator.ReadyCheckResult result = accum.ready(this.cluster, this.time.milliseconds());
        Assertions.assertFalse((boolean)result.readyNodes.isEmpty());
        Map drained = accum.drain(this.cluster, result.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assertions.assertTrue((boolean)accum.hasUndrained());
        Assertions.assertTrue((boolean)accum.hasIncomplete());
        accum.abortUndrainedBatches((RuntimeException)((Object)cause));
        int numDrainedRecords = 0;
        for (Map.Entry drainedEntry : drained.entrySet()) {
            for (ProducerBatch batch : (List)drainedEntry.getValue()) {
                Assertions.assertTrue((boolean)batch.isClosed());
                Assertions.assertFalse((boolean)batch.produceFuture.completed());
                numDrainedRecords += batch.recordCount;
            }
        }
        Assertions.assertTrue((numDrainedRecords > 0 ? 1 : 0) != 0);
        Assertions.assertTrue((numExceptionReceivedInCallback.get() > 0 ? 1 : 0) != 0);
        Assertions.assertEquals((int)numRecords, (int)(numExceptionReceivedInCallback.get() + numDrainedRecords));
        Assertions.assertFalse((boolean)accum.hasUndrained());
        Assertions.assertTrue((boolean)accum.hasIncomplete());
    }

    private void doExpireBatchSingle(int deliveryTimeoutMs) throws InterruptedException {
        int lingerMs = 300;
        List<Boolean> muteStates = Arrays.asList(false, true);
        Set readyNodes = null;
        List expiredBatches = new ArrayList();
        int batchSize = 1025;
        RecordAccumulator accum = this.createTestRecordAccumulator(deliveryTimeoutMs, batchSize + 61, 10 * batchSize, CompressionType.NONE, lingerMs);
        for (Boolean mute : muteStates) {
            if (this.time.milliseconds() < System.currentTimeMillis()) {
                this.time.setCurrentTimeMs(System.currentTimeMillis());
            }
            accum.append(this.tp1, 0L, this.key, this.value, Record.EMPTY_HEADERS, null, 1000L, false, this.time.milliseconds());
            Assertions.assertEquals((int)0, (int)accum.ready((Cluster)this.cluster, (long)this.time.milliseconds()).readyNodes.size(), (String)"No partition should be ready.");
            this.time.sleep(lingerMs);
            readyNodes = accum.ready((Cluster)this.cluster, (long)this.time.milliseconds()).readyNodes;
            Assertions.assertEquals(Collections.singleton(this.node1), (Object)readyNodes, (String)"Our partition's leader should be ready");
            expiredBatches = accum.expiredBatches(this.time.milliseconds());
            Assertions.assertEquals((int)0, (int)expiredBatches.size(), (String)"The batch should not expire when just linger has passed");
            if (mute.booleanValue()) {
                accum.mutePartition(this.tp1);
            } else {
                accum.unmutePartition(this.tp1);
            }
            this.time.sleep(deliveryTimeoutMs - lingerMs);
            expiredBatches = accum.expiredBatches(this.time.milliseconds());
            Assertions.assertEquals((int)1, (int)expiredBatches.size(), (String)"The batch may expire when the partition is muted");
            Assertions.assertEquals((int)0, (int)accum.ready((Cluster)this.cluster, (long)this.time.milliseconds()).readyNodes.size(), (String)"No partitions should be ready.");
        }
    }

    @Test
    public void testExpiredBatchSingle() throws InterruptedException {
        this.doExpireBatchSingle(3200);
    }

    @Test
    public void testExpiredBatchSingleMaxValue() throws InterruptedException {
        this.doExpireBatchSingle(Integer.MAX_VALUE);
    }

    @Test
    public void testExpiredBatches() throws InterruptedException {
        long retryBackoffMs = 100L;
        int lingerMs = 30;
        int requestTimeout = 60;
        int deliveryTimeoutMs = 3200;
        int batchSize = 1025;
        RecordAccumulator accum = this.createTestRecordAccumulator(deliveryTimeoutMs, batchSize + 61, 10 * batchSize, CompressionType.NONE, lingerMs);
        int appends = this.expectedNumAppends(batchSize);
        for (int i = 0; i < appends; ++i) {
            accum.append(this.tp1, 0L, this.key, this.value, Record.EMPTY_HEADERS, null, 1000L, false, this.time.milliseconds());
            Assertions.assertEquals((int)0, (int)accum.ready((Cluster)this.cluster, (long)this.time.milliseconds()).readyNodes.size(), (String)"No partitions should be ready.");
        }
        accum.append(this.tp1, 0L, this.key, this.value, Record.EMPTY_HEADERS, null, 0L, false, this.time.milliseconds());
        Set readyNodes = accum.ready((Cluster)this.cluster, (long)this.time.milliseconds()).readyNodes;
        Assertions.assertEquals(Collections.singleton(this.node1), (Object)readyNodes, (String)"Our partition's leader should be ready");
        this.time.sleep(deliveryTimeoutMs + 1);
        accum.mutePartition(this.tp1);
        List expiredBatches = accum.expiredBatches(this.time.milliseconds());
        Assertions.assertEquals((int)2, (int)expiredBatches.size(), (String)"The batches will be muted no matter if the partition is muted or not");
        accum.unmutePartition(this.tp1);
        expiredBatches = accum.expiredBatches(this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)expiredBatches.size(), (String)"All batches should have been expired earlier");
        Assertions.assertEquals((int)0, (int)accum.ready((Cluster)this.cluster, (long)this.time.milliseconds()).readyNodes.size(), (String)"No partitions should be ready.");
        this.time.sleep(lingerMs);
        Assertions.assertEquals(Collections.singleton(this.node1), (Object)readyNodes, (String)"Our partition's leader should be ready");
        this.time.sleep(requestTimeout + 1);
        accum.mutePartition(this.tp1);
        expiredBatches = accum.expiredBatches(this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)expiredBatches.size(), (String)"The batch should not be expired when metadata is still available and partition is muted");
        accum.unmutePartition(this.tp1);
        expiredBatches = accum.expiredBatches(this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)expiredBatches.size(), (String)"All batches should have been expired");
        Assertions.assertEquals((int)0, (int)accum.ready((Cluster)this.cluster, (long)this.time.milliseconds()).readyNodes.size(), (String)"No partitions should be ready.");
        accum.append(this.tp1, 0L, this.key, this.value, Record.EMPTY_HEADERS, null, 0L, false, this.time.milliseconds());
        this.time.sleep(lingerMs);
        readyNodes = accum.ready((Cluster)this.cluster, (long)this.time.milliseconds()).readyNodes;
        Assertions.assertEquals(Collections.singleton(this.node1), (Object)readyNodes, (String)"Our partition's leader should be ready");
        Map drained = accum.drain(this.cluster, readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assertions.assertEquals((int)((List)drained.get(this.node1.id())).size(), (int)1, (String)"There should be only one batch.");
        this.time.sleep(1000L);
        accum.reenqueue((ProducerBatch)((List)drained.get(this.node1.id())).get(0), this.time.milliseconds());
        this.time.sleep((long)requestTimeout + retryBackoffMs);
        expiredBatches = accum.expiredBatches(this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)expiredBatches.size(), (String)"The batch should not be expired.");
        this.time.sleep(1L);
        accum.mutePartition(this.tp1);
        expiredBatches = accum.expiredBatches(this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)expiredBatches.size(), (String)"The batch should not be expired when the partition is muted");
        accum.unmutePartition(this.tp1);
        expiredBatches = accum.expiredBatches(this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)expiredBatches.size(), (String)"All batches should have been expired.");
        accum.append(this.tp1, 0L, this.key, this.value, Record.EMPTY_HEADERS, null, 0L, false, this.time.milliseconds());
        this.time.sleep(lingerMs);
        readyNodes = accum.ready((Cluster)this.cluster, (long)this.time.milliseconds()).readyNodes;
        Assertions.assertEquals(Collections.singleton(this.node1), (Object)readyNodes, (String)"Our partition's leader should be ready");
        this.time.sleep(requestTimeout + 1);
        accum.mutePartition(this.tp1);
        expiredBatches = accum.expiredBatches(this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)expiredBatches.size(), (String)"The batch should not be expired when the partition is muted");
        long throttleTimeMs = 100L;
        accum.unmutePartition(this.tp1);
        expiredBatches = accum.expiredBatches(this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)expiredBatches.size(), (String)"The batch should not be expired when the partition is muted");
        this.time.sleep(throttleTimeMs);
        expiredBatches = accum.expiredBatches(this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)expiredBatches.size(), (String)"All batches should have been expired earlier");
        Assertions.assertEquals((int)1, (int)accum.ready((Cluster)this.cluster, (long)this.time.milliseconds()).readyNodes.size(), (String)"No partitions should be ready.");
    }

    @Test
    public void testMutedPartitions() throws InterruptedException {
        long now = this.time.milliseconds();
        int batchSize = 1025;
        RecordAccumulator accum = this.createTestRecordAccumulator(batchSize + 61, 10 * batchSize, CompressionType.NONE, 10);
        int appends = this.expectedNumAppends(batchSize);
        for (int i = 0; i < appends; ++i) {
            accum.append(this.tp1, 0L, this.key, this.value, Record.EMPTY_HEADERS, null, 1000L, false, this.time.milliseconds());
            Assertions.assertEquals((int)0, (int)accum.ready((Cluster)this.cluster, (long)now).readyNodes.size(), (String)"No partitions should be ready.");
        }
        this.time.sleep(2000L);
        accum.mutePartition(this.tp1);
        RecordAccumulator.ReadyCheckResult result = accum.ready(this.cluster, this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)result.readyNodes.size(), (String)"No node should be ready");
        accum.unmutePartition(this.tp1);
        result = accum.ready(this.cluster, this.time.milliseconds());
        Assertions.assertTrue((result.readyNodes.size() > 0 ? 1 : 0) != 0, (String)"The batch should be ready");
        accum.mutePartition(this.tp1);
        Map drained = accum.drain(this.cluster, result.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)((List)drained.get(this.node1.id())).size(), (String)"No batch should have been drained");
        accum.unmutePartition(this.tp1);
        drained = accum.drain(this.cluster, result.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assertions.assertTrue((((List)drained.get(this.node1.id())).size() > 0 ? 1 : 0) != 0, (String)"The batch should have been drained.");
    }

    @Test
    public void testIdempotenceWithOldMagic() {
        ApiVersions apiVersions = new ApiVersions();
        int batchSize = 1025;
        int deliveryTimeoutMs = 3200;
        int lingerMs = 10;
        long retryBackoffMs = 100L;
        long totalSize = 10 * batchSize;
        String metricGrpName = "producer-metrics";
        apiVersions.update("foobar", NodeApiVersions.create((short)ApiKeys.PRODUCE.id, (short)0, (short)2));
        TransactionManager transactionManager = new TransactionManager(new LogContext(), null, 0, retryBackoffMs, apiVersions);
        RecordAccumulator accum = new RecordAccumulator(this.logContext, batchSize + 61, CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs, this.metrics, metricGrpName, (Time)this.time, apiVersions, transactionManager, new BufferPool(totalSize, batchSize, this.metrics, (Time)this.time, metricGrpName));
        Assertions.assertThrows(UnsupportedVersionException.class, () -> accum.append(this.tp1, 0L, this.key, this.value, Record.EMPTY_HEADERS, null, 0L, false, this.time.milliseconds()));
    }

    @Test
    public void testRecordsDrainedWhenTransactionCompleting() throws Exception {
        int batchSize = 1025;
        int deliveryTimeoutMs = 3200;
        int lingerMs = 10;
        long totalSize = 10 * batchSize;
        TransactionManager transactionManager = (TransactionManager)Mockito.mock(TransactionManager.class);
        RecordAccumulator accumulator = this.createTestRecordAccumulator(transactionManager, deliveryTimeoutMs, batchSize, totalSize, CompressionType.NONE, lingerMs);
        ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(12345L, 5);
        Mockito.when((Object)transactionManager.producerIdAndEpoch()).thenReturn((Object)producerIdAndEpoch);
        Mockito.when((Object)transactionManager.isSendToPartitionAllowed(this.tp1)).thenReturn((Object)true);
        Mockito.when((Object)transactionManager.isPartitionAdded(this.tp1)).thenReturn((Object)true);
        Mockito.when((Object)transactionManager.firstInFlightSequence(this.tp1)).thenReturn((Object)0);
        Mockito.when((Object)transactionManager.isCompleting()).thenReturn((Object)false);
        accumulator.append(this.tp1, 0L, this.key, this.value, Record.EMPTY_HEADERS, null, 1000L, false, this.time.milliseconds());
        accumulator.append(this.tp1, 0L, this.key, this.value, Record.EMPTY_HEADERS, null, 1000L, false, this.time.milliseconds());
        Assertions.assertTrue((boolean)accumulator.hasUndrained());
        RecordAccumulator.ReadyCheckResult firstResult = accumulator.ready(this.cluster, this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)firstResult.readyNodes.size());
        Map firstDrained = accumulator.drain(this.cluster, firstResult.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assertions.assertEquals((int)0, (int)firstDrained.size());
        Mockito.when((Object)transactionManager.isCompleting()).thenReturn((Object)true);
        RecordAccumulator.ReadyCheckResult secondResult = accumulator.ready(this.cluster, this.time.milliseconds());
        Assertions.assertEquals((int)1, (int)secondResult.readyNodes.size());
        Node readyNode = (Node)secondResult.readyNodes.iterator().next();
        Map secondDrained = accumulator.drain(this.cluster, secondResult.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assertions.assertEquals(Collections.singleton(readyNode.id()), secondDrained.keySet());
        List batches = (List)secondDrained.get(readyNode.id());
        Assertions.assertEquals((int)1, (int)batches.size());
    }

    @Test
    public void testSplitAndReenqueue() throws ExecutionException, InterruptedException {
        long now = this.time.milliseconds();
        RecordAccumulator accum = this.createTestRecordAccumulator(1024, 10240L, CompressionType.GZIP, 10);
        ByteBuffer buffer = ByteBuffer.allocate(4096);
        MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)buffer, (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)0L);
        ProducerBatch batch = new ProducerBatch(this.tp1, builder, now, true);
        byte[] value = new byte[1024];
        final AtomicInteger acked = new AtomicInteger(0);
        Callback cb = new Callback(){

            public void onCompletion(RecordMetadata metadata, Exception exception) {
                acked.incrementAndGet();
            }
        };
        FutureRecordMetadata future1 = batch.tryAppend(now, null, value, Record.EMPTY_HEADERS, cb, now);
        FutureRecordMetadata future2 = batch.tryAppend(now, null, value, Record.EMPTY_HEADERS, cb, now);
        Assertions.assertNotNull((Object)future1);
        Assertions.assertNotNull((Object)future2);
        batch.close();
        accum.reenqueue(batch, now);
        this.time.sleep(101L);
        RecordAccumulator.ReadyCheckResult result = accum.ready(this.cluster, this.time.milliseconds());
        Assertions.assertTrue((result.readyNodes.size() > 0 ? 1 : 0) != 0, (String)"The batch should be ready");
        Map drained = accum.drain(this.cluster, result.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assertions.assertEquals((int)1, (int)drained.size(), (String)"Only node1 should be drained");
        Assertions.assertEquals((int)1, (int)((List)drained.get(this.node1.id())).size(), (String)"Only one batch should be drained");
        accum.splitAndReenqueue((ProducerBatch)((List)drained.get(this.node1.id())).get(0));
        this.time.sleep(101L);
        drained = accum.drain(this.cluster, result.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assertions.assertFalse((boolean)drained.isEmpty());
        Assertions.assertFalse((boolean)((List)drained.get(this.node1.id())).isEmpty());
        ((ProducerBatch)((List)drained.get(this.node1.id())).get(0)).complete((long)acked.get(), 100L);
        Assertions.assertEquals((int)1, (int)acked.get(), (String)"The first message should have been acked.");
        Assertions.assertTrue((boolean)future1.isDone());
        Assertions.assertEquals((long)0L, (long)((RecordMetadata)future1.get()).offset());
        drained = accum.drain(this.cluster, result.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assertions.assertFalse((boolean)drained.isEmpty());
        Assertions.assertFalse((boolean)((List)drained.get(this.node1.id())).isEmpty());
        ((ProducerBatch)((List)drained.get(this.node1.id())).get(0)).complete((long)acked.get(), 100L);
        Assertions.assertEquals((int)2, (int)acked.get(), (String)"Both message should have been acked.");
        Assertions.assertTrue((boolean)future2.isDone());
        Assertions.assertEquals((long)1L, (long)((RecordMetadata)future2.get()).offset());
    }

    @Test
    public void testSplitBatchOffAccumulator() throws InterruptedException {
        long seed = System.currentTimeMillis();
        int batchSize = 1024;
        int bufferCapacity = 3072;
        CompressionRatioEstimator.setEstimation((String)this.tp1.topic(), (CompressionType)CompressionType.GZIP, (float)0.1f);
        RecordAccumulator accum = this.createTestRecordAccumulator(1024, 3072L, CompressionType.GZIP, 0);
        int numSplitBatches = this.prepareSplitBatches(accum, seed, 100, 20);
        Assertions.assertTrue((numSplitBatches > 0 ? 1 : 0) != 0, (String)"There should be some split batches");
        RecordAccumulator.ReadyCheckResult result = accum.ready(this.cluster, this.time.milliseconds());
        for (int i = 0; i < numSplitBatches; ++i) {
            Map drained = accum.drain(this.cluster, result.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
            Assertions.assertFalse((boolean)drained.isEmpty());
            Assertions.assertFalse((boolean)((List)drained.get(this.node1.id())).isEmpty());
        }
        Assertions.assertTrue((boolean)accum.ready((Cluster)this.cluster, (long)this.time.milliseconds()).readyNodes.isEmpty(), (String)"All the batches should have been drained.");
        Assertions.assertEquals((long)3072L, (long)accum.bufferPoolAvailableMemory(), (String)"The split batches should be allocated off the accumulator");
    }

    @Test
    public void testSplitFrequency() throws InterruptedException {
        long seed = System.currentTimeMillis();
        Random random = new Random();
        random.setSeed(seed);
        int batchSize = 1024;
        int numMessages = 1000;
        RecordAccumulator accum = this.createTestRecordAccumulator(1024, 3072L, CompressionType.GZIP, 10);
        for (int goodCompRatioPercentage = 1; goodCompRatioPercentage < 100; ++goodCompRatioPercentage) {
            int numSplit = 0;
            int numBatches = 0;
            CompressionRatioEstimator.resetEstimation((String)this.topic);
            for (int i = 0; i < 1000; ++i) {
                int dice = random.nextInt(100);
                byte[] value = dice < goodCompRatioPercentage ? this.bytesWithGoodCompression(random) : this.bytesWithPoorCompression(random, 100);
                accum.append(this.tp1, 0L, null, value, Record.EMPTY_HEADERS, null, 0L, false, this.time.milliseconds());
                BatchDrainedResult result = this.completeOrSplitBatches(accum, 1024);
                numSplit += result.numSplit;
                numBatches += result.numBatches;
            }
            this.time.sleep(10L);
            BatchDrainedResult result = this.completeOrSplitBatches(accum, 1024);
            Assertions.assertTrue(((double)(numSplit += result.numSplit) / (double)(numBatches += result.numBatches) < (double)0.1f ? 1 : 0) != 0, (String)String.format("Total num batches = %d, split batches = %d, more than 10%% of the batch splits. Random seed is " + seed, numBatches, numSplit));
        }
    }

    @Test
    public void testSoonToExpireBatchesArePickedUpForExpiry() throws InterruptedException {
        int lingerMs = 500;
        int batchSize = 1025;
        RecordAccumulator accum = this.createTestRecordAccumulator(batchSize + 61, 10 * batchSize, CompressionType.NONE, lingerMs);
        accum.append(this.tp1, 0L, this.key, this.value, Record.EMPTY_HEADERS, null, 1000L, false, this.time.milliseconds());
        Set readyNodes = accum.ready((Cluster)this.cluster, (long)this.time.milliseconds()).readyNodes;
        Map drained = accum.drain(this.cluster, readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assertions.assertTrue((boolean)drained.isEmpty());
        this.time.sleep(lingerMs + 1);
        readyNodes = accum.ready((Cluster)this.cluster, (long)this.time.milliseconds()).readyNodes;
        drained = accum.drain(this.cluster, readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assertions.assertEquals((int)1, (int)drained.size(), (String)"A batch did not drain after linger");
        accum.append(this.tp2, 0L, this.key, this.value, Record.EMPTY_HEADERS, null, 1000L, false, this.time.milliseconds());
        this.time.sleep(lingerMs * 4);
        readyNodes = accum.ready((Cluster)this.cluster, (long)this.time.milliseconds()).readyNodes;
        drained = accum.drain(this.cluster, readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assertions.assertEquals((int)1, (int)drained.size(), (String)"A batch did not drain after linger");
    }

    @Test
    public void testExpiredBatchesRetry() throws InterruptedException {
        int lingerMs = 3000;
        int rtt = 1000;
        int deliveryTimeoutMs = 3200;
        List<Boolean> muteStates = Arrays.asList(false, true);
        int batchSize = 1025;
        RecordAccumulator accum = this.createTestRecordAccumulator(batchSize + 61, 10 * batchSize, CompressionType.NONE, lingerMs);
        for (Boolean mute : muteStates) {
            accum.append(this.tp1, 0L, this.key, this.value, Record.EMPTY_HEADERS, null, 0L, false, this.time.milliseconds());
            this.time.sleep(lingerMs);
            Set readyNodes = accum.ready((Cluster)this.cluster, (long)this.time.milliseconds()).readyNodes;
            Assertions.assertEquals(Collections.singleton(this.node1), (Object)readyNodes, (String)"Our partition's leader should be ready");
            Map drained = accum.drain(this.cluster, readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
            Assertions.assertEquals((int)1, (int)((List)drained.get(this.node1.id())).size(), (String)"There should be only one batch.");
            this.time.sleep(rtt);
            accum.reenqueue((ProducerBatch)((List)drained.get(this.node1.id())).get(0), this.time.milliseconds());
            if (mute.booleanValue()) {
                accum.mutePartition(this.tp1);
            } else {
                accum.unmutePartition(this.tp1);
            }
            this.time.sleep(deliveryTimeoutMs - rtt);
            accum.drain(this.cluster, Collections.singleton(this.node1), Integer.MAX_VALUE, this.time.milliseconds());
            List expiredBatches = accum.expiredBatches(this.time.milliseconds());
            Assertions.assertEquals((int)(mute != false ? 1 : 0), (int)expiredBatches.size(), (String)"RecordAccumulator has expired batches if the partition is not muted");
        }
    }

    @Test
    public void testStickyBatches() throws Exception {
        int numBatches;
        Deque partitionBatches3;
        Deque partitionBatches2;
        Deque partitionBatches1;
        RecordAccumulator.RecordAppendResult result;
        long now = this.time.milliseconds();
        int batchSize = 1025;
        DefaultPartitioner partitioner = new DefaultPartitioner();
        RecordAccumulator accum = this.createTestRecordAccumulator(3200, batchSize + 61, 10L * (long)batchSize, CompressionType.NONE, 10);
        int expectedAppends = this.expectedNumAppendsNoKey(batchSize);
        int partition = partitioner.partition(this.topic, null, null, (Object)"value", this.value, this.cluster);
        TopicPartition tp = new TopicPartition(this.topic, partition);
        accum.append(tp, 0L, null, this.value, Record.EMPTY_HEADERS, null, 1000L, false, this.time.milliseconds());
        int appends = 1;
        boolean switchPartition = false;
        while (!switchPartition) {
            partition = partitioner.partition(this.topic, null, null, (Object)"value", this.value, this.cluster);
            tp = new TopicPartition(this.topic, partition);
            result = accum.append(tp, 0L, null, this.value, Record.EMPTY_HEADERS, null, 1000L, true, this.time.milliseconds());
            partitionBatches1 = (Deque)accum.batches().get(this.tp1);
            partitionBatches2 = (Deque)accum.batches().get(this.tp2);
            partitionBatches3 = (Deque)accum.batches().get(this.tp3);
            numBatches = (partitionBatches1 == null ? 0 : partitionBatches1.size()) + (partitionBatches2 == null ? 0 : partitionBatches2.size()) + (partitionBatches3 == null ? 0 : partitionBatches3.size());
            Assertions.assertEquals((int)1, (int)numBatches);
            switchPartition = result.abortForNewBatch;
            if (switchPartition) continue;
            ++appends;
            Assertions.assertEquals((int)0, (int)accum.ready((Cluster)this.cluster, (long)now).readyNodes.size(), (String)"No partitions should be ready.");
        }
        Assertions.assertEquals((int)1, (int)accum.ready((Cluster)this.cluster, (long)this.time.milliseconds()).readyNodes.size());
        Assertions.assertEquals((int)appends, (int)expectedAppends);
        switchPartition = false;
        partitioner.onNewBatch(this.topic, this.cluster, partition);
        partition = partitioner.partition(this.topic, null, null, (Object)"value", this.value, this.cluster);
        tp = new TopicPartition(this.topic, partition);
        accum.append(tp, 0L, null, this.value, Record.EMPTY_HEADERS, null, 1000L, false, this.time.milliseconds());
        ++appends;
        while (!switchPartition) {
            partition = partitioner.partition(this.topic, null, null, (Object)"value", this.value, this.cluster);
            tp = new TopicPartition(this.topic, partition);
            result = accum.append(tp, 0L, null, this.value, Record.EMPTY_HEADERS, null, 1000L, true, this.time.milliseconds());
            partitionBatches1 = (Deque)accum.batches().get(this.tp1);
            partitionBatches2 = (Deque)accum.batches().get(this.tp2);
            partitionBatches3 = (Deque)accum.batches().get(this.tp3);
            numBatches = (partitionBatches1 == null ? 0 : partitionBatches1.size()) + (partitionBatches2 == null ? 0 : partitionBatches2.size()) + (partitionBatches3 == null ? 0 : partitionBatches3.size());
            Assertions.assertEquals((int)2, (int)numBatches);
            switchPartition = result.abortForNewBatch;
            if (switchPartition) continue;
            ++appends;
        }
        Assertions.assertEquals((int)appends, (int)(2 * expectedAppends));
    }

    private int prepareSplitBatches(RecordAccumulator accum, long seed, int recordSize, int numRecords) throws InterruptedException {
        Random random = new Random();
        random.setSeed(seed);
        CompressionRatioEstimator.setEstimation((String)this.tp1.topic(), (CompressionType)CompressionType.GZIP, (float)0.1f);
        for (int i = 0; i < numRecords; ++i) {
            accum.append(this.tp1, 0L, null, this.bytesWithPoorCompression(random, recordSize), Record.EMPTY_HEADERS, null, 0L, false, this.time.milliseconds());
        }
        RecordAccumulator.ReadyCheckResult result = accum.ready(this.cluster, this.time.milliseconds());
        Assertions.assertFalse((boolean)result.readyNodes.isEmpty());
        Map batches = accum.drain(this.cluster, result.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
        Assertions.assertEquals((int)1, (int)batches.size());
        Assertions.assertEquals((int)1, (int)((List)batches.values().iterator().next()).size());
        ProducerBatch batch = (ProducerBatch)((List)batches.values().iterator().next()).get(0);
        int numSplitBatches = accum.splitAndReenqueue(batch);
        accum.deallocate(batch);
        return numSplitBatches;
    }

    private BatchDrainedResult completeOrSplitBatches(RecordAccumulator accum, int batchSize) {
        boolean batchDrained;
        int numSplit = 0;
        int numBatches = 0;
        do {
            batchDrained = false;
            RecordAccumulator.ReadyCheckResult result = accum.ready(this.cluster, this.time.milliseconds());
            Map batches = accum.drain(this.cluster, result.readyNodes, Integer.MAX_VALUE, this.time.milliseconds());
            for (List batchList : batches.values()) {
                for (ProducerBatch batch : batchList) {
                    batchDrained = true;
                    ++numBatches;
                    if (batch.estimatedSizeInBytes() > batchSize + 61) {
                        accum.splitAndReenqueue(batch);
                        ++numSplit;
                    } else {
                        batch.complete(0L, 0L);
                    }
                    accum.deallocate(batch);
                }
            }
        } while (batchDrained);
        return new BatchDrainedResult(numSplit, numBatches);
    }

    private byte[] bytesWithGoodCompression(Random random) {
        byte[] value = new byte[100];
        ByteBuffer buffer = ByteBuffer.wrap(value);
        while (buffer.remaining() > 0) {
            buffer.putInt(random.nextInt(1000));
        }
        return value;
    }

    private byte[] bytesWithPoorCompression(Random random, int size) {
        byte[] value = new byte[size];
        random.nextBytes(value);
        return value;
    }

    private int expectedNumAppends(int batchSize) {
        int size = 0;
        int offsetDelta = 0;
        int recordSize;
        while (size + (recordSize = DefaultRecord.sizeInBytes((int)offsetDelta, (long)0L, (int)this.key.length, (int)this.value.length, (Header[])Record.EMPTY_HEADERS)) <= batchSize) {
            ++offsetDelta;
            size += recordSize;
        }
        return offsetDelta;
    }

    private int expectedNumAppendsNoKey(int batchSize) {
        int size = 0;
        int offsetDelta = 0;
        int recordSize;
        while (size + (recordSize = DefaultRecord.sizeInBytes((int)offsetDelta, (long)0L, (int)0, (int)this.value.length, (Header[])Record.EMPTY_HEADERS)) <= batchSize) {
            ++offsetDelta;
            size += recordSize;
        }
        return offsetDelta;
    }

    private RecordAccumulator createTestRecordAccumulator(int batchSize, long totalSize, CompressionType type, int lingerMs) {
        int deliveryTimeoutMs = 3200;
        return this.createTestRecordAccumulator(deliveryTimeoutMs, batchSize, totalSize, type, lingerMs);
    }

    private RecordAccumulator createTestRecordAccumulator(int deliveryTimeoutMs, int batchSize, long totalSize, CompressionType type, int lingerMs) {
        return this.createTestRecordAccumulator(null, deliveryTimeoutMs, batchSize, totalSize, type, lingerMs);
    }

    private RecordAccumulator createTestRecordAccumulator(TransactionManager txnManager, int deliveryTimeoutMs, int batchSize, long totalSize, CompressionType type, int lingerMs) {
        long retryBackoffMs = 100L;
        String metricGrpName = "producer-metrics";
        return new RecordAccumulator(this.logContext, batchSize, type, lingerMs, retryBackoffMs, deliveryTimeoutMs, this.metrics, metricGrpName, (Time)this.time, new ApiVersions(), txnManager, new BufferPool(totalSize, batchSize, this.metrics, (Time)this.time, metricGrpName));
    }

    private class BatchDrainedResult {
        final int numSplit;
        final int numBatches;

        BatchDrainedResult(int numSplit, int numBatches) {
            this.numBatches = numBatches;
            this.numSplit = numSplit;
        }
    }
}

