Skip to content

Commit

Permalink
PIP-45: Implement distributed id generator using coordination service (
Browse files Browse the repository at this point in the history
…apache#9274)

* PIP-45: Implement distributed id generator using coordination service

* Fixed MockZookeeper to create the sequential nodes

* Fixed NPE in MockZookeper
  • Loading branch information
merlimat authored Feb 4, 2021
1 parent fa89a03 commit 78e07e9
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,8 @@ private ServerBootstrap defaultServerBootstrap() {
}

public void start() throws Exception {
this.producerNameGenerator = new DistributedIdGenerator(pulsar.getZkClient(), PRODUCER_NAME_GENERATOR_PATH,
pulsar.getConfiguration().getClusterName());
this.producerNameGenerator = new DistributedIdGenerator(pulsar.getCoordinationService(),
PRODUCER_NAME_GENERATOR_PATH, pulsar.getConfiguration().getClusterName());

ServerBootstrap bootstrap = defaultServerBootstrap.clone();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@
package org.apache.pulsar.broker.service;

import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.pulsar.metadata.api.coordination.CoordinationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,43 +35,24 @@
public class DistributedIdGenerator {

private final String prefix;
private final int generatorInstanceId;
private final long generatorInstanceId;
private final AtomicLong counter;

/**
*
* @param zk
* @param cs
* {@link CoordinationService}
* @param path
* path of the z-node used to track the generators ids
* @param prefix
* prefix to prepend to the generated id. Having a unique prefix can make the id globally unique
* @throws Exception
*/
public DistributedIdGenerator(ZooKeeper zk, String path, String prefix) throws Exception {
public DistributedIdGenerator(CoordinationService cs, String path, String prefix) throws Exception {
this.prefix = prefix;
this.counter = new AtomicLong(0);

// Create base path if it doesn't exist
if (zk.exists(path, false) == null) {
try {
ZkUtils.createFullPathOptimistic(zk, path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} catch (NodeExistsException e) {
// Ok
}
}

// Create an ephemeral sequential z-node that will have a name containing a unique number. We'll use this number
// as a prefix for all the generated ids, in addition to the specified prefix.
String createdPath = zk.create(path + "/-", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);

// Parse the sequential z-node name and extract the unique number
String[] parts = createdPath.split("/");
String name = parts[parts.length - 1].replace('-', ' ').trim();

this.generatorInstanceId = Integer.parseInt(name);
log.info("Created sequential node at {} -- Generator Id is {}-{}", createdPath, prefix, generatorInstanceId);
this.generatorInstanceId = cs.getNextCounterValue(path).get();
log.info("Broker distributed id generator started with instance id {}-{}", prefix, generatorInstanceId);
}

public String getNextId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,50 +19,51 @@
package org.apache.pulsar.broker.service;

import static org.testng.Assert.assertEquals;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.pulsar.broker.service.DistributedIdGenerator;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.ZooDefs;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.coordination.CoordinationService;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

public class DistributedIdGeneratorTest {

private MockZooKeeper zkc;
private MetadataStoreExtended store;
private CoordinationService coordinationService;

@BeforeClass
@BeforeMethod
public void setup() throws Exception {
zkc = MockZooKeeper.newInstance();
store = MetadataStoreExtended.create("memory://local", MetadataStoreConfig.builder().build());
coordinationService = new CoordinationServiceImpl(store);
}

@AfterClass(alwaysRun = true)
@AfterMethod(alwaysRun = true)
public void teardown() throws Exception {
zkc.shutdown();
coordinationService.close();
store.close();
}

@Test
public void simple() throws Exception {
DistributedIdGenerator gen1 = new DistributedIdGenerator(zkc, "/my/test/simple", "p");
DistributedIdGenerator gen1 = new DistributedIdGenerator(coordinationService, "/my/test/simple", "p");

assertEquals(gen1.getNextId(), "p-0-0");
assertEquals(gen1.getNextId(), "p-0-1");
assertEquals(gen1.getNextId(), "p-0-2");
assertEquals(gen1.getNextId(), "p-0-3");

DistributedIdGenerator gen2 = new DistributedIdGenerator(zkc, "/my/test/simple", "p");
DistributedIdGenerator gen2 = new DistributedIdGenerator(coordinationService, "/my/test/simple", "p");
assertEquals(gen2.getNextId(), "p-1-0");
assertEquals(gen2.getNextId(), "p-1-1");

Expand All @@ -87,7 +88,7 @@ public void concurrent() throws Exception {
for (int i = 0; i < Threads; i++) {
executor.execute(() -> {
try {
DistributedIdGenerator gen = new DistributedIdGenerator(zkc, "/my/test/concurrent", "prefix");
DistributedIdGenerator gen = new DistributedIdGenerator(coordinationService, "/my/test/concurrent", "prefix");

barrier.await();

Expand Down Expand Up @@ -116,9 +117,9 @@ public void concurrent() throws Exception {

@Test
public void invalidZnode() throws Exception {
zkc.create("/my/test/invalid", "invalid-number".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
store.put("/my/test/invalid", "invalid-number".getBytes(), Optional.of(-1L));

DistributedIdGenerator gen = new DistributedIdGenerator(zkc, "/my/test/invalid", "p");
DistributedIdGenerator gen = new DistributedIdGenerator(coordinationService, "/my/test/invalid", "p");

// It should not get exception if content is there
assertEquals(gen.getNextId(), "p-0-0");
Expand Down
22 changes: 17 additions & 5 deletions testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiPredicate;
Expand Down Expand Up @@ -67,7 +68,9 @@ public class MockZooKeeper extends ZooKeeper {
private int readOpDelayMs;

private ReentrantLock mutex;


private AtomicLong sequentialIdGenerator;

//see details of Objenesis caching - http://objenesis.org/details.html
//see supported jvms - https://github.com/easymock/objenesis/blob/master/SupportedJVMs.md
private static final Objenesis objenesis = new ObjenesisStd();
Expand Down Expand Up @@ -106,6 +109,7 @@ public static MockZooKeeper newInstanceForGlobalZK(ExecutorService executor, int
zk.init(executor);
zk.readOpDelayMs = readOpDelayMs;
zk.mutex = new ReentrantLock();
zk.sequentialIdGenerator = new AtomicLong();
return zk;
} catch (RuntimeException e) {
throw e;
Expand All @@ -121,6 +125,7 @@ public static MockZooKeeper newInstance(ExecutorService executor, int readOpDela
zk.init(executor);
zk.readOpDelayMs = readOpDelayMs;
zk.mutex = new ReentrantLock();
zk.sequentialIdGenerator = new AtomicLong();
return zk;
} catch (RuntimeException e) {
throw e;
Expand Down Expand Up @@ -246,6 +251,13 @@ public void create(final String path, final byte[] data, final List<ACL> acl, Cr
toNotifyParent.addAll(watchers.get(parent));
}

final String name;
if (createMode != null && createMode.isSequential()) {
name = path + Long.toString(sequentialIdGenerator.getAndIncrement());
} else {
name = path;
}

Optional<KeeperException.Code> failure = programmedFailure(Op.CREATE, path);
if (failure.isPresent()) {
mutex.unlock();
Expand All @@ -260,16 +272,16 @@ public void create(final String path, final byte[] data, final List<ACL> acl, Cr
mutex.unlock();
cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
} else {
tree.put(path, Pair.of(data, 0));
watchers.removeAll(path);
tree.put(name, Pair.of(data, 0));
watchers.removeAll(name);
mutex.unlock();
cb.processResult(0, path, ctx, null);
cb.processResult(0, path, ctx, name);

toNotifyCreate.forEach(
watcher -> watcher.process(
new WatchedEvent(EventType.NodeCreated,
KeeperState.SyncConnected,
path)));
name)));
toNotifyParent.forEach(
watcher -> watcher.process(
new WatchedEvent(EventType.NodeChildrenChanged,
Expand Down

0 comments on commit 78e07e9

Please sign in to comment.