Skip to content

Commit

Permalink
[pulsar-broker] Fix some comment typos (apache#5926)
Browse files Browse the repository at this point in the history
Fix some comment typos in the pulsar-broker module, no real code changes.
  • Loading branch information
zhenglaizhang authored and sijie committed Jan 24, 2020
1 parent 8f62eb5 commit 648ed0e
Show file tree
Hide file tree
Showing 15 changed files with 31 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
private BrokerHostUsage brokerHostUsage;

// Map from brokers to namespaces to the bundle ranges in that namespace assigned to that broker.
// Used to distribute bundles within a namespace evely across brokers.
// Used to distribute bundles within a namespace evenly across brokers.
private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<String>>> brokerToNamespaceToBundleRange;

// Path to the ZNode containing the LocalBrokerData json for this broker.
Expand All @@ -138,7 +138,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
private ServiceConfiguration conf;

// The default bundle stats which are used to initialize historic data.
// This data is overriden after the bundle receives its first sample.
// This data is overridden after the bundle receives its first sample.
private final NamespaceBundleStats defaultStats;

// Used to filter brokers from being selected for assignment.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public NamespaceBundle getNamespaceBundle() {
*
* @param pulsar
* @param timeout
* timeout for unloading bundle. It doesn't throw exception if it timesout while waiting on closing all
* timeout for unloading bundle. It doesn't throw exception if it times out while waiting on closing all
* topics
* @param timeoutUnit
* @throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ public synchronized void setupTopicPublishRateLimiterMonitor() {
// schedule task that sums up publish-rate across all cnx on a topic
topicPublishRateLimiterMonitor.scheduleAtFixedRate(safeRun(() -> checkTopicPublishThrottlingRate()),
topicTickTimeMs, topicTickTimeMs, TimeUnit.MILLISECONDS);
// schedule task that refreshes rate-limitting bucket
// schedule task that refreshes rate-limiting bucket
topicPublishRateLimiterMonitor.scheduleAtFixedRate(safeRun(() -> refreshTopicPublishRate()), 1, 1,
TimeUnit.SECONDS);
}
Expand Down Expand Up @@ -503,7 +503,7 @@ public synchronized void setupBrokerPublishRateLimiterMonitor() {
brokerTickTimeMs,
brokerTickTimeMs,
TimeUnit.MILLISECONDS);
// schedule task that refreshes rate-limitting bucket
// schedule task that refreshes rate-limiting bucket
brokerPublishRateLimiterMonitor.scheduleAtFixedRate(
safeRun(() -> refreshBrokerPublishRate()),
1,
Expand Down Expand Up @@ -1871,7 +1871,7 @@ public ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpe
}

/**
* If per-broker unack message reached to limit then it blocks dispatcher if its unack message limit has been
* If per-broker unacked message reached to limit then it blocks dispatcher if its unacked message limit has been
* reached to {@link #maxUnackedMsgsPerDispatcher}
*
* @param dispatcher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ public synchronized void acknowledgeMessage(TxnID txnId, List<Position> position
pendingAckMessagesMap.computeIfAbsent(txnId, txn -> new ConcurrentOpenHashSet<>());

for (Position position : positions) {
// If try to ack message already acked by some ongoign transaction(can be itself), throw exception.
// If try to ack message already acked by some ongoing transaction(can be itself), throw exception.
// Acking single message within range of cumulative ack(if exist) is considered valid operation.
if (this.pendingAckMessages.contains(position)) {
String errorMsg = "[" + topicName + "][" + subName + "] Transaction:" + txnId +
Expand Down Expand Up @@ -883,7 +883,7 @@ public CompletableFuture<Void> delete() {
*
* @param consumer
* consumer object that is initiating the unsubscribe operation
* @return CompletableFuture indicating the completion of ubsubscribe operation
* @return CompletableFuture indicating the completion of unsubscribe operation
*/
@Override
public CompletableFuture<Void> doUnsubscribe(Consumer consumer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void checkCompatible(SchemaData from, SchemaData to, SchemaCompatibilityS
super.checkCompatible(from, to, strategy);
} else if (isJsonSchema(to)) {
// if broker have the schema in avro format but producer sent a schema in the old json format
// allow old schema format for backwards compatiblity
// allow old schema format for backwards compatibility
} else {
// unknown schema format
throw new IncompatibleSchemaException("Unknown schema format");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ public void testGetDynamicLocalConfiguration() throws Exception {
assertNotEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), shutdownTime);
// update configuration
admin.brokers().updateDynamicConfiguration(configName, Long.toString(shutdownTime));
// Now, znode is created: updateConfigurationAndregisterListeners and check if configuration updated
// Now, znode is created: updateConfigurationAndRegisterListeners and check if configuration updated
assertEquals(Long.parseLong(admin.brokers().getAllDynamicConfigurations().get(configName)), shutdownTime);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ public void testResetCursorOnPosition(String namespaceName) throws Exception {
}
}

// close consumer which will clean up intenral-receive-queue
// close consumer which will clean up internal-receive-queue
consumer.close();

// messages should still be available due to retention
Expand Down Expand Up @@ -589,7 +589,7 @@ public void testPeerCluster() throws Exception {
assertTrue(e instanceof PreconditionFailedException);
}

// Cluster itselft can't be part of peer-list
// Cluster itself can't be part of peer-list
try {
admin.clusters().updatePeerClusterNames("us-west1", Sets.newLinkedHashSet(Lists.newArrayList("us-west1")));
fail("should have failed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ public void testBrokerSelectionForAntiAffinityGroup() throws Exception {

/**
* It verifies that load-shedding task should unload namespace only if there is a broker available which doesn't
* cause uneven anti-affinitiy namespace distribution.
* cause uneven anti-affinity namespace distribution.
*
* <pre>
* 1. broker1 owns ns-0 => broker1 can unload ns-0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ private void printSortedRanking(AtomicReference<Map<Long, Set<ResourceUnit>>> so
/*
* Pre-publish load report to ZK, each broker has: - Difference memory capacity, for the first 3 brokers memory is
* bottleneck, for the 4/5th brokers CPU become bottleneck since memory is big enough - non-bundles assigned so all
* idle resources are avaiable for new bundle Check the broker rankings are the load percentage of each broker.
* idle resources are available for new bundle Check the broker rankings are the load percentage of each broker.
*/
@Test
public void testBrokerRanking() throws Exception {
Expand Down Expand Up @@ -402,7 +402,7 @@ public void testTopicAssignmentWithExistingBundles() throws Exception {
printSortedRanking(sortedRanking);
}

// check owner of new destiations and verify that the distribution is roughly
// check owner of new destinations and verify that the distribution is roughly
// consistent (variation < 10%) with the broker capacity:
int totalNamespaces = 250;
int[] expectedAssignments = new int[] { 17, 34, 51, 68, 85 };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ public void testLoadReportDeserialize() throws Exception {
CreateMode.EPHEMERAL);
LookupResult result1 = pulsar.getNamespaceService().createLookupResult(candidateBroker1).get();

// update to new load mananger
// update to new load manager
LoadManager oldLoadManager = pulsar.getLoadManager()
.getAndSet(new ModularLoadManagerWrapper(new ModularLoadManagerImpl()));
oldLoadManager.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ protected void cleanup() throws Exception {
}

/**
* Verifies: updating zk-thottling node reflects broker-maxConcurrentLookupRequest and updates semaphore.
* Verifies: updating zk-throttling node reflects broker-maxConcurrentLookupRequest and updates semaphore.
*
* @throws Exception
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void testMultipleBrokerLookup() throws Exception {
doReturn(true).when(loadManager2).isCentralized();
loadManagerField.set(pulsar2.getNamespaceService(), new AtomicReference<>(loadManager2));

// mock: return Broker2 as a Least-loaded broker when leader receies request [3]
// mock: return Broker2 as a Least-loaded broker when leader receives request [3]
doReturn(true).when(loadManager1).isCentralized();
SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getSafeWebServiceAddress(), null);
doReturn(Optional.of(resourceUnit)).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
Expand Down Expand Up @@ -206,7 +206,7 @@ public void testMultipleBrokerLookup() throws Exception {

/**
* Usecase: Redirection due to different cluster 1. Broker1 runs on cluster: "use" and Broker2 runs on cluster:
* "use2" 2. Broker1 receives "use2" cluster request => Broker1 reads "/clusters" from global-zookkeeper and
* "use2" 2. Broker1 receives "use2" cluster request => Broker1 reads "/clusters" from global-zookeeper and
* redirects request to Broker2 which serves "use2" 3. Broker2 receives redirect request and own namespace bundle
*
* @throws Exception
Expand Down Expand Up @@ -252,7 +252,7 @@ public void testMultipleBrokerDifferentClusterLookup() throws Exception {
Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager");
loadManagerField.setAccessible(true);

// mock: return Broker2 as a Least-loaded broker when leader receies request
// mock: return Broker2 as a Least-loaded broker when leader receives request
doReturn(true).when(loadManager2).isCentralized();
SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar2.getSafeWebServiceAddress(), null);
doReturn(Optional.of(resourceUnit)).when(loadManager2).getLeastLoaded(any(ServiceUnitId.class));
Expand Down Expand Up @@ -291,7 +291,7 @@ public void testMultipleBrokerDifferentClusterLookup() throws Exception {
}

/**
* Create #PartitionedTopic and let it served by multiple brokers which requries a. tcp partitioned-metadata-lookup
* Create #PartitionedTopic and let it served by multiple brokers which requires a. tcp partitioned-metadata-lookup
* b. multiple topic-lookup c. partitioned producer-consumer
*
* @throws Exception
Expand Down Expand Up @@ -324,7 +324,7 @@ public void testPartitionTopicLookup() throws Exception {
Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager");
loadManagerField.setAccessible(true);

// mock: return Broker2 as a Least-loaded broker when leader receies request
// mock: return Broker2 as a Least-loaded broker when leader receives request
doReturn(true).when(loadManager1).isCentralized();
loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1));

Expand Down Expand Up @@ -418,7 +418,7 @@ public void testWebserviceServiceTls() throws Exception {
loadManagerField.set(pulsar2.getNamespaceService(), new AtomicReference<>(loadManager2));
loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1));

// mock: return Broker2 as a Least-loaded broker when leader receies
// mock: return Broker2 as a Least-loaded broker when leader receives
// request [3]
doReturn(true).when(loadManager1).isCentralized();
doReturn(true).when(loadManager2).isCentralized();
Expand Down Expand Up @@ -844,7 +844,7 @@ public void testSplitUnloadLookupTest() throws Exception {
// mock: redirect request to leader [2]
doReturn(true).when(loadManager2).isCentralized();
loadManagerField.set(pulsar2.getNamespaceService(), new AtomicReference<>(loadManager2));
// mock: return Broker1 as a Least-loaded broker when leader receies request [3]
// mock: return Broker1 as a Least-loaded broker when leader receives request [3]
doReturn(true).when(loadManager1).isCentralized();
SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getSafeWebServiceAddress(), null);
doReturn(Optional.of(resourceUnit)).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
Expand Down Expand Up @@ -934,7 +934,7 @@ public void testModularLoadManagerSplitBundle() throws Exception {
@Cleanup
PulsarService pulsar2 = startBroker(conf2);

// configure broker-1 with ModularLoadlManager
// configure broker-1 with ModularLoadManager
stopBroker();
conf.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
startBroker();
Expand All @@ -951,7 +951,7 @@ public void testModularLoadManagerSplitBundle() throws Exception {
// mock: redirect request to leader [2]
doReturn(true).when(loadManager2).isCentralized();
loadManagerField.set(pulsar2.getNamespaceService(), new AtomicReference<>(loadManager2));
// mock: return Broker1 as a Least-loaded broker when leader receies request [3]
// mock: return Broker1 as a Least-loaded broker when leader receives request [3]
doReturn(true).when(loadManager1).isCentralized();
SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getSafeWebServiceAddress(), null);
Optional<ResourceUnit> res = Optional.of(resourceUnit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ public void testProducerReconnect() throws Exception {
channelCtx.set(ctx);
ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion()));
if (numOfConnections.incrementAndGet() == 2) {
// close the cnx immediately when trying to conenct the 2nd time
// close the cnx immediately when trying to connect the 2nd time
ctx.channel().close();
}
});
Expand Down Expand Up @@ -612,7 +612,7 @@ public void testConsumerReconnect() throws Exception {
channelCtx.set(ctx);
ctx.writeAndFlush(Commands.newConnected(connect.getProtocolVersion()));
if (numOfConnections.incrementAndGet() == 2) {
// close the cnx immediately when trying to conenct the 2nd time
// close the cnx immediately when trying to connect the 2nd time
ctx.channel().close();
}
if (numOfConnections.get() == 3) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ protected void cleanup() throws Exception {
* It verifies that redelivered messages are sorted based on the ledger-ids.
* <pre>
* 1. client publishes 100 messages across 50 ledgers
* 2. broker deliveres 100 messages to consumer
* 3. consumer ack every alternative message and doesn't ack 50 messsages
* 2. broker delivers 100 messages to consumer
* 3. consumer ack every alternative message and doesn't ack 50 messages
* 4. broker sorts replay messages based on ledger and redelivers messages ledger by ledger
* </pre>
* @throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public void testDisconnectClientWithoutClosingConnection() throws Exception {

// unload ns-bundle2 as well
pulsar.getNamespaceService().unloadNamespaceBundle((NamespaceBundle) bundle2);
// let producer2 give some time to get disconnect signal and get disconencted
// let producer2 give some time to get disconnect signal and get disconnected
Thread.sleep(200);
verify(producer2, atLeastOnce()).connectionClosed(any());

Expand Down

0 comments on commit 648ed0e

Please sign in to comment.