Skip to content

Commit

Permalink
GEODE-8783: Publish batchesWithIncompleteTransactions in GatewaySende… (
Browse files Browse the repository at this point in the history
apache#5845)

* GEODE-8783: Publish batchesWithIncompleteTransactions in GatewaySenderMXBean
  • Loading branch information
albertogpz authored Jan 12, 2021
1 parent 0e15125 commit c215645
Show file tree
Hide file tree
Showing 15 changed files with 245 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public void init() {

@Test
public void testSenderStats() throws InterruptedException {
senderStats.incBatchesWithIncompleteTransactions();
senderStats.incBatchesWithIncompleteTransactions();
senderStats.incBatchesWithIncompleteTransactions();
senderStats.incBatchesRedistributed();
senderStats.incBatchesRedistributed();
senderStats.incEventsReceived();
Mockito.when(sender.getEventQueueSize()).thenReturn(10);
Expand All @@ -61,7 +65,9 @@ public void testSenderStats() throws InterruptedException {

sample();

assertEquals(1, getTotalBatchesRedistributed());
assertEquals(1, getTotalBatchesDistributed());
assertEquals(2, getTotalBatchesRedistributed());
assertEquals(3, getTotalBatchesWithIncompleteTransactions());
assertEquals(1, getTotalEventsConflated());
assertEquals(10, getEventQueueSize());
assertTrue(getEventsQueuedRate() > 0);
Expand All @@ -71,10 +77,18 @@ public void testSenderStats() throws InterruptedException {
assertTrue(getEventsExceedingAlertThreshold() > 0);
}

private int getTotalBatchesDistributed() {
return bridge.getTotalBatchesDistributed();
}

private int getTotalBatchesRedistributed() {
return bridge.getTotalBatchesRedistributed();
}

private int getTotalBatchesWithIncompleteTransactions() {
return bridge.getTotalBatchesWithIncompleteTransactions();
}

private int getTotalEventsConflated() {
return bridge.getTotalEventsConflated();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ public class GatewaySenderStats {
protected static final String BATCHES_DISTRIBUTED = "batchesDistributed";
/** Name of the batches redistributed statistic */
protected static final String BATCHES_REDISTRIBUTED = "batchesRedistributed";
/** Name of the batches redistributed statistic */
protected static final String BATCHES_WITH_INCOMPLETE_TRANSACTIONS =
"batchesWithIncompleteTransactions";
/** Name of the batches resized statistic */
protected static final String BATCHES_RESIZED = "batchesResized";
/** Name of the unprocessed events added by primary statistic */
Expand Down Expand Up @@ -125,6 +128,8 @@ public class GatewaySenderStats {
private static final int batchesDistributedId;
/** Id of the batches redistributed statistic */
private static final int batchesRedistributedId;
/** Id of the batches with incomplete transactions statistic */
private static final int batchesWithIncompleteTransactionsId;
/** Id of the batches resized statistic */
private static final int batchesResizedId;
/** Id of the unprocessed events added by primary statistic */
Expand Down Expand Up @@ -184,6 +189,7 @@ public class GatewaySenderStats {
batchDistributionTimeId = type.nameToId(BATCH_DISTRIBUTION_TIME);
batchesDistributedId = type.nameToId(BATCHES_DISTRIBUTED);
batchesRedistributedId = type.nameToId(BATCHES_REDISTRIBUTED);
batchesWithIncompleteTransactionsId = type.nameToId(BATCHES_WITH_INCOMPLETE_TRANSACTIONS);
batchesResizedId = type.nameToId(BATCHES_RESIZED);
unprocessedTokensAddedByPrimaryId = type.nameToId(UNPROCESSED_TOKENS_ADDED_BY_PRIMARY);
unprocessedEventsAddedBySecondaryId = type.nameToId(UNPROCESSED_EVENTS_ADDED_BY_SECONDARY);
Expand Down Expand Up @@ -240,6 +246,9 @@ protected static StatisticsType createType(final StatisticsTypeFactory f, final
f.createIntCounter(BATCHES_REDISTRIBUTED,
"Number of batches of events removed from the event queue and resent.",
"operations", false),
f.createLongCounter(BATCHES_WITH_INCOMPLETE_TRANSACTIONS,
"Number of batches of events sent with incomplete transactions.",
"operations", false),
f.createIntCounter(BATCHES_RESIZED,
"Number of batches that were resized because they were too large", "operations",
false),
Expand Down Expand Up @@ -461,27 +470,36 @@ public long getBatchDistributionTime() {
}

/**
* Returns the current value of the batchesDistributed" stat.
* Returns the current value of the "batchesDistributed" stat.
*
* @return the current value of the batchesDistributed" stat
* @return the current value of the "batchesDistributed" stat
*/
public int getBatchesDistributed() {
return this.stats.getInt(batchesDistributedId);
}

/**
* Returns the current value of the batchesRedistributed" stat.
* Returns the current value of the "batchesRedistributed" stat.
*
* @return the current value of the batchesRedistributed" stat
* @return the current value of the "batchesRedistributed" stat
*/
public int getBatchesRedistributed() {
return this.stats.getInt(batchesRedistributedId);
}

/**
* Returns the current value of the batchesResized" stat.
* Returns the current value of the "batchesWithIncompleteTransactions" stat.
*
* @return the current value of the batchesResized" stat
* @return the current value of the "batchesWithIncompleteTransactions" stat
*/
public long getBatchesWithIncompleteTransactions() {
return this.stats.getLong(batchesWithIncompleteTransactionsId);
}

/**
* Returns the current value of the "batchesResized" stat.
*
* @return the current value of the "batchesResized" stat
*/
public int getBatchesResized() {
return this.stats.getInt(batchesResizedId);
Expand All @@ -494,6 +512,13 @@ public void incBatchesRedistributed() {
this.stats.incInt(batchesRedistributedId, 1);
}

/**
* Increments the value of the "batchesWithIncompleteTransactions" stat by 1.
*/
public void incBatchesWithIncompleteTransactions() {
this.stats.incLong(batchesWithIncompleteTransactionsId, 1);
}

/**
* Increments the value of the "batchesRedistributed" stat by 1.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1311,6 +1311,7 @@ public List peek(int batchSize, int timeToWait) throws InterruptedException, Cac
peekEventsFromIncompleteTransactions(batch, prQ);
}


if (isDebugEnabled) {
logger.debug("{}: Peeked a batch of {} entries. The size of the queue is {}. localSize is {}",
this, batch.size(), size(), localSize());
Expand Down Expand Up @@ -1354,6 +1355,7 @@ private void peekEventsFromIncompleteTransactions(List<GatewaySenderEventImpl> b
return;
}

boolean batchHasIncompleteTransactions = false;
for (Map.Entry<TransactionId, Integer> pendingTransaction : incompleteTransactionIdsInBatch
.entrySet()) {
TransactionId transactionId = pendingTransaction.getKey();
Expand All @@ -1377,10 +1379,14 @@ private void peekEventsFromIncompleteTransactions(List<GatewaySenderEventImpl> b
}
}
if (!areAllEventsForTransactionInBatch) {
batchHasIncompleteTransactions = true;
logger.warn("Not able to retrieve all events for transaction {} after {} tries",
transactionId, retries);
}
}
if (batchHasIncompleteTransactions) {
stats.incBatchesWithIncompleteTransactions();
}
}

private Map<TransactionId, Integer> getIncompleteTransactionsInBatch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ private void peekEventsFromIncompleteTransactions(List<AsyncEvent<?, ?>> batch,
return;
}

boolean batchHasIncompleteTransactions = true;
for (TransactionId transactionId : incompleteTransactionIdsInBatch) {
boolean areAllEventsForTransactionInBatch = false;
int retries = 0;
Expand All @@ -492,10 +493,14 @@ private void peekEventsFromIncompleteTransactions(List<AsyncEvent<?, ?>> batch,
lastKeyForTransaction = eventsAndKey.lastKey;
}
if (!areAllEventsForTransactionInBatch) {
batchHasIncompleteTransactions = true;
logger.warn("Not able to retrieve all events for transaction {} after {} tries",
transactionId, retries);
}
}
if (batchHasIncompleteTransactions) {
stats.incBatchesWithIncompleteTransactions();
}
}

protected boolean mustGroupTransactionEvents() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,22 @@ public interface GatewaySenderMXBean {
*/
long getAverageDistributionTimePerBatch();

/**
* Returns the total number of batches of events that were resent.
*/
int getTotalBatchesDistributed();

/**
* Returns the total number of batches of events that were resent.
*/
int getTotalBatchesRedistributed();

/**
* Returns the total number of batches sent with incomplete transactions.
* Only relevant if group-transaction-events is enabled.
*/
int getTotalBatchesWithIncompleteTransactions();

/**
* Returns the total number of bytes in heap occupied by the event queue.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1274,13 +1274,27 @@ public float getGatewaySenderEventsQueuedRate() {
return senderMonitor.getGatewaySenderEventsQueuedRate();
}

/**
* @return total batches distributed
*/
public int getGatewaySenderTotalBatchesDistributed() {
return senderMonitor.getGatewaySenderTotalBatchesDistributed();
}

/**
* @return total batches redistributed
*/
public int getGatewaySenderTotalBatchesRedistributed() {
return senderMonitor.getGatewaySenderTotalBatchesRedistributed();
}

/**
* @return total batches with incomplete transactions
*/
public int getGatewaySenderTotalBatchesWithIncompleteTransactions() {
return senderMonitor.getGatewaySenderTotalBatchesWithIncompleteTransactions();
}

/**
* @return total number of events conflated
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,21 @@ public long getSocketReadTimeout() {
return bridge.getSocketReadTimeout();
}

@Override
public int getTotalBatchesDistributed() {
return bridge.getTotalBatchesDistributed();
}

@Override
public int getTotalBatchesRedistributed() {
return bridge.getTotalBatchesRedistributed();
}

@Override
public int getTotalBatchesWithIncompleteTransactions() {
return bridge.getTotalBatchesWithIncompleteTransactions();
}

@Override
public int getTotalEventsConflated() {
return bridge.getTotalEventsConflated();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,19 @@ public boolean mustGroupTransactionEvents() {

/** Statistics Related Attributes **/

public int getTotalBatchesDistributed() {
return getStatistic(StatsKey.GATEWAYSENDER_BATCHES_DISTRIBUTED).intValue();
}

public int getTotalBatchesRedistributed() {
return getStatistic(StatsKey.GATEWAYSENDER_TOTAL_BATCHES_REDISTRIBUTED).intValue();
}

public int getTotalBatchesWithIncompleteTransactions() {
return getStatistic(StatsKey.GATEWAYSENDER_TOTAL_BATCHES_WITH_INCOMPLETE_TRANSACTIONS)
.intValue();
}

public int getTotalEventsConflated() {
return getStatistic(StatsKey.GATEWAYSENDER_EVENTS_QUEUED_CONFLATED).intValue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@ public class GatewaySenderClusterStatsMonitor {

private static final String EVENTS_QUEUED_RATE = "EventsQueuedRate";

private static final String TOTAL_BATCHES_DISTRIBUTED = "TotalBatchesDistributed";

private static final String TOTAL_BATCHES_REDISTRIBUTED = "TotalBatchesRedistributed";

private static final String TOTAL_BATCHES_WITH_INCOMPLETE_TRANSACTIONS =
"TotalBatchesWithIncompleteTransactions";

private static final String TOTAL_EVENTS_CONFLATED = "TotalEventsConflated";


Expand All @@ -54,7 +59,9 @@ private void intTypeMap() {
typeMap.put(BATCHES_DISPATCHED_RATE, Float.TYPE);
typeMap.put(EVENT_QUEUE_SIZE, Integer.TYPE);
typeMap.put(EVENTS_QUEUED_RATE, Float.TYPE);
typeMap.put(TOTAL_BATCHES_DISTRIBUTED, Integer.TYPE);
typeMap.put(TOTAL_BATCHES_REDISTRIBUTED, Integer.TYPE);
typeMap.put(TOTAL_BATCHES_WITH_INCOMPLETE_TRANSACTIONS, Integer.TYPE);
typeMap.put(TOTAL_EVENTS_CONFLATED, Integer.TYPE);

}
Expand All @@ -75,10 +82,18 @@ public float getGatewaySenderEventsQueuedRate() {
return aggregator.getFloatValue(EVENTS_QUEUED_RATE);
}

public int getGatewaySenderTotalBatchesDistributed() {
return aggregator.getIntValue(TOTAL_BATCHES_DISTRIBUTED);
}

public int getGatewaySenderTotalBatchesRedistributed() {
return aggregator.getIntValue(TOTAL_BATCHES_REDISTRIBUTED);
}

public int getGatewaySenderTotalBatchesWithIncompleteTransactions() {
return aggregator.getIntValue(TOTAL_BATCHES_WITH_INCOMPLETE_TRANSACTIONS);
}

public int getGatewaySenderTotalEventsConflated() {
return aggregator.getIntValue(TOTAL_EVENTS_CONFLATED);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,8 @@ public class StatsKey {
public static final String GATEWAYSENDER_BATCHES_DISTRIBUTED = "batchesDistributed";
public static final String GATEWAYSENDER_BATCHES_DISTRIBUTE_TIME = "batchDistributionTime";
public static final String GATEWAYSENDER_TOTAL_BATCHES_REDISTRIBUTED = "batchesRedistributed";
public static final String GATEWAYSENDER_TOTAL_BATCHES_WITH_INCOMPLETE_TRANSACTIONS =
"batchesWithIncompleteTransactions";
public static final String GATEWAYSENDER_EVENTS_QUEUED_CONFLATED = "eventsNotQueuedConflated";
public static final String GATEWAYSENDER_EVENTS_EXCEEDING_ALERT_THRESHOLD =
"eventsExceedingAlertThreshold";
Expand Down
3 changes: 2 additions & 1 deletion geode-docs/reference/topics/cache_xml.html.md.erb
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,8 @@ Configures a gateway sender to distribute region events to another <%=vars.produ
<td>group-transaction-events</td>
<td>Boolean value to ensure that all the events of a transaction are sent in the same batch, i.e., they are never spread across different batches.
<p>Only allowed to be set on gateway senders with the <code class="ph codeph">parallel</code> attribute set to false and <code class="ph codeph">dispatcher-threads</code> attribute equal to 1, or on gateway senders with the <code class="ph codeph">parallel</code> attribute set to true. Also, the <code class="ph codeph">enable-batch-conflation</code> attribute of the gateway sender must be set to false.</p>
<p><b>Note:</b> In order to work for a transaction, the regions to which the transaction events belong must be replicated by the same set of senders with this flag enabled.</p></td>
<p><b>Note:</b> In order to work for a transaction, the regions to which the transaction events belong must be replicated by the same set of senders with this flag enabled.</p>
<p><b>Note:</b> If the above condition is not fulfilled or under very high load traffic conditions, it may not be guaranteed that all the events for a transaction will be sent in the same batch, even if <code class="ph codeph">group-transaction-events</code> is enabled. The number of batches sent with incomplete transactions can be retrieved from the <code class="ph codeph">GatewaySenderMXBean</code> bean.</p></td>
<td>false</td>
</tr>
</tbody>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,8 @@ create gateway-sender --id=value --remote-distributed-system-id=value
<td><span class="keyword parmname">\-\-group-transaction-events</span></td>
<td>Boolean value to ensure that all the events of a transaction are sent in the same batch, i.e., they are never spread across different batches.
<p>Only allowed to be set on gateway senders with the <code class="ph codeph">parallel</code> attribute set to false and <code class="ph codeph">dispatcher-threads</code> attribute equal to 1, or on gateway senders with the <code class="ph codeph">parallel</code> attribute set to true. Also, the <code class="ph codeph">enable-batch-conflation</code> attribute of the gateway sender must be set to false.</p>
<p><b>Note:</b> In order to work for a transaction, the regions to which the transaction events belong must be replicated by the same set of senders with this flag enabled.</p></td>
<p><b>Note:</b> In order to work for a transaction, the regions to which the transaction events belong must be replicated by the same set of senders with this flag enabled.</p>
<p><b>Note:</b> If the above condition is not fulfilled or under very high load traffic conditions, it may not be guaranteed that all the events for a transaction will be sent in the same batch, even if <code class="ph codeph">group-transaction-events</code> is enabled. The number of batches sent with incomplete transactions can be retrieved from the <code class="ph codeph">GatewaySenderMXBean</code> bean.</p></td>
</td>
<td>false</td>
</tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ In order to use transaction event grouping:
- The regions to which the transaction events belong must be replicated by the same set of gateway senders that also have this setting enabled.
- This setting cannot be enabled if `enable-batch-conflation` is in effect.

**Note:**
If the above conditions are not fulfilled or under very high load traffic conditions, it may not be guaranteed that all the events for a transaction will be sent in the same batch, even if <code class="ph codeph">group-transaction-events</code> is enabled. The number of batches sent with incomplete transactions can be retrieved from the <code class="ph codeph">GatewaySenderMXBean</code> bean.

By default, potential WAN conflicts are resolved using a timestamp mechanism. You can optionally install a custom conflict resolver to apply custom logic when determining whether to apply a potentially conflicting update received over a WAN.

[Consistency for Region Updates](../../developing/distributed_regions/region_entry_versions.html#topic_CF2798D3E12647F182C2CEC4A46E2045) describes how Geode ensures consistency within a cluster, in client caches, and when applying updates over a WAN. [Resolving Conflicting Events](../../developing/events/resolving_multisite_conflicts.html#topic_E97BB68748F14987916CD1A50E4B4542) provides more details about implementing a custom conflict resolver for WAN updates.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1251,6 +1251,7 @@ public static List<Integer> getSenderStats(String senderId, final int expectedQu
stats.add(statistics.getSecondaryEventQueueSize());
stats.add(statistics.getEventsProcessedByPQRM());
stats.add(statistics.getEventsExceedingAlertThreshold());
stats.add((int) statistics.getBatchesWithIncompleteTransactions());
return stats;
}

Expand Down
Loading

0 comments on commit c215645

Please sign in to comment.