Skip to content

Commit

Permalink
GEODE-9369: Command to copy region entries from a WAN site to another (
Browse files Browse the repository at this point in the history
…apache#6601)

* GEODE-9369: Command to copy region entries from a WAN site to another

The command has been implemented as proposed in
https://cwiki.apache.org/confluence/display/GEODE/Geode+Command+to+replicate+region+data+from+one+site+to+another+connected+via+WAN
with some modifications with respect to the initial proposal.

The command will get the entries of a region
in a WAN site and will put them in batches
that will be sent by a gateway sender to a remote
WAN site.

* GEODE-9369: Remove comments added to geode-gfsh/src/test/resources/expected-pom.xml

* GEODE-9369: Changes after review

* GEODE-9369: Update with Kirk's review comments

* GEODE-9369: Update with boglesby's review comments

* GEODE-9369: Changes after boglesby's review

* GEODE-9369: Updated with davebarnes97's review comments.

* GEODE-9369: More changes after davebarnes97's review

* GEODE-9369: Updated with DonalEvan's comments

* GEODE-9369: Small refactoring: Use CompletableFuture instead of Callable + FutureTask

* GEODE-9369: Fix race condition added in previous commit

* GEODE-9369: Small change of re-review from DonalEvans

* GEODE-9369: Fix esporadic test failures due to error log.

* GEODE-9369: Changes required after rebasing to develop
  • Loading branch information
albertogpz authored Aug 25, 2021
1 parent 58592ed commit c9d465b
Show file tree
Hide file tree
Showing 29 changed files with 3,293 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2109,6 +2109,7 @@ gfsh</a>
</li>
<li>
<a href="/docs/guide/<%=vars.product_version_nodot%>/tools_modules/gfsh/command-pages/version.html">version</a>
<a href="/docs/guide/<%=vars.product_version_nodot%>/tools_modules/gfsh/command-pages/wan_copy_region.html">wan-copy region</a>
</li>
</ul>
</li>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ public void run() {
VersionTagHolder holder = new VersionTagHolder(tag);
ClientProxyMembershipID id = ClientProxyMembershipID
.getNewProxyMembership(CCRegion.getDistributionManager().getSystem());
CCRegion.basicBridgePut("cckey0", "newvalue", null, true, null, id, true, holder);
CCRegion.basicBridgePut("cckey0", "newvalue", null, true, null, id, holder, true);
vm0.invoke("check conflation count", () -> {
// after changed the 3rd try of AUO.doPutOrCreate to be ifOld=false ifNew=false
// ARM.updateEntry will be called one more time, so there will be 2 conflicted events
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,18 @@ public String toString() {
@Immutable
public static final EnumListenerEvent TIMESTAMP_UPDATE = new TIMESTAMP_UPDATE(); // 18

@Immutable
public static final EnumListenerEvent AFTER_UPDATE_WITH_GENERATE_CALLBACKS =
new AFTER_UPDATE_WITH_GENERATE_CALLBACKS(); // 19

@Immutable
private static final EnumListenerEvent[] instances =
new EnumListenerEvent[] {AFTER_CREATE, AFTER_UPDATE, AFTER_INVALIDATE, AFTER_DESTROY,
AFTER_REGION_CREATE, AFTER_REGION_INVALIDATE, AFTER_REGION_CLEAR, AFTER_REGION_DESTROY,
AFTER_REMOTE_REGION_CREATE, AFTER_REMOTE_REGION_DEPARTURE, AFTER_REMOTE_REGION_CRASH,
AFTER_ROLE_GAIN, AFTER_ROLE_LOSS, AFTER_REGION_LIVE, AFTER_REGISTER_INSTANTIATOR,
AFTER_REGISTER_DATASERIALIZER, AFTER_TOMBSTONE_EXPIRATION, TIMESTAMP_UPDATE};
AFTER_REGISTER_DATASERIALIZER, AFTER_TOMBSTONE_EXPIRATION, TIMESTAMP_UPDATE,
AFTER_UPDATE_WITH_GENERATE_CALLBACKS};

static {
for (int i = 0; i < instances.length; i++) {
Expand Down Expand Up @@ -412,6 +417,21 @@ public byte getEventCode() {
}
}

private static class AFTER_UPDATE_WITH_GENERATE_CALLBACKS extends EnumListenerEvent {
protected AFTER_UPDATE_WITH_GENERATE_CALLBACKS() {
super("AFTER_UPDATE_WITH_GENERATE_CALLBACKS");
}

@Override
public void dispatchEvent(CacheEvent event, CacheListener listener) {}

@Override
public byte getEventCode() {
return 19;
}
}


/**
*
* This method returns the EnumListenerEvent object corresponding to the cCode given.
Expand All @@ -435,6 +455,8 @@ public byte getEventCode() {
* <li>15 - AFTER_REGISTER_INSTANTIATOR
* <li>16 - AFTER_REGISTER_DATASERIALIZER
* <li>17 - AFTER_TOMBSTONE_EXPIRATION
* <li>18 - TIMESTAMP_UPDATE
* <li>19 - AFTER_UPDATE_WITH_GENERATE_CALLBACKS
* </ul>
*
* @param eventCode the eventCode corresponding to the EnumListenerEvent object desired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5179,8 +5179,9 @@ public boolean basicBridgeCreate(final Object key, final byte[] value, boolean i
}

public boolean basicBridgePut(Object key, Object value, byte[] deltaBytes, boolean isObject,
Object callbackArg, ClientProxyMembershipID memberId, boolean fromClient,
EntryEventImpl clientEvent) throws TimeoutException, CacheWriterException {
Object callbackArg, ClientProxyMembershipID memberId,
EntryEventImpl clientEvent, boolean generateCallbacks)
throws TimeoutException, CacheWriterException {

EventID eventID = clientEvent.getEventId();
Object theCallbackArg = callbackArg;
Expand All @@ -5189,7 +5190,7 @@ public boolean basicBridgePut(Object key, Object value, byte[] deltaBytes, boole
@Released
final EntryEventImpl event = entryEventFactory.create(this, Operation.UPDATE, key,
null, theCallbackArg, false,
memberId.getDistributedMember(), true, eventID);
memberId.getDistributedMember(), generateCallbacks, eventID);

try {
event.setContext(memberId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -992,8 +992,8 @@ public double getLoadPerConnection() {
return this.stats.getDouble(loadPerConnectionId);
}

public int getProcessBatchRequests() {
return this.stats.getInt(processBatchRequestsId);
public long getProcessBatchRequests() {
return this.stats.getLong(processBatchRequestsId);
}

public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.cache.wan.BatchException70;
import org.apache.geode.internal.cache.wan.GatewayReceiverStats;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.security.AuthorizeRequest;
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.internal.util.BlobHelper;
Expand Down Expand Up @@ -308,7 +309,7 @@ public void cmdExecute(final Message clientMessage, final ServerConnection serve
// attempt to update the entry
if (!result) {
result = region.basicBridgePut(key, value, null, isObject, callbackArg,
serverConnection.getProxyID(), false, clientEvent);
serverConnection.getProxyID(), clientEvent, true);
}
}

Expand All @@ -333,6 +334,7 @@ public void cmdExecute(final Message clientMessage, final ServerConnection serve
break;

case 1: // Update
case GatewaySenderEventImpl.UPDATE_ACTION_NO_GENERATE_CALLBACKS:
try {
// Retrieve the value from the message parts (do not deserialize it)
valuePart = clientMessage.getPart(partNumber + 5);
Expand Down Expand Up @@ -405,8 +407,10 @@ public void cmdExecute(final Message clientMessage, final ServerConnection serve
if (isPdxEvent) {
result = addPdxType(crHelper, key, value);
} else {
boolean generateCallbacks =
actionType != GatewaySenderEventImpl.UPDATE_ACTION_NO_GENERATE_CALLBACKS;
result = region.basicBridgePut(key, value, null, isObject, callbackArg,
serverConnection.getProxyID(), false, clientEvent);
serverConnection.getProxyID(), clientEvent, generateCallbacks);
}
if (result || clientEvent.isConcurrencyConflict()) {
serverConnection.setModificationInfo(true, regionName, key);
Expand Down Expand Up @@ -637,7 +641,8 @@ public void cmdExecute(final Message clientMessage, final ServerConnection serve
exceptions.add(be);
} finally {
// Increment the partNumber
if (actionType == 0 /* create */ || actionType == 1 /* update */) {
if (actionType == 0 /* create */ || actionType == 1 /* update */
|| actionType == GatewaySenderEventImpl.UPDATE_ACTION_NO_GENERATE_CALLBACKS) {
if (callbackArgExists) {
partNumber += 9;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ public void cmdExecute(final Message clientMessage, final ServerConnection serve
serverConnection.getProxyID(), true, clientEvent, true);
} else {
result = region.basicBridgePut(key, value, delta, isObject, callbackArg,
serverConnection.getProxyID(), true, clientEvent);
serverConnection.getProxyID(), clientEvent, true);
}
if (clientMessage.isRetry() && clientEvent.isConcurrencyConflict()
&& clientEvent.getVersionTag() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@

import org.apache.geode.CancelException;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.client.internal.Connection;
import org.apache.geode.cache.client.internal.ExecutablePool;
import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.logging.internal.log4j.api.LogService;

/**
Expand Down Expand Up @@ -184,4 +187,10 @@ public void stop() {
public void shutDownAckReaderConnection() {
// no op
}

@Override
public void sendBatch(List<GatewayQueueEvent<?, ?>> events, Connection connection,
ExecutablePool senderPool, int batchId, boolean removeFromQueueOnException) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

import java.util.List;

import org.apache.geode.cache.client.internal.Connection;
import org.apache.geode.cache.client.internal.ExecutablePool;
import org.apache.geode.cache.wan.GatewayQueueEvent;

/**
* @since GemFire 7.0
*
Expand All @@ -31,4 +35,9 @@ public interface GatewaySenderEventDispatcher {
void stop();

void shutDownAckReaderConnection();

void sendBatch(List<GatewayQueueEvent<?, ?>> events, Connection connection,
ExecutablePool senderPool,
int batchId, boolean removeFromQueueOnException)
throws BatchException70;
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ public class GatewaySenderEventImpl

private static final int VERSION_ACTION = 3;

public static final int UPDATE_ACTION_NO_GENERATE_CALLBACKS = 4;

private static final int INVALIDATE_ACTION = 5;
/**
* Static constants for Operation detail of EntryEvent.
Expand Down Expand Up @@ -310,7 +312,7 @@ public GatewaySenderEventImpl(EnumListenerEvent operation, CacheEvent<?, ?> ce,

// Initialize the action and number of parts (called after _callbackArgument
// is set above)
initializeAction(this.operation);
initializeAction(this.operation, event);

// initialize the operation detail
initializeOperationDetail(event.getOperation());
Expand Down Expand Up @@ -1021,7 +1023,7 @@ protected void initializeValue(EntryEventImpl event) {
*
* @param operation The operation from which to initialize this event's action and number of parts
*/
protected void initializeAction(EnumListenerEvent operation) {
protected void initializeAction(EnumListenerEvent operation, EntryEventImpl event) {
if (operation == EnumListenerEvent.AFTER_CREATE) {
// Initialize after create action
action = CREATE_ACTION;
Expand Down Expand Up @@ -1064,6 +1066,13 @@ protected void initializeAction(EnumListenerEvent operation) {
// Initialize number of parts
// Since there is no value, there is one less part
numberOfParts = (callbackArgument == null) ? 7 : 8;
} else if (operation == EnumListenerEvent.AFTER_UPDATE_WITH_GENERATE_CALLBACKS) {
if (event.isGenerateCallbacks()) {
action = UPDATE_ACTION;
} else {
action = UPDATE_ACTION_NO_GENERATE_CALLBACKS;
}
numberOfParts = (this.callbackArgument == null) ? 8 : 9;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1426,6 +1426,55 @@ public class CliStrings {
public static final String EXPORT_DATA__SUCCESS__MESSAGE =
"Data successfully exported from region : {0} to file : {1} on host : {2}";

/* 'wan-copy region' command */
public static final String WAN_COPY_REGION = "wan-copy region";
public static final String WAN_COPY_REGION__HELP =
"Copy a region with a senderId via WAN replication";
public static final String WAN_COPY_REGION__REGION = "region";
public static final String WAN_COPY_REGION__REGION__HELP =
"Region from which data will be exported.";
public static final String WAN_COPY_REGION__SENDERID = "sender-id";
public static final String WAN_COPY_REGION__SENDERID__HELP =
"Sender Id to use to copy the region.";
public static final String WAN_COPY_REGION__MAXRATE = "max-rate";
public static final String WAN_COPY_REGION__MAXRATE__HELP =
"Maximum rate for copying in entries per second.";
public static final String WAN_COPY_REGION__BATCHSIZE = "batch-size";
public static final String WAN_COPY_REGION__BATCHSIZE__HELP =
"Number of entries to be copied in each batch.";
public static final String WAN_COPY_REGION__CANCEL = "cancel";
public static final String WAN_COPY_REGION__CANCEL__HELP =
"Cancel an ongoing wan-copy region command";
public static final String WAN_COPY_REGION__MSG__REGION__NOT__FOUND = "Region {0} not found";
public static final String WAN_COPY_REGION__MSG__REGION__NOT__USING_SENDER =
"Region {0} is not configured to use sender {1}";
public static final String WAN_COPY_REGION__MSG__SENDER__NOT__FOUND = "Sender {0} not found";
public static final String WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY =
"Sender {0} is serial and not primary. 0 entries copied.";
public static final String WAN_COPY_REGION__MSG__SENDER__NOT__RUNNING =
"Sender {0} is not running";
public static final String WAN_COPY_REGION__MSG__EXECUTION__CANCELED = "Execution canceled";
public static final String WAN_COPY_REGION__MSG__EXECUTIONS__CANCELED =
"Executions canceled: {0}";
public static final String WAN_COPY_REGION__MSG__EXECUTION__FAILED =
"Execution failed. Error: {0}";
public static final String WAN_COPY_REGION__MSG__NO__CONNECTION__POOL =
"No connection pool available to receiver";
public static final String WAN_COPY_REGION__MSG__COMMAND__NOT__SUPPORTED__AT__REMOTE__SITE =
"Command not supported at remote site.";
public static final String WAN_COPY_REGION__MSG__NO__CONNECTION =
"No connection available to receiver after having copied {0} entries";
public static final String WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED =
"Error ({0}) in operation after having copied {1} entries";
public static final String WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED =
"Operation canceled before having copied all entries";
public static final String WAN_COPY_REGION__MSG__COPIED__ENTRIES = "Entries copied: {0}";
public static final String WAN_COPY_REGION__MSG__NO__RUNNING__COMMAND =
"No running command to be canceled for region {0} and sender {1}";
public static final String WAN_COPY_REGION__MSG__ALREADY__RUNNING__COMMAND =
"There is already a command running for region {0} and sender {1}";


/* export logs command */
public static final String EXPORT_LOGS = "export logs";
public static final String EXPORT_LOGS__HELP = "Export the log files for a member or members.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public void testGetEnumListEvent() {
// extra non-existent code checks as a markers so that this test will
// fail if further events are added (0th or +1 codes) without updating this test
checkAndAssert(18, EnumListenerEvent.TIMESTAMP_UPDATE);
checkAndAssert(19, null);
checkAndAssert(19, EnumListenerEvent.AFTER_UPDATE_WITH_GENERATE_CALLBACKS);
checkAndAssert(20, null);
}

// check that the code and object both match
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void setUp() throws Exception {
when(keyPart.getStringOrObject()).thenReturn(KEY);

when(localRegion.basicBridgePut(eq(KEY), eq(VALUE), eq(null), eq(true), eq(CALLBACK_ARG),
any(), eq(true), any())).thenReturn(true);
any(), any(), eq(true))).thenReturn(true);

when(message.getNumberOfParts()).thenReturn(8);
when(message.getPart(eq(0))).thenReturn(regionNamePart);
Expand Down
Loading

0 comments on commit c9d465b

Please sign in to comment.