Skip to content

Commit

Permalink
[Issue 9496] fix logic in ManagedLedgerWriter when config threadNum >…
Browse files Browse the repository at this point in the history
…= ledgerNum (apache#9497)

Fix apache#9496 

fix logic in ManagedLedgerWriter when config threadNum >= ledgerNum

### Modifications

if threadNum >= ledgerNum.

allocate ledger repeat among threads.

origin logic may have thread without ledger and got an exception.

### Verifying this change

build and run 
`./pulsar-perf managed-ledger -e 3 -w 2 -o 10000 --threads 20 -r 100000 -s 2048 -zk localhost:2181`

no exception in stdlog

### Documentation

  - Does this pull request introduce a new feature?   no
  - If yes, how is the feature documented? (not applicable)
  • Loading branch information
lifepuzzlefun authored Feb 8, 2021
1 parent 16a2240 commit 07f3509
Showing 1 changed file with 38 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,7 @@
import io.netty.util.concurrent.DefaultThreadFactory;

import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -213,8 +210,7 @@ public void run() {
Collections.shuffle(managedLedgers);
AtomicBoolean isDone = new AtomicBoolean();

List<List<ManagedLedger>> managedLedgersPerThread = Lists.partition(managedLedgers,
Math.max(1, managedLedgers.size() / arguments.numThreads));
Map<Integer, List<ManagedLedger>> managedLedgersPerThread = allocateToThreads(managedLedgers, arguments.numThreads);

for (int i = 0; i < arguments.numThreads; i++) {
List<ManagedLedger> managedLedgersForThisThread = managedLedgersPerThread.get(i);
Expand Down Expand Up @@ -334,6 +330,42 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
factory.shutdown();
}


public static <T> Map<Integer,List<T>> allocateToThreads(List<T> managedLedgers, int numThreads) {

Map<Integer,List<T>> map = new HashMap<>();

if (managedLedgers.size() >= numThreads) {
int threadIndex = 0;
for (T managedLedger : managedLedgers) {

List<T> ledgerList = map.getOrDefault(threadIndex, new ArrayList<>());
ledgerList.add(managedLedger);
map.put(threadIndex, ledgerList);

threadIndex++;
if (threadIndex >= numThreads) {
threadIndex = threadIndex % numThreads;
}
}

} else {
int ledgerIndex = 0;
for(int threadIndex = 0;threadIndex<numThreads;threadIndex++) {
List<T> ledgerList = map.getOrDefault(threadIndex,new ArrayList<>());
ledgerList.add(managedLedgers.get(ledgerIndex));
map.put(threadIndex,ledgerList);

ledgerIndex++;
if(ledgerIndex >= managedLedgers.size()) {
ledgerIndex = ledgerIndex % managedLedgers.size();
}
}
}

return map;
}

private static void printAggregatedStats() {
Histogram reportHistogram = cumulativeRecorder.getIntervalHistogram();

Expand Down

0 comments on commit 07f3509

Please sign in to comment.