Skip to content

Commit

Permalink
GEODE-9248: Server hosting CQ queue uneccessary fills bucketToTempQue…
Browse files Browse the repository at this point in the history
…ue (apache#6646)


Issue reproduces when following conditions are fulfilled:

 - Redundant partition region must be configured
 - Number of servers must be greater than number of redundant copies of partition region
 - Parallel gateway sender must be configured on partition region
 - Client must register CQs for the region
 - Transactions must be used with put operations
 - Events must be enqueued in parallel gateway senders (remote site is unavailable)

Server that is hosting primary bucket will send TXCommitMessage to the secondary
server, and also to the server that is hosting CQ subscription queue (if CQ condition is fulfilled).
The problem occurs when the server that is hosting CQ subscription queue does not host
the bucket for which event is received. In that case the server will store the event in
bucketToTempQueueMap, because it assumes that the bucket is in the process of the
creation, which is not correct.

The solution:

At reception of CommitProcessForTXIdMessage the events received with TXCommitMessage will not
be stored in temporary queue, if targeted bucket region is not hosted in the server. Following
checks are performed on receiving server in order to identify those events:

1. Check that targeted bucket region in not available locally on server
2. Check from filterRoutingInfo that the receiving server hosts CQ/RegionInterests queue for that event

If above checks are fulfilled then tailKey is set to -1 on receiving server to avoid enqueuing of
event in temporary queue.

Co-authored-by: Jakov Varenina <[email protected]>
  • Loading branch information
jvarenina authored Jul 7, 2021
1 parent 7b3693d commit b12b8f6
Show file tree
Hide file tree
Showing 4 changed files with 518 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1242,14 +1242,21 @@ protected void txApplyEntryOp(FarSideEntryOp entryOp, List<EntryEventImpl> pendi
/*
* This happens when we don't have the bucket and are getting adjunct notification
*/
long tailKey = entryOp.tailKey;
if (entryOp.filterRoutingInfo != null) {
if (entryOp.filterRoutingInfo.getMembers().contains(this.internalRegion.getMyId())) {
tailKey = -1;
}
}

// No need to release because it is added to pendingCallbacks and they will be released
// later
EntryEventImpl eei =
txCallbackEventFactory.createCallbackEvent(internalRegion, entryOp.op,
entryOp.key,
entryOp.value, msg.txIdent, txEvent, getEventId(entryOp), entryOp.callbackArg,
entryOp.filterRoutingInfo, msg.bridgeContext, null, entryOp.versionTag,
entryOp.tailKey);
entryOp.filterRoutingInfo, msg.bridgeContext, null, entryOp.versionTag, tailKey);

if (entryOp.filterRoutingInfo != null) {
eei.setLocalFilterInfo(
entryOp.filterRoutingInfo.getFilterInfo(internalRegion.getCache().getMyId()));
Expand Down
1 change: 1 addition & 0 deletions geode-dunit/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies {
implementation(project(':geode-tcp-server'))
implementation(project(':geode-core'))
implementation(project(':geode-gfsh'))
implementation(project(':geode-cq'))
implementation(project(':geode-log4j')) {
exclude module: 'geode-core'
}
Expand Down
5 changes: 5 additions & 0 deletions geode-dunit/src/test/resources/expected-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@
<artifactId>geode-gfsh</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.geode</groupId>
<artifactId>geode-cq</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.geode</groupId>
<artifactId>geode-log4j</artifactId>
Expand Down
Loading

0 comments on commit b12b8f6

Please sign in to comment.