Skip to content

Commit

Permalink
Use for-each instead of the for-loop on pulsar collections (apache#15388
Browse files Browse the repository at this point in the history
)
  • Loading branch information
Demogorgon314 authored May 2, 2022
1 parent 72bbb97 commit bd012ad
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.mutable.MutableInt;

/**
* A Concurrent set comprising zero or more ranges of type {@link LongPair}. This can be alternative of
Expand Down Expand Up @@ -245,12 +245,12 @@ public Range<T> lastRange() {
@Override
public int size() {
if (updatedAfterCachedForSize) {
AtomicInteger size = new AtomicInteger(0);
MutableInt size = new MutableInt(0);
forEach((range) -> {
size.getAndIncrement();
size.increment();
return true;
});
cachedSize = size.get();
cachedSize = size.intValue();
updatedAfterCachedForSize = false;
}
return cachedSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
*/
package org.apache.pulsar.common.util.collections;

import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet.LongPair;
import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet.LongPairConsumer;

Expand Down Expand Up @@ -112,16 +113,14 @@ public boolean remove(long item1, long item2) {

@Override
public int removeIf(LongPairPredicate filter) {
int removedValues = 0;
for (Map.Entry<Long, ConcurrentLongPairSet> entry : longPairSets.entrySet()) {
Long item1 = entry.getKey();
ConcurrentLongPairSet longPairSet = entry.getValue();
removedValues += longPairSet.removeIf(filter);
MutableInt removedValues = new MutableInt(0);
longPairSets.forEach((item1, longPairSet) -> {
removedValues.add(longPairSet.removeIf(filter));
if (longPairSet.isEmpty() && longPairSets.size() > maxAllowedSetOnRemove) {
longPairSets.remove(item1, longPairSet);
}
}
return removedValues;
});
return removedValues.intValue();
}

@Override
Expand All @@ -131,12 +130,7 @@ public Set<LongPair> items() {

@Override
public void forEach(LongPairConsumer processor) {
for (Long item1 : longPairSets.navigableKeySet()) {
ConcurrentLongPairSet messagesToReplay = longPairSets.get(item1);
messagesToReplay.forEach((i1, i2) -> {
processor.accept(i1, i2);
});
}
longPairSets.forEach((__, longPairSet) -> longPairSet.forEach(processor));
}

@Override
Expand All @@ -147,15 +141,12 @@ public Set<LongPair> items(int numberOfItems) {
@Override
public <T> Set<T> items(int numberOfItems, LongPairFunction<T> longPairConverter) {
NavigableSet<T> items = new TreeSet<>();
for (Long item1 : longPairSets.navigableKeySet()) {
ConcurrentLongPairSet messagesToReplay = longPairSets.get(item1);
messagesToReplay.forEach((i1, i2) -> {
items.add(longPairConverter.apply(i1, i2));
if (items.size() > numberOfItems) {
items.pollLast();
}
});
}
forEach((i1, i2) -> {
items.add(longPairConverter.apply(i1, i2));
if (items.size() > numberOfItems) {
items.pollLast();
}
});
return items;
}

Expand Down Expand Up @@ -200,20 +191,16 @@ public void clear() {

@Override
public long size() {
long size = 0;
for (Map.Entry<Long, ConcurrentLongPairSet> entry : longPairSets.entrySet()) {
size += entry.getValue().size();
}
return size;
MutableLong size = new MutableLong(0);
longPairSets.forEach((__, longPairSet) -> size.add(longPairSet.size()));
return size.longValue();
}

@Override
public long capacity() {
long capacity = 0;
for (Map.Entry<Long, ConcurrentLongPairSet> entry : longPairSets.entrySet()) {
capacity += entry.getValue().capacity();
}
return capacity;
MutableLong capacity = new MutableLong(0);
longPairSets.forEach((__, longPairSet) -> capacity.add(longPairSet.capacity()));
return capacity.longValue();
}

@Override
Expand Down

0 comments on commit bd012ad

Please sign in to comment.