diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java index 09de3ed5c701e..f4c82a643605a 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/MetadataStoreExtended.java @@ -21,6 +21,7 @@ import java.util.EnumSet; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreConfig; @@ -71,4 +72,12 @@ public static MetadataStoreExtended create(String metadataURL, MetadataStoreConf */ CompletableFuture put(String path, byte[] value, Optional expectedVersion, EnumSet options); + + /** + * Register a session listener that will get notified of changes in status of the current session. + * + * @param listener + * the session listener + */ + void registerSessionListener(Consumer listener); } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/SessionEvent.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/SessionEvent.java new file mode 100644 index 0000000000000..360fbe8710927 --- /dev/null +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/extended/SessionEvent.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.metadata.api.extended; + +/** + * An event regarding a session of MetadataStore + */ +public enum SessionEvent { + + /** + * The client is temporarily disconnected, although the session is still valid + */ + ConnectionLost, + + /** + * The client was able to successfully reconnect + */ + Reconnected, + + /** + * The session was lost, all the ephemeral keys created on the store within the current session might have been + * already expired. + */ + SessionLost, + + /** + * The session was established + */ + SessionReestablished, +} diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index a7ff703ad6947..f11ec69b2bfe3 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -46,6 +46,7 @@ import org.apache.pulsar.metadata.api.Stat; import org.apache.pulsar.metadata.api.extended.CreateOption; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.apache.pulsar.metadata.api.extended.SessionEvent; import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl; @Slf4j @@ -54,6 +55,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co private static final long CACHE_REFRESH_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5); private final CopyOnWriteArrayList> listeners = new CopyOnWriteArrayList<>(); + private final CopyOnWriteArrayList> sessionListeners = new CopyOnWriteArrayList<>(); protected final ExecutorService executor; private final AsyncLoadingCache> childrenCache; private final AsyncLoadingCache existsCache; @@ -206,6 +208,21 @@ public final CompletableFuture put(String path, byte[] data, Optional listener) { + sessionListeners.add(listener); + } + + protected void receivedSessionEvent(SessionEvent event) { + sessionListeners.forEach(l -> { + try { + l.accept(event); + } catch (Throwable t) { + log.warn("Error in processing session event", t); + } + }); + } + @Override public void close() throws Exception { executor.shutdownNow(); diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java index 5352dfba93578..8c6fcd9e3845e 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java @@ -54,6 +54,7 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt private final boolean isZkManaged; private final ZooKeeper zkc; + private ZKSessionWatcher sessionWatcher; public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws MetadataStoreException { try { @@ -61,7 +62,14 @@ public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConf zkc = ZooKeeperClient.newBuilder().connectString(metadataURL) .connectRetryPolicy(new BoundExponentialBackoffRetryPolicy(100, 60_000, Integer.MAX_VALUE)) .allowReadOnlyMode(metadataStoreConfig.isAllowReadOnlyOperations()) - .sessionTimeoutMs(metadataStoreConfig.getSessionTimeoutMillis()).build(); + .sessionTimeoutMs(metadataStoreConfig.getSessionTimeoutMillis()) + .watchers(Collections.singleton(event -> { + if (sessionWatcher != null) { + sessionWatcher.process(event); + } + })) + .build(); + sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent); } catch (Throwable t) { throw new MetadataStoreException(t); } @@ -71,6 +79,7 @@ public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConf public ZKMetadataStore(ZooKeeper zkc) { this.isZkManaged = false; this.zkc = zkc; + this.sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent); } @Override @@ -271,6 +280,7 @@ public void close() throws Exception { if (isZkManaged) { zkc.close(); } + sessionWatcher.close(); super.close(); } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java new file mode 100644 index 0000000000000..d62e7857176ed --- /dev/null +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.metadata.impl; + +import io.netty.util.concurrent.DefaultThreadFactory; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.pulsar.metadata.api.extended.SessionEvent; +import org.apache.zookeeper.AsyncCallback.StatCallback; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; + +/** + * Monitor the ZK session state every few seconds and send notifications + */ +@Slf4j +public class ZKSessionWatcher implements AutoCloseable, Watcher { + private final ZooKeeper zk; + + private SessionEvent currentStatus; + private final Consumer sessionListener; + + // Maximum time to wait for ZK session to be re-connected to quorum (set to 5/6 of SessionTimeout) + private final long monitorTimeoutMillis; + + // Interval at which we check the state of the zk session (set to 1/15 of SessionTimeout) + private final long tickTimeMillis; + + private final ScheduledExecutorService scheduler; + private final ScheduledFuture task; + + private long disconnectedAt = 0; + + public ZKSessionWatcher(ZooKeeper zk, Consumer sessionListener) { + this.zk = zk; + this.monitorTimeoutMillis = zk.getSessionTimeout() * 5 / 6; + this.tickTimeMillis = zk.getSessionTimeout() / 15; + this.sessionListener = sessionListener; + + this.scheduler = Executors + .newSingleThreadScheduledExecutor(new DefaultThreadFactory("metadata-store-zk-session-watcher")); + this.task = scheduler.scheduleAtFixedRate(this::checkConnectionStatus, tickTimeMillis, tickTimeMillis, + TimeUnit.MILLISECONDS); + this.currentStatus = SessionEvent.SessionReestablished; + } + + @Override + public void close() throws Exception { + task.cancel(true); + scheduler.shutdownNow(); + scheduler.awaitTermination(10, TimeUnit.SECONDS); + } + + // task that runs every TICK_TIME to check zk connection + private synchronized void checkConnectionStatus() { + try { + CompletableFuture future = new CompletableFuture<>(); + zk.exists("/", false, (StatCallback) (rc, path, ctx, stat) -> { + switch (KeeperException.Code.get(rc)) { + case CONNECTIONLOSS: + future.complete(Watcher.Event.KeeperState.Disconnected); + break; + + case SESSIONEXPIRED: + future.complete(Watcher.Event.KeeperState.Expired); + break; + + case OK: + default: + future.complete(Watcher.Event.KeeperState.SyncConnected); + } + }, null); + + Watcher.Event.KeeperState zkClientState; + try { + zkClientState = future.get(tickTimeMillis, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + // Consider zk disconnection if zk operation takes more than TICK_TIME + zkClientState = Watcher.Event.KeeperState.Disconnected; + } + + checkState(zkClientState); + } catch (RejectedExecutionException | InterruptedException e) { + task.cancel(true); + } catch (Throwable t) { + log.warn("Error while checking ZK connection status", t); + } + } + + @Override + public synchronized void process(WatchedEvent event) { + checkState(event.getState()); + } + + private void checkState(Watcher.Event.KeeperState zkClientState) { + switch (zkClientState) { + case Expired: + if (currentStatus != SessionEvent.SessionLost) { + log.error("ZooKeeper session expired"); + currentStatus = SessionEvent.SessionLost; + sessionListener.accept(currentStatus); + } + break; + + case Disconnected: + if (disconnectedAt == 0) { + // this is the first disconnect event, we should monitor the time out from now, so we record the + // time of disconnect + disconnectedAt = System.nanoTime(); + } + + long timeRemainingMillis = monitorTimeoutMillis + - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - disconnectedAt); + if (timeRemainingMillis <= 0 && currentStatus != SessionEvent.SessionLost) { + log.error("ZooKeeper session reconnection timeout. Notifying session is lost."); + currentStatus = SessionEvent.SessionLost; + sessionListener.accept(currentStatus); + } else if (currentStatus != SessionEvent.SessionLost) { + log.warn("ZooKeeper client is disconnected. Waiting to reconnect, time remaining = {} seconds", + timeRemainingMillis / 1000.0); + if (currentStatus == SessionEvent.SessionReestablished) { + currentStatus = SessionEvent.ConnectionLost; + sessionListener.accept(currentStatus); + } + } + break; + + default: + if (currentStatus != SessionEvent.SessionReestablished) { + // since it reconnected to zoo keeper, we reset the disconnected time + log.info("ZooKeeper client reconnection with server quorum"); + disconnectedAt = 0; + + sessionListener.accept(SessionEvent.Reconnected); + if (currentStatus == SessionEvent.SessionLost) { + sessionListener.accept(SessionEvent.SessionReestablished); + } + currentStatus = SessionEvent.SessionReestablished; + } + break; + } + } +} diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java new file mode 100644 index 0000000000000..1249adec4d6cc --- /dev/null +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.metadata; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNotNull; + +import java.util.EnumSet; +import java.util.Optional; + +import lombok.Cleanup; + +import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.Stat; +import org.apache.pulsar.metadata.api.extended.CreateOption; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.testng.annotations.Test; + +public class MetadataStoreExtendedTest extends BaseMetadataStoreTest { + + @Test(dataProvider = "impl") + public void sequentialKeys(String provider, String url) throws Exception { + final String basePath = "/my/path"; + + @Cleanup + MetadataStoreExtended store = MetadataStoreExtended.create(url, MetadataStoreConfig.builder().build()); + + Stat stat1 = store.put(basePath, "value-1".getBytes(), Optional.of(-1L), EnumSet.of(CreateOption.Sequential)) + .join(); + assertNotNull(stat1); + assertEquals(stat1.getVersion(), 0L); + assertNotEquals(stat1.getPath(), basePath); + assertEquals(store.get(stat1.getPath()).join().get().getValue(), "value-1".getBytes()); + String seq1 = stat1.getPath().replace(basePath, ""); + long n1 = Long.parseLong(seq1); + + Stat stat2 = store.put(basePath, "value-2".getBytes(), Optional.of(-1L), EnumSet.of(CreateOption.Sequential)) + .join(); + assertNotNull(stat2); + assertEquals(stat2.getVersion(), 0L); + assertNotEquals(stat2.getPath(), basePath); + assertNotEquals(stat2.getPath(), stat1.getPath()); + assertEquals(store.get(stat2.getPath()).join().get().getValue(), "value-2".getBytes()); + String seq2 = stat2.getPath().replace(basePath, ""); + long n2 = Long.parseLong(seq2); + + assertNotEquals(seq1, seq2); + assertNotEquals(n1, n2); + } +} diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java index a1a06301618c2..e49d30e71b600 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/TestZKServer.java @@ -38,10 +38,10 @@ @Slf4j public class TestZKServer implements AutoCloseable { - - protected final ZooKeeperServer zks; + protected ZooKeeperServer zks; private final File zkDataDir; - private final ServerCnxnFactory serverFactory; + private ServerCnxnFactory serverFactory; + private int zkPort = 0; public TestZKServer() throws Exception { this.zkDataDir = Files.newTemporaryFolder(); @@ -50,28 +50,44 @@ public TestZKServer() throws Exception { System.setProperty("zookeeper.4lw.commands.whitelist", "*"); // disable the admin server as to not have any port conflicts System.setProperty("zookeeper.admin.enableServer", "false"); + start(); + } + + public void start() throws Exception { this.zks = new ZooKeeperServer(zkDataDir, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME); this.serverFactory = new NIOServerCnxnFactory(); - this.serverFactory.configure(new InetSocketAddress(0), 1000); + this.serverFactory.configure(new InetSocketAddress(zkPort), 1000); this.serverFactory.startup(zks, true); - log.info("Started test ZK server on port {}", getPort()); + this.zkPort = serverFactory.getLocalPort(); + log.info("Started test ZK server on port {}", zkPort); boolean zkServerReady = waitForServerUp(this.getConnectionString(), 30_000); assertTrue(zkServerReady); } + public void stop() throws Exception { + if (zks != null) { + zks.shutdown(); + zks = null; + } + + if (serverFactory != null) { + serverFactory.shutdown(); + serverFactory = null; + } + log.info("Stopped test ZK server"); + } + + @Override public void close() throws Exception { - zks.shutdown(); - serverFactory.shutdown(); + stop(); FileUtils.deleteDirectory(zkDataDir); - - log.info("Stopped test ZK server"); } public int getPort() { - return serverFactory.getLocalPort(); + return zkPort; } public String getConnectionString() { diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java new file mode 100644 index 0000000000000..577b872b22ac2 --- /dev/null +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/ZKSessionTest.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.metadata; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import lombok.Cleanup; + +import org.apache.pulsar.metadata.api.MetadataStoreConfig; +import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.apache.pulsar.metadata.api.extended.SessionEvent; +import org.testng.annotations.Test; + +public class ZKSessionTest extends BaseMetadataStoreTest { + + @Test + public void testDisconnection() throws Exception { + @Cleanup + MetadataStoreExtended store = MetadataStoreExtended.create(zks.getConnectionString(), + MetadataStoreConfig.builder() + .sessionTimeoutMillis(30_000) + .build()); + + BlockingQueue sessionEvents = new LinkedBlockingQueue<>(); + store.registerSessionListener(sessionEvents::add); + + zks.stop(); + + SessionEvent e = sessionEvents.poll(5, TimeUnit.SECONDS); + assertEquals(e, SessionEvent.ConnectionLost); + + zks.start(); + e = sessionEvents.poll(10, TimeUnit.SECONDS); + assertEquals(e, SessionEvent.Reconnected); + + e = sessionEvents.poll(1, TimeUnit.SECONDS); + assertNull(e); + } + + @Test + public void testSessionLost() throws Exception { + @Cleanup + MetadataStoreExtended store = MetadataStoreExtended.create(zks.getConnectionString(), + MetadataStoreConfig.builder() + .sessionTimeoutMillis(10_000) + .build()); + + BlockingQueue sessionEvents = new LinkedBlockingQueue<>(); + store.registerSessionListener(sessionEvents::add); + + zks.stop(); + + SessionEvent e = sessionEvents.poll(5, TimeUnit.SECONDS); + assertEquals(e, SessionEvent.ConnectionLost); + + e = sessionEvents.poll(10, TimeUnit.SECONDS); + assertEquals(e, SessionEvent.SessionLost); + + zks.start(); + e = sessionEvents.poll(10, TimeUnit.SECONDS); + assertEquals(e, SessionEvent.Reconnected); + e = sessionEvents.poll(10, TimeUnit.SECONDS); + assertEquals(e, SessionEvent.SessionReestablished); + + e = sessionEvents.poll(1, TimeUnit.SECONDS); + assertNull(e); + } +} diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java index 2d523b929b87d..8b94d3e40396e 100644 --- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java +++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java @@ -148,6 +148,11 @@ private void init(ExecutorService executor) { failures = new CopyOnWriteArrayList<>(); } + @Override + public int getSessionTimeout() { + return 30_000; + } + private MockZooKeeper(String quorum) throws Exception { // This constructor is never called super(quorum, 1, event -> {});