package org.apache.zookeeper.test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.management.Attribute;
import javax.management.AttributeNotFoundException;
import javax.management.InstanceNotFoundException;
import javax.management.InvalidAttributeValueException;
import javax.management.MBeanException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.QueryExp;
import javax.management.ReflectionException;
import javax.management.RuntimeMBeanException;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.DummyWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.admin.ZooKeeperAdmin;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.jmx.ZKMBeanInfo;
import org.apache.zookeeper.metrics.MetricsUtils;
import org.apache.zookeeper.server.admin.Commands;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
import org.apache.zookeeper.server.util.PortForwarder;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/zookeeper/test/ObserverMasterTest.class */
public class ObserverMasterTest extends ObserverMasterTestBase {
    /** Logger shared by the test methods of this class and by {@link AsyncWriter}. */
    protected static final Logger LOG = LoggerFactory.getLogger(ObserverMasterTest.class);

    /* loaded from: input_file:org/apache/zookeeper/test/ObserverMasterTest$AsyncWriter.class */
    class AsyncWriter implements Runnable {
        private final ZooKeeper client;
        private final int numTransactions;
        private final boolean issueSync;
        private final CountDownLatch writerLatch;
        private final String root;
        private final CountDownLatch gate;

        AsyncWriter(ZooKeeper zooKeeper, int i, boolean z, CountDownLatch countDownLatch, String str, CountDownLatch countDownLatch2) {
            this.client = zooKeeper;
            this.numTransactions = i;
            this.issueSync = z;
            this.writerLatch = countDownLatch;
            this.root = str;
            this.gate = countDownLatch2;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (this.gate != null) {
                try {
                    this.gate.await();
                } catch (InterruptedException e) {
                    ObserverMasterTest.LOG.error("Gate interrupted");
                    return;
                }
            }
            for (int i = 0; i < this.numTransactions; i++) {
                boolean z = i % 100 == 0;
                this.client.create(this.root + i, "inner thread".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (i2, str, obj, str2) -> {
                    this.writerLatch.countDown();
                    if (z) {
                        ObserverMasterTest.LOG.info("wrote {}", str);
                    }
                }, (Object) null);
                if (z) {
                    ObserverMasterTest.LOG.info("async wrote {}{}", this.root, Integer.valueOf(i));
                    if (this.issueSync) {
                        this.client.sync(this.root + "0", (AsyncCallback.VoidCallback) null, (Object) null);
                    }
                }
            }
        }
    }

    /**
     * Exercises an observer through loss and recovery of quorum, then checks that a session can
     * move from a participant to the observer.
     *
     * <p>Flow: start the observer and verify its sync-time metrics, write and read through it,
     * stop server 2 and expect the client to lose its connection, restart server 2 and expect a
     * reconnect (or session expiry), and finally run a port-forwarder based revalidation
     * scenario.
     *
     * <p>NOTE(review): {@code latch} and {@code lastEvent} are presumably updated by the
     * {@link Watcher} callback of the base class, which is not visible here.
     *
     * @param testObserverMaster forwarded to {@code setUp}; when {@code true} the observer's
     *     socket is additionally required to target {@code OM_PORT}
     * @throws Exception if a server or client operation fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testObserver(boolean testObserverMaster) throws Exception {
        // Two events are awaited after server 2 is stopped further down.
        this.latch = new CountDownLatch(2);
        setUp(-1, Boolean.valueOf(testObserverMaster));
        this.q3.start();
        Assertions.assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + this.CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT), "waiting for server 3 being up");
        validateObserverSyncTimeMetrics();
        if (testObserverMaster) {
            int observerSocketPort = this.q3.getQuorumPeer().observer.getSocket().getPort();
            LOG.info("port {} {}", observerSocketPort, this.OM_PORT);
            Assertions.assertEquals(this.OM_PORT, observerSocketPort, "observer failed to connect to observer master");
        }

        // Basic write/read/sync round trip through the observer.
        this.zk = new ZooKeeper("127.0.0.1:" + this.CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT, this);
        this.zk.create("/obstest", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        Assertions.assertEquals("test", new String(this.zk.getData("/obstest", (Watcher) null, (Stat) null)));
        this.zk.sync("/", (AsyncCallback.VoidCallback) null, (Object) null);
        this.zk.setData("/obstest", "test2".getBytes(), -1);
        this.zk.getChildren("/", false);
        Assertions.assertEquals(ZooKeeper.States.CONNECTED, this.zk.getState());

        // Take server 2 down and expect the client to notice.
        LOG.info("Shutting down server 2");
        this.q2.shutdown();
        Assertions.assertTrue(ClientBase.waitForServerDown("127.0.0.1:" + this.CLIENT_PORT_QP2, ClientBase.CONNECTION_TIMEOUT), "Waiting for server 2 to shut down");
        LOG.info("Server 2 down");
        this.latch.await();
        Assertions.assertNotSame(Watcher.Event.KeeperState.SyncConnected, this.lastEvent.getState(), "Client is still connected to non-quorate cluster");
        LOG.info("Latch returned");
        try {
            String dataWhileNotQuorate = new String(this.zk.getData("/obstest", (Watcher) null, (Stat) null));
            // JUnit 5 order is (unexpected, actual, message); the former call passed the message
            // first and so compared two constant strings, which could never fail.
            Assertions.assertNotEquals("test", dataWhileNotQuorate, "Shouldn't get a response when cluster not quorate!");
        } catch (KeeperException.ConnectionLossException e) {
            LOG.info("Connection loss exception caught - ensemble not quorate (this is expected)");
        }

        // Bring server 2 back and wait for the client to see a new session event.
        this.latch = new CountDownLatch(1);
        LOG.info("Restarting server 2");
        this.q2.start();
        LOG.info("Waiting for server 2 to come up");
        Assertions.assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + this.CLIENT_PORT_QP2, ClientBase.CONNECTION_TIMEOUT), "waiting for server 2 being up");
        LOG.info("Server 2 started, waiting for latch");
        this.latch.await();
        Assertions.assertTrue(Watcher.Event.KeeperState.SyncConnected == this.lastEvent.getState() || Watcher.Event.KeeperState.Expired == this.lastEvent.getState(), "Client didn't reconnect to quorate ensemble (state was" + this.lastEvent.getState() + ")");

        LOG.info("perform a revalidation test");
        int leaderProxyPort = PortAssignment.unique();
        int observerProxyPort = PortAssignment.unique();
        // Forward to QP2 when q1 has no leader object, otherwise to QP1 -- presumably this
        // always targets the current leader's client port.
        PortForwarder leaderProxy = new PortForwarder(leaderProxyPort, this.q1.getQuorumPeer().leader == null ? this.CLIENT_PORT_QP2 : this.CLIENT_PORT_QP1);
        this.latch = new CountDownLatch(1);
        // The connect string names both proxy ports, but only the first one is open so far.
        ZooKeeper proxiedClient = new ZooKeeper(String.format("127.0.0.1:%d,127.0.0.1:%d", leaderProxyPort, observerProxyPort), ClientBase.CONNECTION_TIMEOUT, this);
        this.latch.await();
        proxiedClient.create("/revalidtest", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        Assertions.assertNotNull(proxiedClient.exists("/revalidtest", (Watcher) null), "Read-after write failed");

        // Open the observer proxy, then close the first proxy so the client has to move over.
        this.latch = new CountDownLatch(2);
        PortForwarder observerProxy = new PortForwarder(observerProxyPort, this.CLIENT_PORT_OBS);
        try {
            leaderProxy.shutdown();
        } catch (Exception e) {
            // Best effort: the test only needs the proxy to be gone, but leave a trace.
            LOG.warn("Ignoring failure while shutting down the first port forwarder", e);
        }
        this.latch.await();
        // The ephemeral node is still readable after the switch to the observer proxy.
        Assertions.assertEquals("test", new String(proxiedClient.getData("/revalidtest", (Watcher) null, (Stat) null)));
        proxiedClient.close();
        observerProxy.shutdown();
        shutdown();
    }

    /**
     * Checks that a session created through one port forwarder keeps its ephemeral node after
     * that forwarder is shut down and the client has to use a second forwarder that targets the
     * observer.
     *
     * <p>NOTE(review): {@code latch} is presumably counted down by the {@link Watcher} callback
     * of the base class, which is not visible here.
     *
     * @param testObserverMaster forwarded unchanged to {@code setUp}
     * @throws Exception if a server or client operation fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testRevalidation(boolean testObserverMaster) throws Exception {
        setUp(-1, Boolean.valueOf(testObserverMaster));
        this.q3.start();
        Assertions.assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + this.CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT), "waiting for server 3 being up");

        int leaderProxyPort = PortAssignment.unique();
        int observerProxyPort = PortAssignment.unique();
        // Forward to QP2 when q1 has no leader object, otherwise to QP1 -- presumably this
        // always targets the current leader's client port.
        PortForwarder leaderProxy = new PortForwarder(leaderProxyPort, this.q1.getQuorumPeer().leader == null ? this.CLIENT_PORT_QP2 : this.CLIENT_PORT_QP1);
        this.latch = new CountDownLatch(1);
        // The connect string names both proxy ports, but only the first one is open so far.
        this.zk = new ZooKeeper(String.format("127.0.0.1:%d,127.0.0.1:%d", leaderProxyPort, observerProxyPort), ClientBase.CONNECTION_TIMEOUT, this);
        this.latch.await();
        this.zk.create("/revalidtest", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        Assertions.assertNotNull(this.zk.exists("/revalidtest", (Watcher) null), "Read-after write failed");

        // Open the observer proxy, then close the first proxy so the client has to move over.
        this.latch = new CountDownLatch(2);
        PortForwarder observerProxy = new PortForwarder(observerProxyPort, this.CLIENT_PORT_OBS);
        try {
            leaderProxy.shutdown();
        } catch (Exception e) {
            // Best effort: the test only needs the proxy to be gone, but leave a trace.
            LOG.warn("Ignoring failure while shutting down the first port forwarder", e);
        }
        this.latch.await();
        // The ephemeral node is still readable after the switch to the observer proxy.
        Assertions.assertEquals("test", new String(this.zk.getData("/revalidtest", (Watcher) null, (Stat) null)));
        observerProxy.shutdown();
        shutdown();
    }

    /**
     * Floods the ensemble with concurrent asynchronous writes from two clients, one connected to
     * the observer and one to a participant, and waits for every create callback to arrive.
     *
     * <p>Before the flood, ten nodes are created through server 1, the observer is started, and
     * the test waits until the observer's last logged zxid matches server 1's.
     *
     * @param testObserverMaster forwarded unchanged to {@code setUp}
     * @throws Exception if a server or client operation fails
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testInOrderCommits(boolean testObserverMaster) throws Exception {
        setUp(-1, Boolean.valueOf(testObserverMaster));

        // Seed some data before the observer exists.
        this.zk = new ZooKeeper("127.0.0.1:" + this.CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT, (Watcher) null);
        for (int i = 0; i < 10; i++) {
            this.zk.create("/bulk" + i, "Initial data of some size".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        this.zk.close();

        this.q3.start();
        Assertions.assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + this.CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT), "waiting for observer to be up");

        this.latch = new CountDownLatch(1);
        this.zk = new ZooKeeper("127.0.0.1:" + this.CLIENT_PORT_QP1, ClientBase.CONNECTION_TIMEOUT, this);
        this.latch.await();
        Assertions.assertEquals(ZooKeeper.States.CONNECTED, this.zk.getState());
        this.zk.create("/init", "first".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // Do not start the flood until the observer has caught up with server 1.
        final long lastLoggedZxid = this.q1.getQuorumPeer().getLastLoggedZxid();
        waitFor("Timeout waiting for observer sync", new ZKTestCase.WaitForCondition() {
            @Override
            public boolean evaluate() {
                return lastLoggedZxid == q3.getQuorumPeer().getLastLoggedZxid();
            }
        }, 30);

        ZooKeeper observerClient = new ZooKeeper("127.0.0.1:" + this.CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT, this);
        // QP1 when q1 has no leader object, otherwise QP2 -- presumably the non-leading participant.
        int followerPort = this.q1.getQuorumPeer().leader == null ? this.CLIENT_PORT_QP1 : this.CLIENT_PORT_QP2;
        ZooKeeper followerClient = new ZooKeeper("127.0.0.1:" + followerPort, ClientBase.CONNECTION_TIMEOUT, this);

        final int numTransactions = 10001;
        // Both writers block on this gate so that they start at the same moment.
        CountDownLatch gate = new CountDownLatch(1);
        CountDownLatch observerWrites = new CountDownLatch(numTransactions);
        Thread observerWriter = new Thread(new AsyncWriter(observerClient, numTransactions, true, observerWrites, "/obs", gate));
        CountDownLatch followerWrites = new CountDownLatch(numTransactions);
        Thread followerWriter = new Thread(new AsyncWriter(followerClient, numTransactions, true, followerWrites, "/follower", gate));

        LOG.info("ASYNC WRITES");
        observerWriter.start();
        followerWriter.start();
        gate.countDown();
        observerWrites.await();
        followerWrites.await();

        observerWriter.join(ClientBase.CONNECTION_TIMEOUT);
        if (observerWriter.isAlive()) {
            LOG.error("asyncWriteThread is still alive");
        }
        followerWriter.join(ClientBase.CONNECTION_TIMEOUT);
        if (followerWriter.isAlive()) {
            // Distinct message: this used to repeat the observer writer's text.
            LOG.error("followerAsyncWriteThread is still alive");
        }

        observerClient.close();
        followerClient.close();
        shutdown();
    }

    /**
     * Verifies the admin and JMX surface around observers: the {@code mntr} command output, the
     * synced-observers metric on both participants, termination of the observer's learner
     * connection through its MBean, and re-targeting of the observer through the
     * {@code LearnerMaster} attribute.
     *
     * <p>{@code JMXEnv.tearDown()} runs in a {@code finally} block so that a failing assertion
     * does not skip it.
     *
     * @param testObserverMaster forwarded to {@code setUp}; also selects which metric values are
     *     expected and whether setting {@code LearnerMaster} must succeed or be rejected
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testAdminCommands(boolean testObserverMaster) throws IOException, MBeanException, InstanceNotFoundException, ReflectionException, InterruptedException, MalformedObjectNameException, AttributeNotFoundException, InvalidAttributeValueException, KeeperException {
        // Start from a clean registry: drop every bean that is currently registered.
        for (ZKMBeanInfo registeredBean : MBeanRegistry.getInstance().getRegisteredBeans()) {
            MBeanRegistry.getInstance().unregister(registeredBean);
        }
        JMXEnv.setUp();
        try {
            setUp(-1, Boolean.valueOf(testObserverMaster));
            this.q3.start();
            Assertions.assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + this.CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT), "waiting for observer to be up");

            // Perform a write and a read through the observer before inspecting metrics.
            this.zk = new ZooKeeper("127.0.0.1:" + this.CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT, this);
            this.zk.create("/obstest", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            Assertions.assertEquals("test", new String(this.zk.getData("/obstest", (Watcher) null, (Stat) null)));

            Assertions.assertTrue(Commands.runCommand("mntr", this.q3.getQuorumPeer().getActiveServer(), Collections.emptyMap()).toMap().containsKey("observer_master_id"), "observer not emitting observer_master_id");

            // Expected metric per participant:
            //   observer master on : peer without leader object -> 1, peer with one -> 0
            //   observer master off: peer without leader object -> null, peer with one -> 1
            boolean q1HasNoLeader = this.q1.getQuorumPeer().leader == null;
            if (testObserverMaster) {
                Assertions.assertEquals(q1HasNoLeader ? 1 : 0, this.q1.getQuorumPeer().getSynced_observers_metric());
            } else if (q1HasNoLeader) {
                Assertions.assertNull(this.q1.getQuorumPeer().getSynced_observers_metric());
            } else {
                Assertions.assertEquals(1, this.q1.getQuorumPeer().getSynced_observers_metric());
            }
            boolean q2HasNoLeader = this.q2.getQuorumPeer().leader == null;
            if (testObserverMaster) {
                Assertions.assertEquals(q2HasNoLeader ? 1 : 0, this.q2.getQuorumPeer().getSynced_observers_metric());
            } else if (q2HasNoLeader) {
                Assertions.assertNull(this.q2.getQuorumPeer().getSynced_observers_metric());
            } else {
                Assertions.assertEquals(1, this.q2.getQuorumPeer().getSynced_observers_metric());
            }

            // Locate the learner-connection bean that belongs to the observer.
            ObjectName connectionBean = null;
            for (ObjectName candidate : JMXEnv.conn().queryNames(new ObjectName("org.apache.ZooKeeperService:*"), (QueryExp) null)) {
                String canonicalName = candidate.getCanonicalName();
                if (canonicalName.contains("Learner_Connections") && canonicalName.contains("id:" + this.q3.getQuorumPeer().getId())) {
                    connectionBean = candidate;
                    break;
                }
            }
            Assertions.assertNotNull(connectionBean, "could not find connection bean");

            // Terminating the connection must be noticed by the client within half the timeout.
            this.latch = new CountDownLatch(1);
            JMXEnv.conn().invoke(connectionBean, "terminateConnection", new Object[0], (String[]) null);
            Assertions.assertTrue(this.latch.await(ClientBase.CONNECTION_TIMEOUT / 2, TimeUnit.MILLISECONDS), "server failed to disconnect on terminate");
            Assertions.assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + this.CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT), "waiting for server 3 being up");

            long observerId = this.q3.getQuorumPeer().getId();
            Set<ObjectName> observerBeans = JMXEnv.conn().queryNames(new ObjectName(String.format("org.apache.ZooKeeperService:name0=ReplicatedServer_id%d,name1=replica.%d,name2=Observer", observerId, observerId)), (QueryExp) null);
            Assertions.assertEquals(1, observerBeans.size(), "expecting singular observer bean");
            ObjectName observerBean = observerBeans.iterator().next();

            if (testObserverMaster) {
                // 3 - id switches between ids 1 and 2 (assumes the participants use those ids).
                long currentMasterId = this.q3.getQuorumPeer().observer.getLearnerMasterId();
                this.latch = new CountDownLatch(1);
                JMXEnv.conn().setAttribute(observerBean, new Attribute("LearnerMaster", Long.toString(3 - currentMasterId)));
                Assertions.assertTrue(this.latch.await(ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "server failed to disconnect on terminate");
                Assertions.assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + this.CLIENT_PORT_OBS, ClientBase.CONNECTION_TIMEOUT), "waiting for server 3 being up");
            } else {
                // Without observer masters the attribute change is expected to be rejected.
                try {
                    JMXEnv.conn().setAttribute(observerBean, new Attribute("LearnerMaster", Long.toString(3 - (this.q1.getQuorumPeer().leader == null ? 2L : 1L))));
                    Assertions.fail("should have seen an exception on previous command");
                } catch (RuntimeMBeanException e) {
                    Assertions.assertEquals(IllegalArgumentException.class, e.getCause().getClass(), "mbean failed for the wrong reason");
                }
            }
            shutdown();
        } finally {
            JMXEnv.tearDown();
        }
    }

    /**
     * Builds one {@code server.N=...} configuration line for a server on 127.0.0.1.
     *
     * <p>Two fresh ports are drawn from {@link PortAssignment#unique()} for the two port fields
     * that precede the role (presumably the quorum and election ports).
     *
     * @param role text placed in the role field of the line
     * @param serverId number placed after {@code server.}
     * @param clientPort number placed after the {@code ;}
     * @return the assembled configuration line
     */
    private String createServerString(String role, long serverId, int clientPort) {
        // Draw the ports in this order; each call hands out a new number.
        int firstPort = PortAssignment.unique();
        int secondPort = PortAssignment.unique();
        StringBuilder line = new StringBuilder("server.");
        line.append(serverId).append("=127.0.0.1:");
        line.append(firstPort).append(":").append(secondPort);
        line.append(":").append(role);
        line.append(";").append(clientPort);
        return line.toString();
    }

    /**
     * Asserts that the server listening on the given local client port comes up within
     * {@code ClientBase.CONNECTION_TIMEOUT}.
     *
     * @param clientPort client port on 127.0.0.1 to probe
     */
    private void waitServerUp(int clientPort) {
        String hostPort = "127.0.0.1:" + clientPort;
        boolean serverIsUp = ClientBase.waitForServerUp(hostPort, ClientBase.CONNECTION_TIMEOUT);
        Assertions.assertTrue(serverIsUp, "waiting for server being up");
    }

    /**
     * Creates an admin client for the server on the given local port and adds
     * {@code super:test} digest credentials to it.
     *
     * <p>Side effects: sets the {@code zookeeper.DigestAuthenticationProvider.superDigest}
     * system property and enables reconfig via {@link QuorumPeerConfig#setReconfigEnabled}.
     * Neither is reverted here.
     *
     * @param clientPort client port on 127.0.0.1 to connect to
     * @return the admin client with digest auth info added
     * @throws IOException if the client cannot be created
     */
    private ZooKeeperAdmin createAdmin(int clientPort) throws IOException {
        System.setProperty("zookeeper.DigestAuthenticationProvider.superDigest", "super:D/InIHSb7yEEbrWz8b9l71RjZJU=");
        QuorumPeerConfig.setReconfigEnabled(true);

        String connectString = "127.0.0.1:" + clientPort;
        ZooKeeperAdmin admin = new ZooKeeperAdmin(connectString, ClientBase.CONNECTION_TIMEOUT, DummyWatcher.INSTANCE);
        byte[] superCredentials = "super:test".getBytes();
        admin.addAuthInfo("digest", superCredentials);
        return admin;
    }

    /**
     * Checks that an observer stays connected while a new participant is added to the ensemble
     * through a dynamic reconfiguration. The test is currently disabled.
     *
     * <p>Only the {@code true} parameterization does any work; {@code false} returns at once.
     *
     * @param testObserverMaster whether to run the scenario
     */
    @Disabled
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testDynamicReconfig(boolean testObserverMaster) throws InterruptedException, IOException, KeeperException {
        if (!testObserverMaster) {
            return;
        }
        ClientBase.setupTestEnv();

        // Two participants, each with its own observer-master port.
        int clientPort1 = PortAssignment.unique();
        int clientPort2 = PortAssignment.unique();
        int omPort1 = PortAssignment.unique();
        int omPort2 = PortAssignment.unique();
        String quorumCfg = createServerString("participant", 1L, clientPort1) + "\n" + createServerString("participant", 2L, clientPort2);
        QuorumPeerTestBase.MainThread server1 = new QuorumPeerTestBase.MainThread(1, clientPort1, quorumCfg, String.format("observerMasterPort=%d%n", omPort1));
        QuorumPeerTestBase.MainThread server2 = new QuorumPeerTestBase.MainThread(2, clientPort2, quorumCfg, String.format("observerMasterPort=%d%n", omPort2));
        server1.start();
        server2.start();
        waitServerUp(clientPort1);
        waitServerUp(clientPort2);

        // Server 1's port when it has no leader object, else server 2's -- presumably the
        // observer-master port of whichever participant is not leading.
        int nonLeaderOmPort = server1.getQuorumPeer().leader == null ? omPort1 : omPort2;
        int observerClientPort = PortAssignment.unique();
        QuorumPeerTestBase.MainThread observer = new QuorumPeerTestBase.MainThread(10, observerClientPort, quorumCfg + "\n" + createServerString("observer", 10L, observerClientPort), String.format("observerMasterPort=%d%n", nonLeaderOmPort));
        LOG.info("starting observer");
        observer.start();
        waitServerUp(observerClientPort);

        // Record every session state change seen by the observer's client.
        LinkedBlockingQueue<Watcher.Event.KeeperState> states = new LinkedBlockingQueue<>();
        ZooKeeper observerClient = new ZooKeeper("127.0.0.1:" + observerClientPort, ClientBase.CONNECTION_TIMEOUT, event -> {
            try {
                states.put(event.getState());
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of silently dropping it.
                Thread.currentThread().interrupt();
            }
        });
        Assertions.assertEquals(Watcher.Event.KeeperState.SyncConnected, states.poll(1000L, TimeUnit.MILLISECONDS));

        // Add a third participant through server 1.
        ArrayList<String> joiningServers = new ArrayList<>();
        joiningServers.add("server.3=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + ":participant;localhost:" + PortAssignment.unique());
        ZooKeeperAdmin admin = createAdmin(clientPort1);
        ReconfigTest.reconfig(admin, joiningServers, null, null, -1L);
        ReconfigTest.testServerHasConfig(observerClient, joiningServers, null);

        // No further state change may arrive: the observer must not have been disconnected.
        Assertions.assertNull(states.poll(1000L, TimeUnit.MILLISECONDS));

        admin.close();
        observerClient.close();
        observer.shutdown();
        server2.shutdown();
        server1.shutdown();
    }

    /**
     * Asserts that the current server metrics contain exactly five keys mentioning
     * {@code observer_sync_time} and that the {@code avg_}, {@code min_}, {@code max_},
     * {@code cnt_} and {@code sum_} variants are all non-null.
     */
    private void validateObserverSyncTimeMetrics() {
        final String metricName = "observer_sync_time";
        Map<String, Object> metrics = MetricsUtils.currentServerMetrics();

        long matchingKeys = 0L;
        for (String key : metrics.keySet()) {
            if (key.contains(metricName)) {
                matchingKeys++;
            }
        }
        Assertions.assertEquals(5L, matchingKeys);

        // Checked in the order avg, min, max, cnt, sum.
        for (String prefix : new String[] {"avg", "min", "max", "cnt", "sum"}) {
            Assertions.assertNotNull(metrics.get(prefix + "_" + metricName));
        }
    }
}
