/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={MediumTests.class})
@Ignore(value="Flaky, needs to be rewritten, see HBASE-19125")
public class TestReplicator
extends TestReplicationBase {
    static final Log LOG = LogFactory.getLog(TestReplicator.class);
    static final int NUM_ROWS = 10;

    /**
     * One-time fixture setup: lowers the IPC request size limit on {@code conf1},
     * runs the shared replication setup from {@link TestReplicationBase}, and then
     * removes the peer with id {@code "2"} so each test can register its own peer.
     *
     * <p>NOTE(review): peer {@code "2"} is presumably the one registered by the
     * base-class setup; confirm against {@code TestReplicationBase}.
     *
     * @throws Exception if the base setup or the peer removal fails
     */
    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        final String maxRequestSizeKey = "hbase.ipc.max.request.size";
        final int maxRequestSizeBytes = 10240;
        // Must be applied before the base class setup runs.
        conf1.setInt(maxRequestSizeKey, maxRequestSizeBytes);

        // Explicitly qualified: an unqualified call would recurse into this method.
        TestReplicationBase.setUpBeforeClass();

        final String preexistingPeerId = "2";
        admin.removePeer(preexistingPeerId);
    }

    /**
     * Checks that rows written while the test endpoint is paused are shipped
     * in exactly {@link #NUM_ROWS} batches once the endpoint is resumed.
     *
     * <p>The test truncates the table on both clusters, registers a peer backed by
     * {@link ReplicationEndpointForTest}, resets that endpoint's counters, pauses it,
     * writes {@code NUM_ROWS} rows with 8192-byte values to {@code htable1}, and
     * resumes it. It then waits up to 60 seconds for the batch counter to reach
     * {@code NUM_ROWS} and asserts both the batch count and the row count seen on
     * the second cluster. The peer is always removed afterwards.
     *
     * @throws Exception if any cluster operation fails or the wait times out
     */
    @Test
    public void testReplicatorBatching() throws Exception {
        final String peerId = "testReplicatorBatching";

        this.truncateTable(utility1, tableName);
        this.truncateTable(utility2, tableName);

        ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
            .setClusterKey(utility2.getClusterKey())
            .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName());
        admin.addPeer(peerId, peerConfig, null);

        ReplicationEndpointForTest.setBatchCount(0);
        ReplicationEndpointForTest.setEntriesCount(0);
        try {
            ReplicationEndpointForTest.pause();
            try {
                // The test expects one batch per row (see the batch-count assertion below).
                // NOTE(review): presumably the 8192-byte value combined with the 10240-byte
                // request limit set in setUpBeforeClass is what forces this - confirm.
                byte[] valueBytes = new byte[8192];
                for (int i = 0; i < NUM_ROWS; ++i) {
                    byte[] rowKey = ("row" + Integer.toString(i)).getBytes(StandardCharsets.UTF_8);
                    htable1.put(new Put(rowKey).addColumn(famName, null, valueBytes));
                }
            }
            finally {
                // Always release the endpoint, even if a put failed, so it is not left paused.
                ReplicationEndpointForTest.resume();
            }

            Waiter.waitFor(conf1, 60000L, new Waiter.ExplainingPredicate<Exception>() {
                @Override
                public boolean evaluate() throws Exception {
                    return ReplicationEndpointForTest.getBatchCount() >= NUM_ROWS;
                }

                @Override
                public String explainFailure() throws Exception {
                    return "We waited too long for expected replication of " + NUM_ROWS + " entries";
                }
            });

            Assert.assertEquals("We sent an incorrect number of batches", NUM_ROWS,
                ReplicationEndpointForTest.getBatchCount());
            Assert.assertEquals("We did not replicate enough rows", NUM_ROWS,
                utility2.countRows(htable2));
        }
        finally {
            admin.removePeer(peerId);
        }
    }

    /**
     * Checks that all {@link #NUM_ROWS} rows reach the second cluster when the
     * peer is backed by {@link FailureInjectingReplicationEndpointForTest}.
     *
     * <p>The test truncates the table on both clusters, registers the peer, resets
     * the endpoint counters, pauses the endpoint, writes {@code NUM_ROWS} rows with
     * 8192-byte values to {@code htable1}, and resumes it. It then waits up to
     * 60 seconds for the replicated-entries counter to reach {@code NUM_ROWS} and
     * asserts the row count seen on the second cluster. The peer is always removed
     * afterwards.
     *
     * @throws Exception if any cluster operation fails or the wait times out
     */
    @Test
    public void testReplicatorWithErrors() throws Exception {
        final String peerId = "testReplicatorWithErrors";

        this.truncateTable(utility1, tableName);
        this.truncateTable(utility2, tableName);

        ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
            .setClusterKey(utility2.getClusterKey())
            .setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName());
        admin.addPeer(peerId, peerConfig, null);

        FailureInjectingReplicationEndpointForTest.setBatchCount(0);
        FailureInjectingReplicationEndpointForTest.setEntriesCount(0);
        try {
            FailureInjectingReplicationEndpointForTest.pause();
            try {
                byte[] valueBytes = new byte[8192];
                for (int i = 0; i < NUM_ROWS; ++i) {
                    byte[] rowKey = ("row" + Integer.toString(i)).getBytes(StandardCharsets.UTF_8);
                    htable1.put(new Put(rowKey).addColumn(famName, null, valueBytes));
                }
            }
            finally {
                // Always release the endpoint, even if a put failed, so it is not left paused.
                FailureInjectingReplicationEndpointForTest.resume();
            }

            // Unlike the batching test this waits on the entry counter, not the batch counter.
            Waiter.waitFor(conf1, 60000L, new Waiter.ExplainingPredicate<Exception>() {
                @Override
                public boolean evaluate() throws Exception {
                    return FailureInjectingReplicationEndpointForTest.getEntriesCount() >= NUM_ROWS;
                }

                @Override
                public String explainFailure() throws Exception {
                    return "We waited too long for expected replication of " + NUM_ROWS + " entries";
                }
            });

            Assert.assertEquals("We did not replicate enough rows", NUM_ROWS,
                utility2.countRows(htable2));
        }
        finally {
            admin.removePeer(peerId);
        }
    }

    /**
     * One-time fixture teardown; delegates entirely to
     * {@link TestReplicationBase#tearDownAfterClass()}.
     *
     * @throws Exception if the base-class teardown fails
     */
    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        // Explicitly qualified: an unqualified call would recurse into this method.
        TestReplicationBase.tearDownAfterClass();
    }

    /**
     * Disables and then truncates the given table on the cluster managed by
     * {@code util}, passing {@code false} as the second argument of
     * {@code truncateTable}.
     *
     * @param util      testing utility whose cluster holds the table
     * @param tablename table to disable and truncate
     * @throws IOException if disabling or truncating the table fails
     */
    private void truncateTable(HBaseTestingUtility util, TableName tablename) throws IOException {
        // Named hbaseAdmin so it does not shadow the inherited "admin" field used by the tests.
        HBaseAdmin hbaseAdmin = util.getHBaseAdmin();
        // Operate on the table that was passed in, not on the inherited tableName field.
        hbaseAdmin.disableTable(tablename);
        hbaseAdmin.truncateTable(tablename, false);
    }

    /**
     * Variant of {@link ReplicationEndpointForTest} whose replicators wrap the
     * sink's admin stub in a {@link FailureInjectingBlockingInterface} before
     * shipping entries.
     */
    public static class FailureInjectingReplicationEndpointForTest
    extends ReplicationEndpointForTest {
        /**
         * Creates a replicator that injects failures into the replication RPC.
         *
         * @param entries WAL entries the replicator will ship
         * @param ordinal ordinal forwarded unchanged to the replicator
         * @return a new {@link FailureInjectingReplicatorForTest}
         */
        @Override
        protected HBaseInterClusterReplicationEndpoint.Replicator createReplicator(List<WAL.Entry> entries, int ordinal) {
            return new FailureInjectingReplicatorForTest(entries, ordinal);
        }

        /**
         * Replicator that substitutes a failure-injecting stub for the real one
         * and otherwise reuses the counting/logging logic of its superclass.
         */
        public class FailureInjectingReplicatorForTest
        extends ReplicationEndpointForTest.ReplicatorForTest {
            public FailureInjectingReplicatorForTest(List<WAL.Entry> entries, int ordinal) {
                super(entries, ordinal);
            }

            /**
             * Wraps {@code rrs} in a fresh {@link FailureInjectingBlockingInterface}
             * and delegates to the superclass implementation.
             *
             * <p>NOTE(review): a new wrapper is created on every call, and the
             * wrapper's fail/succeed toggle is per-instance and starts in the
             * "succeed" state. A failure is therefore only injected if the superclass
             * invokes {@code replicateWALEntry} more than once on the same wrapper,
             * which is not visible from this file - confirm that failures actually fire.
             *
             * @throws IOException if the superclass implementation fails
             */
            @Override
            protected void replicateEntries(AdminProtos.AdminService.BlockingInterface rrs, List<WAL.Entry> entries, String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir) throws IOException {
                super.replicateEntries(new FailureInjectingBlockingInterface(rrs), entries, replicationClusterId, baseNamespaceDir, hfileArchiveDir);
            }
        }

        /**
         * Admin-service stub that forwards every call to a delegate, except that
         * {@link #replicateWALEntry} alternates between delegating and throwing a
         * {@link ServiceException} on successive calls to the same instance.
         */
        static class FailureInjectingBlockingInterface
        implements AdminProtos.AdminService.BlockingInterface {
            private final AdminProtos.AdminService.BlockingInterface delegate;
            /** When true, the next {@code replicateWALEntry} call on this instance throws. */
            private volatile boolean failNext;

            public FailureInjectingBlockingInterface(AdminProtos.AdminService.BlockingInterface delegate) {
                this.delegate = delegate;
            }

            @Override
            public AdminProtos.GetRegionInfoResponse getRegionInfo(RpcController controller, AdminProtos.GetRegionInfoRequest request) throws ServiceException {
                return this.delegate.getRegionInfo(controller, request);
            }

            @Override
            public AdminProtos.GetStoreFileResponse getStoreFile(RpcController controller, AdminProtos.GetStoreFileRequest request) throws ServiceException {
                return this.delegate.getStoreFile(controller, request);
            }

            @Override
            public AdminProtos.GetOnlineRegionResponse getOnlineRegion(RpcController controller, AdminProtos.GetOnlineRegionRequest request) throws ServiceException {
                return this.delegate.getOnlineRegion(controller, request);
            }

            @Override
            public AdminProtos.OpenRegionResponse openRegion(RpcController controller, AdminProtos.OpenRegionRequest request) throws ServiceException {
                return this.delegate.openRegion(controller, request);
            }

            @Override
            public AdminProtos.WarmupRegionResponse warmupRegion(RpcController controller, AdminProtos.WarmupRegionRequest request) throws ServiceException {
                return this.delegate.warmupRegion(controller, request);
            }

            @Override
            public AdminProtos.CloseRegionResponse closeRegion(RpcController controller, AdminProtos.CloseRegionRequest request) throws ServiceException {
                return this.delegate.closeRegion(controller, request);
            }

            @Override
            public AdminProtos.FlushRegionResponse flushRegion(RpcController controller, AdminProtos.FlushRegionRequest request) throws ServiceException {
                return this.delegate.flushRegion(controller, request);
            }

            @Override
            public AdminProtos.CompactRegionResponse compactRegion(RpcController controller, AdminProtos.CompactRegionRequest request) throws ServiceException {
                return this.delegate.compactRegion(controller, request);
            }

            /**
             * Alternates between forwarding the call and failing it: the first call
             * on an instance is forwarded, the second throws, the third is forwarded,
             * and so on.
             *
             * @throws ServiceException with message "Injected failure" on every other
             *         call, or whatever the delegate throws on forwarded calls
             */
            @Override
            public AdminProtos.ReplicateWALEntryResponse replicateWALEntry(RpcController controller, AdminProtos.ReplicateWALEntryRequest request) throws ServiceException {
                if (this.failNext) {
                    // Re-arm for success before failing this call.
                    this.failNext = false;
                    throw new ServiceException("Injected failure");
                }
                // Arm the failure for the following call, then forward this one.
                this.failNext = true;
                return this.delegate.replicateWALEntry(controller, request);
            }

            @Override
            public AdminProtos.ReplicateWALEntryResponse replay(RpcController controller, AdminProtos.ReplicateWALEntryRequest request) throws ServiceException {
                return this.delegate.replay(controller, request);
            }

            @Override
            public AdminProtos.RollWALWriterResponse rollWALWriter(RpcController controller, AdminProtos.RollWALWriterRequest request) throws ServiceException {
                return this.delegate.rollWALWriter(controller, request);
            }

            @Override
            public AdminProtos.GetServerInfoResponse getServerInfo(RpcController controller, AdminProtos.GetServerInfoRequest request) throws ServiceException {
                return this.delegate.getServerInfo(controller, request);
            }

            @Override
            public AdminProtos.StopServerResponse stopServer(RpcController controller, AdminProtos.StopServerRequest request) throws ServiceException {
                return this.delegate.stopServer(controller, request);
            }

            @Override
            public AdminProtos.UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller, AdminProtos.UpdateFavoredNodesRequest request) throws ServiceException {
                return this.delegate.updateFavoredNodes(controller, request);
            }

            @Override
            public AdminProtos.UpdateConfigurationResponse updateConfiguration(RpcController controller, AdminProtos.UpdateConfigurationRequest request) throws ServiceException {
                return this.delegate.updateConfiguration(controller, request);
            }

            @Override
            public AdminProtos.GetRegionLoadResponse getRegionLoad(RpcController controller, AdminProtos.GetRegionLoadRequest request) throws ServiceException {
                return this.delegate.getRegionLoad(controller, request);
            }

            @Override
            public AdminProtos.ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller, AdminProtos.ClearCompactionQueuesRequest request) throws ServiceException {
                return this.delegate.clearCompactionQueues(controller, request);
            }

            @Override
            public QuotaProtos.GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller, QuotaProtos.GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
                return this.delegate.getSpaceQuotaSnapshots(controller, request);
            }

            @Override
            public AdminProtos.ExecuteProceduresResponse executeProcedures(RpcController controller, AdminProtos.ExecuteProceduresRequest request) throws ServiceException {
                // Forward like every other pass-through method instead of returning null.
                return this.delegate.executeProcedures(controller, request);
            }
        }
    }

    public static class ReplicationEndpointForTest
    extends HBaseInterClusterReplicationEndpoint {
        private static int batchCount;
        private static int entriesCount;
        private static final Object latch;
        private static AtomicBoolean useLatch;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public static void resume() {
            useLatch.set(false);
            Object object = latch;
            synchronized (object) {
                latch.notifyAll();
            }
        }

        public static void pause() {
            useLatch.set(true);
        }

        public static void await() throws InterruptedException {
            if (useLatch.get()) {
                LOG.info((Object)"Waiting on latch");
                latch.wait();
                LOG.info((Object)"Waited on latch, now proceeding");
            }
        }

        public static int getBatchCount() {
            return batchCount;
        }

        public static void setBatchCount(int i) {
            batchCount = i;
        }

        public static int getEntriesCount() {
            return entriesCount;
        }

        public static void setEntriesCount(int i) {
            entriesCount = i;
        }

        public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext) {
            try {
                ReplicationEndpointForTest.await();
            }
            catch (InterruptedException e) {
                LOG.warn((Object)"Interrupted waiting for latch", (Throwable)e);
            }
            return super.replicate(replicateContext);
        }

        protected HBaseInterClusterReplicationEndpoint.Replicator createReplicator(List<WAL.Entry> entries, int ordinal) {
            return new ReplicatorForTest(entries, ordinal);
        }

        static {
            latch = new Object();
            useLatch = new AtomicBoolean(false);
        }

        public class ReplicatorForTest
        extends HBaseInterClusterReplicationEndpoint.Replicator {
            public ReplicatorForTest(List<WAL.Entry> entries, int ordinal) {
                super((HBaseInterClusterReplicationEndpoint)ReplicationEndpointForTest.this, entries, ordinal);
            }

            protected void replicateEntries(AdminProtos.AdminService.BlockingInterface rrs, List<WAL.Entry> entries, String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir) throws IOException {
                try {
                    long size = 0L;
                    for (WAL.Entry e : entries) {
                        size += e.getKey().estimatedSerializedSizeOf();
                        size += e.getEdit().estimatedSerializedSizeOf();
                    }
                    LOG.info((Object)("Replicating batch " + System.identityHashCode(entries) + " of " + entries.size() + " entries with total size " + size + " bytes to " + replicationClusterId));
                    super.replicateEntries(rrs, entries, replicationClusterId, baseNamespaceDir, hfileArchiveDir);
                    entriesCount = entriesCount + entries.size();
                    batchCount++;
                    LOG.info((Object)("Completed replicating batch " + System.identityHashCode(entries)));
                }
                catch (IOException e) {
                    LOG.info((Object)("Failed to replicate batch " + System.identityHashCode(entries)), (Throwable)e);
                    throw e;
                }
            }
        }
    }
}

