Skip to content

Commit

Permalink
PIP-45: Added session events to metadata store (apache#9273)
Browse files Browse the repository at this point in the history
* PIP-45: Added session events to metadata store

* Added missing license header

* Fixed session timeout in MockZookeeper

* Increased test timeouts

* Increase session timeout in ZKSessionTest

* Fixed merge issue
  • Loading branch information
merlimat authored Feb 5, 2021
1 parent 13faf63 commit 683ee5f
Show file tree
Hide file tree
Showing 9 changed files with 441 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.EnumSet;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
Expand Down Expand Up @@ -71,4 +72,12 @@ public static MetadataStoreExtended create(String metadataURL, MetadataStoreConf
*/
CompletableFuture<Stat> put(String path, byte[] value, Optional<Long> expectedVersion,
EnumSet<CreateOption> options);

/**
 * Register a session listener that will get notified of changes in the status of the current session.
 *
 * <p>Each change is reported to the listener as a {@link SessionEvent} value.
 *
 * @param listener
 *            the session listener that consumes the {@link SessionEvent} notifications
 */
void registerSessionListener(Consumer<SessionEvent> listener);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.metadata.api.extended;

/**
 * An event describing a change in the state of a MetadataStore session.
 */
public enum SessionEvent {

/**
 * The client is temporarily disconnected, although the session is still valid.
 */
ConnectionLost,

/**
 * The client was able to successfully reconnect.
 */
Reconnected,

/**
 * The session was lost; all the ephemeral keys created on the store within the current session might have
 * already expired.
 */
SessionLost,

/**
 * The session was re-established after having been lost.
 */
SessionReestablished,
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;

@Slf4j
Expand All @@ -54,6 +55,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
private static final long CACHE_REFRESH_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5);

private final CopyOnWriteArrayList<Consumer<Notification>> listeners = new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<Consumer<SessionEvent>> sessionListeners = new CopyOnWriteArrayList<>();
protected final ExecutorService executor;
private final AsyncLoadingCache<String, List<String>> childrenCache;
private final AsyncLoadingCache<String, Boolean> existsCache;
Expand Down Expand Up @@ -206,6 +208,21 @@ public final CompletableFuture<Stat> put(String path, byte[] data, Optional<Long
});
}

/**
 * Adds the given listener to the list of session listeners. Every listener in that list is invoked by
 * {@link #receivedSessionEvent(SessionEvent)} for each session event.
 *
 * @param listener
 *            the session listener to add
 */
@Override
public void registerSessionListener(Consumer<SessionEvent> listener) {
sessionListeners.add(listener);
}

/**
 * Delivers a session event to every registered session listener, in registration order.
 *
 * <p>A failure thrown by one listener is logged and does not prevent the remaining listeners from being
 * notified.
 *
 * @param event
 *            the session event to hand to the listeners
 */
protected void receivedSessionEvent(SessionEvent event) {
    for (Consumer<SessionEvent> sessionListener : sessionListeners) {
        try {
            sessionListener.accept(event);
        } catch (Throwable failure) {
            // Isolate listeners from each other: log and keep going
            log.warn("Error in processing session event", failure);
        }
    }
}

@Override
public void close() throws Exception {
executor.shutdownNow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,22 @@ public class ZKMetadataStore extends AbstractMetadataStore implements MetadataSt

private final boolean isZkManaged;
private final ZooKeeper zkc;
private ZKSessionWatcher sessionWatcher;

public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig) throws MetadataStoreException {
try {
isZkManaged = true;
zkc = ZooKeeperClient.newBuilder().connectString(metadataURL)
.connectRetryPolicy(new BoundExponentialBackoffRetryPolicy(100, 60_000, Integer.MAX_VALUE))
.allowReadOnlyMode(metadataStoreConfig.isAllowReadOnlyOperations())
.sessionTimeoutMs(metadataStoreConfig.getSessionTimeoutMillis()).build();
.sessionTimeoutMs(metadataStoreConfig.getSessionTimeoutMillis())
.watchers(Collections.singleton(event -> {
if (sessionWatcher != null) {
sessionWatcher.process(event);
}
}))
.build();
sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent);
} catch (Throwable t) {
throw new MetadataStoreException(t);
}
Expand All @@ -71,6 +79,7 @@ public ZKMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConf
// Creates a metadata store on top of an externally provided ZooKeeper client.
public ZKMetadataStore(ZooKeeper zkc) {
// The client is not managed by this store: close() only closes zkc when isZkManaged is true
this.isZkManaged = false;
this.zkc = zkc;
// Session state changes detected by the watcher are forwarded to receivedSessionEvent()
this.sessionWatcher = new ZKSessionWatcher(zkc, this::receivedSessionEvent);
}

@Override
Expand Down Expand Up @@ -271,6 +280,7 @@ public void close() throws Exception {
if (isZkManaged) {
zkc.close();
}
sessionWatcher.close();
super.close();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.metadata.impl;

import io.netty.util.concurrent.DefaultThreadFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

import lombok.extern.slf4j.Slf4j;

import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

/**
 * Monitor the ZK session state every few seconds and send notifications.
 *
 * <p>The session state is observed from two sources: a periodic probe ({@code exists("/")}) that runs on a
 * dedicated single-thread scheduler, and the {@link WatchedEvent}s passed to {@link #process(WatchedEvent)}.
 * Both sources feed the same state machine, which translates ZooKeeper states into {@link SessionEvent}s and
 * hands them to the configured listener.
 *
 * <p>Thread-safety: the state machine ({@code checkState}) is guarded by the instance monitor. The periodic
 * probe deliberately does NOT hold that monitor while it waits for the ZooKeeper response, so that
 * {@link #process(WatchedEvent)} never blocks its calling thread for the duration of a probe.
 */
@Slf4j
public class ZKSessionWatcher implements AutoCloseable, Watcher {
    private final ZooKeeper zk;

    // Last status derived from the observed ZK state; guarded by "this"
    private SessionEvent currentStatus;
    private final Consumer<SessionEvent> sessionListener;

    // Maximum time to wait for ZK session to be re-connected to quorum (set to 5/6 of SessionTimeout)
    private final long monitorTimeoutMillis;

    // Interval at which we check the state of the zk session (set to 1/15 of SessionTimeout)
    private final long tickTimeMillis;

    private final ScheduledExecutorService scheduler;
    private final ScheduledFuture<?> task;

    // System.nanoTime() at the first disconnect of the current outage, 0 when not disconnected;
    // guarded by "this"
    private long disconnectedAt = 0;

    /**
     * Creates the watcher and starts the periodic session check.
     *
     * @param zk
     *            the ZooKeeper client to monitor; its session timeout determines the check interval (1/15)
     *            and the reconnection deadline (5/6)
     * @param sessionListener
     *            the callback that receives the session events
     */
    public ZKSessionWatcher(ZooKeeper zk, Consumer<SessionEvent> sessionListener) {
        this.zk = zk;
        this.monitorTimeoutMillis = zk.getSessionTimeout() * 5 / 6;
        this.tickTimeMillis = zk.getSessionTimeout() / 15;
        this.sessionListener = sessionListener;

        // The initial status must be in place before the periodic task is scheduled, otherwise an early
        // tick could observe a null status and emit a spurious Reconnected event
        this.currentStatus = SessionEvent.SessionReestablished;

        this.scheduler = Executors
                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("metadata-store-zk-session-watcher"));
        this.task = scheduler.scheduleAtFixedRate(this::checkConnectionStatus, tickTimeMillis, tickTimeMillis,
                TimeUnit.MILLISECONDS);
    }

    /**
     * Stops the periodic check and waits up to 10 seconds for the scheduler thread to terminate.
     *
     * @throws Exception
     *             if the calling thread is interrupted while waiting for termination
     */
    @Override
    public void close() throws Exception {
        task.cancel(true);
        scheduler.shutdownNow();
        scheduler.awaitTermination(10, TimeUnit.SECONDS);
    }

    // task that runs every TICK_TIME to check zk connection
    //
    // Intentionally not synchronized: it blocks for up to one tick waiting for the ZK response, and holding
    // the monitor during that wait would stall process(), whose caller may be the thread that has to deliver
    // the response. Only the state transition in checkState() is done under the lock.
    private void checkConnectionStatus() {
        try {
            CompletableFuture<Watcher.Event.KeeperState> future = new CompletableFuture<>();
            zk.exists("/", false, (StatCallback) (rc, path, ctx, stat) -> {
                switch (KeeperException.Code.get(rc)) {
                case CONNECTIONLOSS:
                    future.complete(Watcher.Event.KeeperState.Disconnected);
                    break;

                case SESSIONEXPIRED:
                    future.complete(Watcher.Event.KeeperState.Expired);
                    break;

                case OK:
                default:
                    // Any other result code means the server answered, so the connection is alive
                    future.complete(Watcher.Event.KeeperState.SyncConnected);
                }
            }, null);

            Watcher.Event.KeeperState zkClientState;
            try {
                zkClientState = future.get(tickTimeMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // Consider zk disconnection if zk operation takes more than TICK_TIME
                zkClientState = Watcher.Event.KeeperState.Disconnected;
            }

            checkState(zkClientState);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the executor and stop the periodic check
            Thread.currentThread().interrupt();
            task.cancel(true);
        } catch (RejectedExecutionException e) {
            task.cancel(true);
        } catch (Throwable t) {
            log.warn("Error while checking ZK connection status", t);
        }
    }

    /**
     * Feeds the state carried by a ZooKeeper watch event into the session state machine.
     *
     * @param event
     *            the event received from the ZooKeeper client
     */
    @Override
    public void process(WatchedEvent event) {
        checkState(event.getState());
    }

    // Translates an observed ZK client state into session events. Holds the monitor so that the periodic
    // probe and the watcher callbacks update currentStatus / disconnectedAt consistently and the listener
    // sees the events in a coherent order.
    private synchronized void checkState(Watcher.Event.KeeperState zkClientState) {
        switch (zkClientState) {
        case Expired:
            if (currentStatus != SessionEvent.SessionLost) {
                log.error("ZooKeeper session expired");
                currentStatus = SessionEvent.SessionLost;
                sessionListener.accept(currentStatus);
            }
            break;

        case Disconnected:
            if (disconnectedAt == 0) {
                // this is the first disconnect event, we should monitor the time out from now, so we record the
                // time of disconnect
                disconnectedAt = System.nanoTime();
            }

            long timeRemainingMillis = monitorTimeoutMillis
                    - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - disconnectedAt);
            if (timeRemainingMillis <= 0 && currentStatus != SessionEvent.SessionLost) {
                // Disconnected for longer than the monitoring deadline: report the session as lost
                log.error("ZooKeeper session reconnection timeout. Notifying session is lost.");
                currentStatus = SessionEvent.SessionLost;
                sessionListener.accept(currentStatus);
            } else if (currentStatus != SessionEvent.SessionLost) {
                log.warn("ZooKeeper client is disconnected. Waiting to reconnect, time remaining = {} seconds",
                        timeRemainingMillis / 1000.0);
                // Notify ConnectionLost only on the transition out of the healthy state
                if (currentStatus == SessionEvent.SessionReestablished) {
                    currentStatus = SessionEvent.ConnectionLost;
                    sessionListener.accept(currentStatus);
                }
            }
            break;

        default:
            // Every other state is treated as "connected"
            if (currentStatus != SessionEvent.SessionReestablished) {
                // since it reconnected to zoo keeper, we reset the disconnected time
                log.info("ZooKeeper client reconnection with server quorum");
                disconnectedAt = 0;

                sessionListener.accept(SessionEvent.Reconnected);
                if (currentStatus == SessionEvent.SessionLost) {
                    // The session had been declared lost, so also signal that it is usable again
                    sessionListener.accept(SessionEvent.SessionReestablished);
                }
                currentStatus = SessionEvent.SessionReestablished;
            }
            break;
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.metadata;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;

import java.util.EnumSet;
import java.util.Optional;

import lombok.Cleanup;

import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.testng.annotations.Test;

public class MetadataStoreExtendedTest extends BaseMetadataStoreTest {

    /**
     * Puts two values on the same base path with {@link CreateOption#Sequential} and checks that each put
     * creates its own key: both are at version 0, both paths differ from the base path and from each other,
     * each key holds the value that was written, and the path suffixes parse as two different numbers.
     */
    @Test(dataProvider = "impl")
    public void sequentialKeys(String provider, String url) throws Exception {
        final String basePath = "/my/path";

        @Cleanup
        MetadataStoreExtended store = MetadataStoreExtended.create(url, MetadataStoreConfig.builder().build());

        // First sequential key
        Stat firstStat = store
                .put(basePath, "value-1".getBytes(), Optional.of(-1L), EnumSet.of(CreateOption.Sequential))
                .join();
        assertNotNull(firstStat);
        assertEquals(firstStat.getVersion(), 0L);
        String firstPath = firstStat.getPath();
        assertNotEquals(firstPath, basePath);
        assertEquals(store.get(firstPath).join().get().getValue(), "value-1".getBytes());
        String firstSuffix = firstPath.replace(basePath, "");
        long firstSequence = Long.parseLong(firstSuffix);

        // Second sequential key on the same base path
        Stat secondStat = store
                .put(basePath, "value-2".getBytes(), Optional.of(-1L), EnumSet.of(CreateOption.Sequential))
                .join();
        assertNotNull(secondStat);
        assertEquals(secondStat.getVersion(), 0L);
        String secondPath = secondStat.getPath();
        assertNotEquals(secondPath, basePath);
        assertNotEquals(secondPath, firstPath);
        assertEquals(store.get(secondPath).join().get().getValue(), "value-2".getBytes());
        String secondSuffix = secondPath.replace(basePath, "");
        long secondSequence = Long.parseLong(secondSuffix);

        // The generated suffixes must differ both as text and as numbers
        assertNotEquals(firstSuffix, secondSuffix);
        assertNotEquals(firstSequence, secondSequence);
    }
}
Loading

0 comments on commit 683ee5f

Please sign in to comment.