Skip to content

Commit

Permalink
PIP-45: Removed access to ZK client from PulsarService (apache#12100)
Browse files Browse the repository at this point in the history
* PIP-45: Removed access to ZK client from PulsarService

* Fixed reflection access

* Fixed reflection signature

* Added javadoc
  • Loading branch information
merlimat authored Sep 20, 2021
1 parent f4f4cdd commit e189de9
Show file tree
Hide file tree
Showing 15 changed files with 51 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,6 @@
*/
package org.apache.pulsar.broker.cache;

import java.util.Map;

import org.apache.bookkeeper.util.ZkUtils;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.type.TypeReference;

import lombok.Getter;

/**
* ConfigurationCacheService is only kept for compatibility as it was exposed in AuthorizationProvider interface.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.function.Consumer;
import lombok.Cleanup;
import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
import org.apache.zookeeper.ZooKeeper.States;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -87,11 +86,9 @@ public void run() {
@Override
public void shutdown(int exitCode) {
try {
// Try to close ZK session to ensure all ephemeral locks gets released immediately
// Try to close metadata service session to ensure all ephemeral locks get released immediately
if (service != null) {
if (service.getZkClient().getState() != States.CLOSED) {
service.getZkClient().close();
}
service.closeMetadataServiceSession();
}
} catch (Exception e) {
LOG.warn(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,17 @@ public MetadataStoreExtended createConfigurationMetadataStore() throws MetadataS
.build());
}

/**
* Close the session to the metadata service.
*
* This will immediately release all the resource locks held by this broker on the coordination service.
*
* @throws IOException if the close operation fails
*/
public void closeMetadataServiceSession() throws IOException {
localZooKeeperConnectionProvider.close();
}

@Override
public void close() throws PulsarServerException {
try {
Expand Down Expand Up @@ -632,7 +643,8 @@ public void start() throws PulsarServerException {
this.bkClientFactory = newBookKeeperClientFactory();

managedLedgerClientFactory = ManagedLedgerStorage.create(
config, localMetadataStore, getZkClient(), bkClientFactory, ioEventLoopGroup
config, localMetadataStore, localZooKeeperConnectionProvider.getLocalZooKeeper(),
bkClientFactory, ioEventLoopGroup
);

this.brokerService = newBrokerService(this);
Expand Down Expand Up @@ -1062,10 +1074,6 @@ public String getStatusFilePath() {
return config.getStatusFilePath();
}

public ZooKeeper getZkClient() {
return this.localZooKeeperConnectionProvider.getLocalZooKeeper();
}

/**
* Get default bookkeeper metadata service uri.
*/
Expand Down Expand Up @@ -1214,8 +1222,9 @@ public synchronized LedgerOffloader createManagedLedgerOffloader(OffloadPolicies
private SchemaStorage createAndStartSchemaStorage() throws Exception {
final Class<?> storageClass = Class.forName(config.getSchemaRegistryStorageClassName());
Object factoryInstance = storageClass.getDeclaredConstructor().newInstance();
Method createMethod = storageClass.getMethod("create", PulsarService.class);
SchemaStorage schemaStorage = (SchemaStorage) createMethod.invoke(factoryInstance, this);
Method createMethod = storageClass.getMethod("create", PulsarService.class, ZooKeeper.class);
SchemaStorage schemaStorage = (SchemaStorage) createMethod.invoke(factoryInstance, this,
localZooKeeperConnectionProvider.getLocalZooKeeper());
schemaStorage.start();
return schemaStorage;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -71,6 +72,7 @@ public class BookkeeperSchemaStorage implements SchemaStorage {

private final MetadataStore store;
private final PulsarService pulsar;
private final ZooKeeper zooKeeper;
private final MetadataCache<SchemaStorageFormat.SchemaLocator> locatorEntryCache;

private final ServiceConfiguration config;
Expand All @@ -80,8 +82,9 @@ public class BookkeeperSchemaStorage implements SchemaStorage {
new ConcurrentHashMap<>();

@VisibleForTesting
BookkeeperSchemaStorage(PulsarService pulsar) {
BookkeeperSchemaStorage(PulsarService pulsar, ZooKeeper zooKeeper) {
this.pulsar = pulsar;
this.zooKeeper = zooKeeper;
this.store = pulsar.getLocalMetadataStore();
this.config = pulsar.getConfiguration();
this.locatorEntryCache = store.getMetadataCache(new MetadataSerde<SchemaStorageFormat.SchemaLocator>() {
Expand All @@ -101,7 +104,7 @@ public SchemaStorageFormat.SchemaLocator deserialize(byte[] content) throws IOEx
public void start() throws IOException {
this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
pulsar.getConfiguration(),
pulsar.getZkClient(),
zooKeeper,
pulsar.getIoEventLoopGroup(),
Optional.empty(),
null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@
import javax.validation.constraints.NotNull;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.zookeeper.ZooKeeper;

@SuppressWarnings("unused")
public class BookkeeperSchemaStorageFactory implements SchemaStorageFactory {
@Override
@NotNull
public SchemaStorage create(PulsarService pulsar) {
return new BookkeeperSchemaStorage(pulsar);
public SchemaStorage create(PulsarService pulsar, ZooKeeper zooKeeper) {
return new BookkeeperSchemaStorage(pulsar, zooKeeper);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
import javax.validation.constraints.NotNull;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.protocol.schema.SchemaStorage;
import org.apache.zookeeper.ZooKeeper;

public interface SchemaStorageFactory {
@NotNull
SchemaStorage create(PulsarService pulsar) throws Exception;
SchemaStorage create(PulsarService pulsar, ZooKeeper zookeeper) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,22 @@
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.hash.Hashing;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URI;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.loadbalance.LoadManager;
Expand All @@ -66,16 +63,13 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.coordination.LockManager;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -341,12 +335,16 @@ public void testLoadReportDeserialize() throws Exception {
String path1 = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri1.getHost(), uri1.getPort());
String path2 = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri2.getHost(), uri2.getPort());

ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), path1,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(lr), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), path2,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(ld), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
pulsar.getLocalMetadataStore().put(path1,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(lr),
Optional.empty(),
EnumSet.of(CreateOption.Ephemeral)
).join();
pulsar.getLocalMetadataStore().put(path2,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(ld),
Optional.empty(),
EnumSet.of(CreateOption.Ephemeral)
).join();
LookupResult result1 = pulsar.getNamespaceService().createLookupResult(candidateBroker1, false, null).get();

// update to new load manager
Expand All @@ -371,9 +369,11 @@ public void testCreateLookupResult() throws Exception {
LocalBrokerData ld = new LocalBrokerData(null, null, candidateBroker, null, advertisedListeners);
URI uri = new URI(candidateBroker);
String path = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri.getHost(), uri.getPort());
ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), path,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(ld), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);

pulsar.getLocalMetadataStore().put(path,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(ld),
Optional.empty(),
EnumSet.of(CreateOption.Ephemeral));

LookupResult noListener = pulsar.getNamespaceService().createLookupResult(candidateBroker, false, null).get();
LookupResult withListener = pulsar.getNamespaceService().createLookupResult(candidateBroker, false, listener).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,6 @@ public void setup(Method m) throws Exception {
mlFactoryMock = factory;
doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();

ZooKeeper mockZk = TransactionTestBase.createMockZooKeeper();
doReturn(mockZk).when(pulsar).getZkClient();

brokerService = spy(new BrokerService(pulsar, eventLoopGroup));
doReturn(brokerService).when(pulsar).getBrokerService();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ public void setup() throws Exception {
}).when(mlFactoryMock).asyncDelete(any(), any(), any());

ZooKeeper mockZk = createMockZooKeeper();
doReturn(mockZk).when(pulsar).getZkClient();
doReturn(createMockBookKeeper(executor))
.when(pulsar).getBookKeeperClient();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ public void setup() throws Exception {
doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();

ZooKeeper mockZk = createMockZooKeeper();
doReturn(mockZk).when(pulsar).getZkClient();
doReturn(createMockBookKeeper(executor))
.when(pulsar).getBookKeeperClient();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ public CompletableFuture<Void> appendAbortMark(TxnID txnID, AckType ackType) {
doReturn(mlFactoryMock).when(pulsarMock).getManagedLedgerFactory();

ZooKeeper zkMock = createMockZooKeeper();
doReturn(zkMock).when(pulsarMock).getZkClient();
doReturn(createMockBookKeeper(executor))
.when(pulsarMock).getBookKeeperClient();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.zookeeper.ZooKeeper;
import org.testng.annotations.Test;

import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.bkException;
Expand Down Expand Up @@ -62,7 +63,7 @@ public void testVersionFromBytes() {

PulsarService mockPulsarService = mock(PulsarService.class);
when(mockPulsarService.getLocalMetadataStore()).thenReturn(mock(MetadataStoreExtended.class));
BookkeeperSchemaStorage schemaStorage = new BookkeeperSchemaStorage(mockPulsarService);
BookkeeperSchemaStorage schemaStorage = new BookkeeperSchemaStorage(mockPulsarService, mock(ZooKeeper.class));
assertEquals(new LongSchemaVersion(version), schemaStorage.versionFromBytes(versionBytesPre240));
assertEquals(new LongSchemaVersion(version), schemaStorage.versionFromBytes(versionBytesPost240));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public class SchemaServiceTest extends MockedPulsarServiceBaseTest {
protected void setup() throws Exception {
conf.setSchemaRegistryStorageClassName("org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory");
super.internalSetup();
BookkeeperSchemaStorage storage = new BookkeeperSchemaStorage(pulsar);
BookkeeperSchemaStorage storage = new BookkeeperSchemaStorage(pulsar, mockZooKeeper);
storage.start();
Map<SchemaType, SchemaCompatibilityCheck> checkMap = new HashMap<>();
checkMap.put(SchemaType.AVRO, new AvroSchemaCompatibilityCheck());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,10 +408,10 @@ public CompletableFuture<ZooKeeper> create(String serverList, SessionType sessio
pulsar.start();

try {
pulsar.getZkClient().delete("/minApiVersion", -1);
pulsar.getLocalMetadataStore().delete("/minApiVersion", Optional.empty()).join();
} catch (Exception ex) {
}
pulsar.getZkClient().create("/minApiVersion", minApiVersion.getBytes(), null, CreateMode.PERSISTENT);
pulsar.getLocalMetadataStore().put("/minApiVersion", minApiVersion.getBytes(), Optional.of(-1L)).join();

String BROKER_URL_BASE = "http://localhost:" + pulsar.getListenPortHTTP().get();
String BROKER_URL_BASE_TLS = "https://localhost:" + pulsar.getListenPortHTTPS().orElse(-1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void testCheckSequenceId() throws Exception {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
ManagedLedgerClientFactory clientFactory = new ManagedLedgerClientFactory();
clientFactory.initialize(pulsar.getConfiguration(), pulsar.getLocalMetadataStore(),
pulsar.getZkClient(), pulsar.getBookKeeperClientFactory(), eventLoopGroup);
bkEnsemble.getZkClient(), pulsar.getBookKeeperClientFactory(), eventLoopGroup);
ManagedLedgerFactory mlFactory = clientFactory.getManagedLedgerFactory();
ManagedLedger ml = mlFactory.open(TopicName.get(topicName).getPersistenceNamingEncoding());
ml.close();
Expand Down

0 comments on commit e189de9

Please sign in to comment.