Skip to content

Commit

Permalink
GEODE-8765: Fix NullPointerException when group-transaction-events an… (
Browse files Browse the repository at this point in the history
apache#5829)

* GEODE-8765: Fix NullPointerException when group-transaction-events and events in and not in transactions are sent.
  • Loading branch information
albertogpz authored Dec 14, 2020
1 parent b73d30e commit 4c00984
Show file tree
Hide file tree
Showing 5 changed files with 351 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1260,7 +1260,6 @@ public List peek(int batchSize, int timeToWait) throws InterruptedException, Cac
addPeekedEvents(batch, batchSize == BATCH_BASED_ON_TIME_ONLY ? DEFAULT_BATCH_SIZE : batchSize);

int bId;
Map<TransactionId, Integer> incompleteTransactionsInBatch = new HashMap<>();
while (batchSize == BATCH_BASED_ON_TIME_ONLY || batch.size() < batchSize) {
if (areLocalBucketQueueRegionsPresent() && ((bId = getRandomPrimaryBucket(prQ)) != -1)) {
GatewaySenderEventImpl object = (GatewaySenderEventImpl) peekAhead(prQ, bId);
Expand All @@ -1280,13 +1279,6 @@ public List peek(int batchSize, int timeToWait) throws InterruptedException, Cac
logger.debug("The gatewayEventImpl in peek is {}", object);
}
batch.add(object);
if (object.getTransactionId() != null) {
if (object.isLastEventInTransaction()) {
incompleteTransactionsInBatch.remove(object.getTransactionId());
} else {
incompleteTransactionsInBatch.put(object.getTransactionId(), bId);
}
}
peekedEvents.add(object);

} else {
Expand Down Expand Up @@ -1316,7 +1308,7 @@ public List peek(int batchSize, int timeToWait) throws InterruptedException, Cac
}

if (batch.size() > 0) {
peekEventsFromIncompleteTransactions(batch, incompleteTransactionsInBatch, prQ);
peekEventsFromIncompleteTransactions(batch, prQ);
}

if (isDebugEnabled) {
Expand Down Expand Up @@ -1351,12 +1343,14 @@ protected boolean mustGroupTransactionEvents() {
}

private void peekEventsFromIncompleteTransactions(List<GatewaySenderEventImpl> batch,
Map<TransactionId, Integer> incompleteTransactionIdsInBatch, PartitionedRegion prQ) {
PartitionedRegion prQ) {
if (!mustGroupTransactionEvents()) {
return;
}

if (areAllTransactionsCompleteInBatch(incompleteTransactionIdsInBatch)) {
Map<TransactionId, Integer> incompleteTransactionIdsInBatch =
getIncompleteTransactionsInBatch(batch);
if (incompleteTransactionIdsInBatch.size() == 0) {
return;
}

Expand Down Expand Up @@ -1389,8 +1383,19 @@ private void peekEventsFromIncompleteTransactions(List<GatewaySenderEventImpl> b
}
}

private boolean areAllTransactionsCompleteInBatch(Map incompleteTransactions) {
return (incompleteTransactions.size() == 0);
private Map<TransactionId, Integer> getIncompleteTransactionsInBatch(
List<GatewaySenderEventImpl> batch) {
Map<TransactionId, Integer> incompleteTransactionsInBatch = new HashMap<>();
for (GatewaySenderEventImpl event : batch) {
if (event.getTransactionId() != null) {
if (event.isLastEventInTransaction()) {
incompleteTransactionsInBatch.remove(event.getTransactionId());
} else {
incompleteTransactionsInBatch.put(event.getTransactionId(), event.getBucketId());
}
}
}
return incompleteTransactionsInBatch;
}

@VisibleForTesting
Expand Down Expand Up @@ -1472,19 +1477,18 @@ private void addPreviouslyPeekedEvents(List<GatewaySenderEventImpl> batch, int b
for (int i = 0; i < batchSize || incompleteTransactionsInBatch.size() != 0; i++) {
GatewaySenderEventImpl event = this.peekedEventsProcessing.remove();
batch.add(event);
if (event.getTransactionId() != null) {
if (event.isLastEventInTransaction()) {
incompleteTransactionsInBatch.remove(event.getTransactionId());
} else {
incompleteTransactionsInBatch.add(event.getTransactionId());
if (mustGroupTransactionEvents()) {
if (event.getTransactionId() != null) {
if (event.isLastEventInTransaction()) {
incompleteTransactionsInBatch.remove(event.getTransactionId());
} else {
incompleteTransactionsInBatch.add(event.getTransactionId());
}
}
}
if (this.peekedEventsProcessing.isEmpty()) {
this.resetLastPeeked = false;
this.peekedEventsProcessingInProgress = false;
if (incompleteTransactionsInBatch.size() != 0) {
logger.error("A batch with incomplete transactions has been sent.");
}
break;
}
}
Expand Down Expand Up @@ -1547,9 +1551,9 @@ protected List<Object> peekEventsWithTransactionId(PartitionedRegion prQ, int bu

try {
Predicate<GatewaySenderEventImpl> hasTransactionIdPredicate =
x -> x.getTransactionId().equals(transactionId);
getHasTransactionIdPredicate(transactionId);
Predicate<GatewaySenderEventImpl> isLastEventInTransactionPredicate =
x -> x.isLastEventInTransaction();
getIsLastEventInTransactionPredicate();
objects =
brq.getElementsMatching(hasTransactionIdPredicate, isLastEventInTransactionPredicate);
} catch (BucketRegionQueueUnavailableException e) {
Expand All @@ -1561,6 +1565,16 @@ protected List<Object> peekEventsWithTransactionId(PartitionedRegion prQ, int bu
// finished with peeked objects.
}

@VisibleForTesting
public static Predicate<GatewaySenderEventImpl> getIsLastEventInTransactionPredicate() {
return x -> x.isLastEventInTransaction();
}

@VisibleForTesting
public static Predicate<GatewaySenderEventImpl> getHasTransactionIdPredicate(
TransactionId transactionId) {
return x -> transactionId.equals(x.getTransactionId());
}

protected BucketRegionQueue getBucketRegionQueueByBucketId(final PartitionedRegion prQ,
final int bucketId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,12 +413,12 @@ public Object peek() throws CacheException {
}

@Override
public List<AsyncEvent> peek(int size) throws CacheException {
public List<AsyncEvent<?, ?>> peek(int size) throws CacheException {
return peek(size, -1);
}

@Override
public List<AsyncEvent> peek(int size, int timeToWait) throws CacheException {
public List<AsyncEvent<?, ?>> peek(int size, int timeToWait) throws CacheException {
final boolean isTraceEnabled = logger.isTraceEnabled();

long start = System.currentTimeMillis();
Expand All @@ -428,27 +428,16 @@ public List<AsyncEvent> peek(int size, int timeToWait) throws CacheException {
timeToWait);
}

List<AsyncEvent> batch =
new ArrayList<AsyncEvent>(size == BATCH_BASED_ON_TIME_ONLY ? DEFAULT_BATCH_SIZE : size);
Set<TransactionId> incompleteTransactionsInBatch = new HashSet<>();
List<AsyncEvent<?, ?>> batch =
new ArrayList<>(size == BATCH_BASED_ON_TIME_ONLY ? DEFAULT_BATCH_SIZE : size);
long lastKey = -1;
while (size == BATCH_BASED_ON_TIME_ONLY || batch.size() < size) {
KeyAndEventPair pair = peekAhead();
// Conflate here
if (pair != null) {
AsyncEvent object = pair.event;
AsyncEvent<?, ?> object = pair.event;
lastKey = pair.key;
batch.add(object);
if (object instanceof GatewaySenderEventImpl) {
GatewaySenderEventImpl event = (GatewaySenderEventImpl) object;
if (event.getTransactionId() != null) {
if (event.isLastEventInTransaction()) {
incompleteTransactionsInBatch.remove(event.getTransactionId());
} else {
incompleteTransactionsInBatch.add(event.getTransactionId());
}
}
}
} else {
// If time to wait is -1 (don't wait) or time interval has elapsed
long currentTime = System.currentTimeMillis();
Expand Down Expand Up @@ -476,7 +465,7 @@ public List<AsyncEvent> peek(int size, int timeToWait) throws CacheException {
}
}
if (batch.size() > 0) {
peekEventsFromIncompleteTransactions(batch, incompleteTransactionsInBatch, lastKey);
peekEventsFromIncompleteTransactions(batch, lastKey);
}

if (isTraceEnabled) {
Expand All @@ -487,13 +476,13 @@ public List<AsyncEvent> peek(int size, int timeToWait) throws CacheException {
// so no need to worry about off-heap refCount.
}

private void peekEventsFromIncompleteTransactions(List<AsyncEvent> batch,
Set<TransactionId> incompleteTransactionIdsInBatch, long lastKey) {
private void peekEventsFromIncompleteTransactions(List<AsyncEvent<?, ?>> batch, long lastKey) {
if (!mustGroupTransactionEvents()) {
return;
}

if (areAllTransactionsCompleteInBatch(incompleteTransactionIdsInBatch)) {
Set<TransactionId> incompleteTransactionIdsInBatch = getIncompleteTransactionsInBatch(batch);
if (incompleteTransactionIdsInBatch.size() == 0) {
return;
}

Expand Down Expand Up @@ -530,8 +519,21 @@ protected boolean mustGroupTransactionEvents() {
return sender.mustGroupTransactionEvents();
}

private boolean areAllTransactionsCompleteInBatch(Set incompleteTransactions) {
return (incompleteTransactions.size() == 0);
private Set<TransactionId> getIncompleteTransactionsInBatch(List<AsyncEvent<?, ?>> batch) {
Set<TransactionId> incompleteTransactionsInBatch = new HashSet<>();
for (Object object : batch) {
if (object instanceof GatewaySenderEventImpl) {
GatewaySenderEventImpl event = (GatewaySenderEventImpl) object;
if (event.getTransactionId() != null) {
if (event.isLastEventInTransaction()) {
incompleteTransactionsInBatch.remove(event.getTransactionId());
} else {
incompleteTransactionsInBatch.add(event.getTransactionId());
}
}
}
}
return incompleteTransactionsInBatch;
}

@Override
Expand All @@ -557,9 +559,9 @@ public void addCacheListener(CacheListener listener) {
public void removeCacheListener() {
AttributesMutator mutator = this.region.getAttributesMutator();
CacheListener[] listeners = this.region.getAttributes().getCacheListeners();
for (int i = 0; i < listeners.length; i++) {
if (listeners[i] instanceof SerialSecondaryGatewayListener) {
mutator.removeCacheListener(listeners[i]);
for (CacheListener listener : listeners) {
if (listener instanceof SerialSecondaryGatewayListener) {
mutator.removeCacheListener(listener);
break;
}
}
Expand Down Expand Up @@ -591,7 +593,7 @@ private boolean removeOldEntry(Conflatable object, Long tailKey) throws CacheExc
try {
Map<Object, Long> latestIndexesForRegion = this.indexes.get(rName);
if (latestIndexesForRegion == null) {
latestIndexesForRegion = new HashMap<Object, Long>();
latestIndexesForRegion = new HashMap<>();
this.indexes.put(rName, latestIndexesForRegion);
}

Expand Down Expand Up @@ -664,7 +666,7 @@ private AsyncEvent optimalGet(Long k) {
Object o = null;
try {
o = lr.getValueInVMOrDiskWithoutFaultIn(k);
if (o != null && o instanceof CachedDeserializable) {
if (o instanceof CachedDeserializable) {
o = ((CachedDeserializable) o).getDeserializedValue(lr, lr.getRegionEntry(k));
}
} catch (EntryNotFoundException ok) {
Expand Down Expand Up @@ -842,7 +844,7 @@ public KeyAndEventPair peekAhead() throws CacheException {

private EventsAndLastKey peekEventsWithTransactionId(TransactionId transactionId, long lastKey) {
Predicate<GatewaySenderEventImpl> hasTransactionIdPredicate =
x -> x.getTransactionId().equals(transactionId);
x -> transactionId.equals(x.getTransactionId());
Predicate<GatewaySenderEventImpl> isLastEventInTransactionPredicate =
x -> x.isLastEventInTransaction();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.cache.wan.GatewaySenderStats;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderHelper;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
import org.apache.geode.internal.statistics.DummyStatisticsFactory;
import org.apache.geode.test.fake.Fakes;

Expand Down Expand Up @@ -138,38 +138,39 @@ public void testBasicDestroyConflationEnabledAndValueNotInRegion() {
}

@Test
public void testGetElementsMatchingWithHasTransactionIdPredicateAndIsLastEventInTransactionPredicate()
public void testGetElementsMatchingWithParallelGatewaySenderQueuePredicatesAndSomeEventsNotInTransactions()
throws ForceReattemptException {
ParallelGatewaySenderEventProcessor processor =
ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);
ParallelGatewaySenderHelper.createParallelGatewaySenderEventProcessor(this.sender);

TransactionId tx1 = new TXId(null, 1);
TransactionId tx2 = new TXId(null, 2);
TransactionId tx3 = new TXId(null, 3);

GatewaySenderEventImpl event1 = createMockGatewaySenderEvent(1, tx1, false);
GatewaySenderEventImpl event2 = createMockGatewaySenderEvent(2, tx2, false);
GatewaySenderEventImpl event3 = createMockGatewaySenderEvent(3, tx1, true);
GatewaySenderEventImpl event4 = createMockGatewaySenderEvent(4, tx2, true);
GatewaySenderEventImpl event5 = createMockGatewaySenderEvent(5, tx3, false);
GatewaySenderEventImpl event6 = createMockGatewaySenderEvent(6, tx3, false);
GatewaySenderEventImpl event7 = createMockGatewaySenderEvent(7, tx1, true);
GatewaySenderEventImpl eventNotInTransaction1 = createMockGatewaySenderEvent(2, null, false);
GatewaySenderEventImpl event2 = createMockGatewaySenderEvent(3, tx2, false);
GatewaySenderEventImpl event3 = createMockGatewaySenderEvent(4, tx1, true);
GatewaySenderEventImpl event4 = createMockGatewaySenderEvent(5, tx2, true);
GatewaySenderEventImpl event5 = createMockGatewaySenderEvent(6, tx3, false);
GatewaySenderEventImpl event6 = createMockGatewaySenderEvent(7, tx3, false);
GatewaySenderEventImpl event7 = createMockGatewaySenderEvent(8, tx1, true);

this.bucketRegionQueue
.cleanUpDestroyedTokensAndMarkGIIComplete(InitialImageOperation.GIIStatus.NO_GII);

this.bucketRegionQueue.addToQueue(Long.valueOf(1), event1);
this.bucketRegionQueue.addToQueue(Long.valueOf(2), event2);
this.bucketRegionQueue.addToQueue(Long.valueOf(3), event3);
this.bucketRegionQueue.addToQueue(Long.valueOf(4), event4);
this.bucketRegionQueue.addToQueue(Long.valueOf(5), event5);
this.bucketRegionQueue.addToQueue(Long.valueOf(6), event6);
this.bucketRegionQueue.addToQueue(Long.valueOf(7), event7);
this.bucketRegionQueue.addToQueue(1L, event1);
this.bucketRegionQueue.addToQueue(2L, eventNotInTransaction1);
this.bucketRegionQueue.addToQueue(3L, event2);
this.bucketRegionQueue.addToQueue(4L, event3);
this.bucketRegionQueue.addToQueue(5L, event4);
this.bucketRegionQueue.addToQueue(6L, event5);
this.bucketRegionQueue.addToQueue(7L, event6);
this.bucketRegionQueue.addToQueue(8L, event7);

Predicate<GatewaySenderEventImpl> hasTransactionIdPredicate =
x -> x.getTransactionId().equals(tx1);
ParallelGatewaySenderQueue.getHasTransactionIdPredicate(tx1);
Predicate<GatewaySenderEventImpl> isLastEventInTransactionPredicate =
x -> x.isLastEventInTransaction();
ParallelGatewaySenderQueue.getIsLastEventInTransactionPredicate();
List<Object> objects = this.bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate,
isLastEventInTransactionPredicate);

Expand All @@ -182,7 +183,7 @@ public void testGetElementsMatchingWithHasTransactionIdPredicateAndIsLastEventIn
assertEquals(objects, Arrays.asList(new Object[] {event7}));

hasTransactionIdPredicate =
x -> x.getTransactionId().equals(tx2);
ParallelGatewaySenderQueue.getHasTransactionIdPredicate(tx2);
objects = this.bucketRegionQueue.getElementsMatching(hasTransactionIdPredicate,
isLastEventInTransactionPredicate);
assertEquals(2, objects.size());
Expand Down
Loading

0 comments on commit 4c00984

Please sign in to comment.