Skip to content

Commit

Permalink
GEODE-9369: Command to copy region entries from a WAN site to another (
Browse files Browse the repository at this point in the history
…apache#7006)

* GEODE-9369: Add command to copy region entries from one site to another

* GEODE-9369: Create servers without offheap to avoid test failures in windows due to out of memory

* GEODE-9369: Fix flaky test

* GEODE-9369: Fix some test case failues on windows

* GEODE-9369: Fix windows test failures in CI by not running the tests on windows

* GEODE-9369: Fix test cases after rebasing locator serialization filtering by default on Java 8

* GEODE-9369: Revert changes required by GEODE-9758
  • Loading branch information
albertogpz authored Nov 12, 2021
1 parent 14a818b commit 3eaeed8
Show file tree
Hide file tree
Showing 37 changed files with 3,756 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2084,6 +2084,7 @@ limitations under the License.
</li>
<li>
<a href="/docs/guide/<%=vars.product_version_nodot%>/tools_modules/gfsh/command-pages/version.html">version</a>
<a href="/docs/guide/<%=vars.product_version_nodot%>/tools_modules/gfsh/command-pages/wan_copy_region.html">wan-copy region</a>
</li>
</ul>
</li>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ public void run() {
VersionTagHolder holder = new VersionTagHolder(tag);
ClientProxyMembershipID id = ClientProxyMembershipID
.getNewProxyMembership(CCRegion.getDistributionManager().getSystem());
CCRegion.basicBridgePut("cckey0", "newvalue", null, true, null, id, true, holder);
CCRegion.basicBridgePut("cckey0", "newvalue", null, true, null, id, holder, true);
vm0.invoke("check conflation count", () -> {
// after changed the 3rd try of AUO.doPutOrCreate to be ifOld=false ifNew=false
// ARM.updateEntry will be called one more time, so there will be 2 conflicted events
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,18 @@ public String toString() {
@Immutable
public static final EnumListenerEvent TIMESTAMP_UPDATE = new TIMESTAMP_UPDATE(); // 18

@Immutable
public static final EnumListenerEvent AFTER_UPDATE_WITH_GENERATE_CALLBACKS =
new AFTER_UPDATE_WITH_GENERATE_CALLBACKS(); // 19

@Immutable
private static final EnumListenerEvent[] instances =
new EnumListenerEvent[] {AFTER_CREATE, AFTER_UPDATE, AFTER_INVALIDATE, AFTER_DESTROY,
AFTER_REGION_CREATE, AFTER_REGION_INVALIDATE, AFTER_REGION_CLEAR, AFTER_REGION_DESTROY,
AFTER_REMOTE_REGION_CREATE, AFTER_REMOTE_REGION_DEPARTURE, AFTER_REMOTE_REGION_CRASH,
AFTER_ROLE_GAIN, AFTER_ROLE_LOSS, AFTER_REGION_LIVE, AFTER_REGISTER_INSTANTIATOR,
AFTER_REGISTER_DATASERIALIZER, AFTER_TOMBSTONE_EXPIRATION, TIMESTAMP_UPDATE};
AFTER_REGISTER_DATASERIALIZER, AFTER_TOMBSTONE_EXPIRATION, TIMESTAMP_UPDATE,
AFTER_UPDATE_WITH_GENERATE_CALLBACKS};

static {
for (int i = 0; i < instances.length; i++) {
Expand Down Expand Up @@ -412,6 +417,21 @@ public byte getEventCode() {
}
}

private static class AFTER_UPDATE_WITH_GENERATE_CALLBACKS extends EnumListenerEvent {
protected AFTER_UPDATE_WITH_GENERATE_CALLBACKS() {
super("AFTER_UPDATE_WITH_GENERATE_CALLBACKS");
}

@Override
public void dispatchEvent(CacheEvent event, CacheListener listener) {}

@Override
public byte getEventCode() {
return 19;
}
}


/**
*
* This method returns the EnumListenerEvent object corresponding to the cCode given.
Expand All @@ -435,6 +455,8 @@ public byte getEventCode() {
* <li>15 - AFTER_REGISTER_INSTANTIATOR
* <li>16 - AFTER_REGISTER_DATASERIALIZER
* <li>17 - AFTER_TOMBSTONE_EXPIRATION
* <li>18 - TIMESTAMP_UPDATE
* <li>19 - AFTER_UPDATE_WITH_GENERATE_CALLBACKS
* </ul>
*
* @param eventCode the eventCode corresponding to the EnumListenerEvent object desired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5179,8 +5179,9 @@ public boolean basicBridgeCreate(final Object key, final byte[] value, boolean i
}

public boolean basicBridgePut(Object key, Object value, byte[] deltaBytes, boolean isObject,
Object callbackArg, ClientProxyMembershipID memberId, boolean fromClient,
EntryEventImpl clientEvent) throws TimeoutException, CacheWriterException {
Object callbackArg, ClientProxyMembershipID memberId,
EntryEventImpl clientEvent, boolean generateCallbacks)
throws TimeoutException, CacheWriterException {

EventID eventID = clientEvent.getEventId();
Object theCallbackArg = callbackArg;
Expand All @@ -5189,7 +5190,7 @@ public boolean basicBridgePut(Object key, Object value, byte[] deltaBytes, boole
@Released
final EntryEventImpl event = entryEventFactory.create(this, Operation.UPDATE, key,
null, theCallbackArg, false,
memberId.getDistributedMember(), true, eventID);
memberId.getDistributedMember(), generateCallbacks, eventID);

try {
event.setContext(memberId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -992,8 +992,8 @@ public double getLoadPerConnection() {
return this.stats.getDouble(loadPerConnectionId);
}

public int getProcessBatchRequests() {
return this.stats.getInt(processBatchRequestsId);
public long getProcessBatchRequests() {
return this.stats.getLong(processBatchRequestsId);
}

public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.cache.wan.BatchException70;
import org.apache.geode.internal.cache.wan.GatewayReceiverStats;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
import org.apache.geode.internal.security.AuthorizeRequest;
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.internal.util.BlobHelper;
Expand Down Expand Up @@ -308,7 +309,7 @@ public void cmdExecute(final Message clientMessage, final ServerConnection serve
// attempt to update the entry
if (!result) {
result = region.basicBridgePut(key, value, null, isObject, callbackArg,
serverConnection.getProxyID(), false, clientEvent);
serverConnection.getProxyID(), clientEvent, true);
}
}

Expand All @@ -333,6 +334,7 @@ public void cmdExecute(final Message clientMessage, final ServerConnection serve
break;

case 1: // Update
case GatewaySenderEventImpl.UPDATE_ACTION_NO_GENERATE_CALLBACKS:
try {
// Retrieve the value from the message parts (do not deserialize it)
valuePart = clientMessage.getPart(partNumber + 5);
Expand Down Expand Up @@ -405,8 +407,10 @@ public void cmdExecute(final Message clientMessage, final ServerConnection serve
if (isPdxEvent) {
result = addPdxType(crHelper, key, value);
} else {
boolean generateCallbacks =
actionType != GatewaySenderEventImpl.UPDATE_ACTION_NO_GENERATE_CALLBACKS;
result = region.basicBridgePut(key, value, null, isObject, callbackArg,
serverConnection.getProxyID(), false, clientEvent);
serverConnection.getProxyID(), clientEvent, generateCallbacks);
}
if (result || clientEvent.isConcurrencyConflict()) {
serverConnection.setModificationInfo(true, regionName, key);
Expand Down Expand Up @@ -637,7 +641,8 @@ public void cmdExecute(final Message clientMessage, final ServerConnection serve
exceptions.add(be);
} finally {
// Increment the partNumber
if (actionType == 0 /* create */ || actionType == 1 /* update */) {
if (actionType == 0 /* create */ || actionType == 1 /* update */
|| actionType == GatewaySenderEventImpl.UPDATE_ACTION_NO_GENERATE_CALLBACKS) {
if (callbackArgExists) {
partNumber += 9;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ public void cmdExecute(final Message clientMessage, final ServerConnection serve
serverConnection.getProxyID(), true, clientEvent, true);
} else {
result = region.basicBridgePut(key, value, delta, isObject, callbackArg,
serverConnection.getProxyID(), true, clientEvent);
serverConnection.getProxyID(), clientEvent, true);
}
if (clientMessage.isRetry() && clientEvent.isConcurrencyConflict()
&& clientEvent.getVersionTag() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@

import org.apache.geode.CancelException;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.client.internal.Connection;
import org.apache.geode.cache.client.internal.ExecutablePool;
import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.logging.internal.log4j.api.LogService;

/**
Expand Down Expand Up @@ -184,4 +187,10 @@ public void stop() {
public void shutDownAckReaderConnection() {
// no op
}

@Override
public void sendBatch(List<GatewayQueueEvent<?, ?>> events, Connection connection,
ExecutablePool senderPool, int batchId, boolean removeFromQueueOnException) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

import java.util.List;

import org.apache.geode.cache.client.internal.Connection;
import org.apache.geode.cache.client.internal.ExecutablePool;
import org.apache.geode.cache.wan.GatewayQueueEvent;

/**
* @since GemFire 7.0
*
Expand All @@ -31,4 +35,9 @@ public interface GatewaySenderEventDispatcher {
void stop();

void shutDownAckReaderConnection();

void sendBatch(List<GatewayQueueEvent<?, ?>> events, Connection connection,
ExecutablePool senderPool,
int batchId, boolean removeFromQueueOnException)
throws BatchException70;
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ public class GatewaySenderEventImpl

private static final int VERSION_ACTION = 3;

public static final int UPDATE_ACTION_NO_GENERATE_CALLBACKS = 4;

private static final int INVALIDATE_ACTION = 5;
/**
* Static constants for Operation detail of EntryEvent.
Expand Down Expand Up @@ -309,7 +311,7 @@ public GatewaySenderEventImpl(EnumListenerEvent operation, CacheEvent<?, ?> ce,

// Initialize the action and number of parts (called after _callbackArgument
// is set above)
initializeAction(this.operation);
initializeAction(this.operation, event);

// initialize the operation detail
initializeOperationDetail(event.getOperation());
Expand Down Expand Up @@ -1022,7 +1024,7 @@ protected void initializeValue(EntryEventImpl event) {
*
* @param operation The operation from which to initialize this event's action and number of parts
*/
protected void initializeAction(EnumListenerEvent operation) {
protected void initializeAction(EnumListenerEvent operation, EntryEventImpl event) {
if (operation == EnumListenerEvent.AFTER_CREATE) {
// Initialize after create action
action = CREATE_ACTION;
Expand Down Expand Up @@ -1065,6 +1067,13 @@ protected void initializeAction(EnumListenerEvent operation) {
// Initialize number of parts
// Since there is no value, there is one less part
numberOfParts = (callbackArgument == null) ? 7 : 8;
} else if (operation == EnumListenerEvent.AFTER_UPDATE_WITH_GENERATE_CALLBACKS) {
if (event.isGenerateCallbacks()) {
action = UPDATE_ACTION;
} else {
action = UPDATE_ACTION_NO_GENERATE_CALLBACKS;
}
numberOfParts = (this.callbackArgument == null) ? 8 : 9;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public void testGetEnumListEvent() {
// extra non-existent code checks as a markers so that this test will
// fail if further events are added (0th or +1 codes) without updating this test
checkAndAssert(18, EnumListenerEvent.TIMESTAMP_UPDATE);
checkAndAssert(19, null);
checkAndAssert(19, EnumListenerEvent.AFTER_UPDATE_WITH_GENERATE_CALLBACKS);
checkAndAssert(20, null);
}

// check that the code and object both match
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void setUp() throws Exception {
when(keyPart.getStringOrObject()).thenReturn(KEY);

when(localRegion.basicBridgePut(eq(KEY), eq(VALUE), eq(null), eq(true), eq(CALLBACK_ARG),
any(), eq(true), any())).thenReturn(true);
any(), any(), eq(true))).thenReturn(true);

when(message.getNumberOfParts()).thenReturn(8);
when(message.getPart(eq(0))).thenReturn(regionNamePart);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.TXId;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderHelper;
Expand Down Expand Up @@ -113,13 +114,12 @@ public void versionedFromData() throws IOException, ClassNotFoundException {
@Parameters(method = "getVersionsAndExpectedInvocations")
public void testSerializingDataFromCurrentVersionToOldVersion(VersionAndExpectedInvocations vaei)
throws IOException {
InternalDataSerializer internalDataSerializer = spy(InternalDataSerializer.class);
GatewaySenderEventImpl gatewaySenderEvent = spy(GatewaySenderEventImpl.class);
OutputStream outputStream = mock(OutputStream.class);
VersionedDataOutputStream versionedDataOutputStream =
new VersionedDataOutputStream(outputStream, vaei.getVersion());

internalDataSerializer.invokeToData(gatewaySenderEvent, versionedDataOutputStream);
InternalDataSerializer.invokeToData(gatewaySenderEvent, versionedDataOutputStream);
verify(gatewaySenderEvent, times(0)).toData(any(), any());
verify(gatewaySenderEvent, times(vaei.getPre115Invocations())).toDataPre_GEODE_1_15_0_0(any(),
any());
Expand All @@ -134,15 +134,14 @@ public void testSerializingDataFromCurrentVersionToOldVersion(VersionAndExpected
public void testDeserializingDataFromOldVersionToCurrentVersion(
VersionAndExpectedInvocations vaei)
throws IOException, ClassNotFoundException {
InternalDataSerializer internalDataSerializer = spy(InternalDataSerializer.class);
GatewaySenderEventImpl gatewaySenderEvent = spy(GatewaySenderEventImpl.class);
InputStream inputStream = mock(InputStream.class);
when(inputStream.read()).thenReturn(69); // NULL_STRING
when(inputStream.read(isA(byte[].class), isA(int.class), isA(int.class))).thenReturn(1);
VersionedDataInputStream versionedDataInputStream =
new VersionedDataInputStream(inputStream, vaei.getVersion());

internalDataSerializer.invokeFromData(gatewaySenderEvent, versionedDataInputStream);
InternalDataSerializer.invokeFromData(gatewaySenderEvent, versionedDataInputStream);
verify(gatewaySenderEvent, times(0)).fromData(any(), any());
verify(gatewaySenderEvent, times(vaei.getPre115Invocations())).fromDataPre_GEODE_1_15_0_0(any(),
any());
Expand Down Expand Up @@ -349,6 +348,35 @@ private EntryEventImpl mockEntryEventImpl(final TransactionId transactionId) {
return cacheEvent;
}

@Parameters({"true, true", "true, false", "false, false"})
public void testCreation_WithAfterUpdateWithGenerateCallbacks(boolean isGenerateCallbacks,
boolean isCallbackArgumentNull)
throws IOException {
InternalRegion region = mock(InternalRegion.class);
when(region.getFullPath()).thenReturn(testName.getMethodName() + "_region");

Operation operation = mock(Operation.class);
when(operation.isLocalLoad()).thenReturn(true);

EntryEventImpl cacheEvent = mock(EntryEventImpl.class);
when(cacheEvent.getRegion()).thenReturn(region);
when(cacheEvent.getEventId()).thenReturn(mock(EventID.class));
when(cacheEvent.getOperation()).thenReturn(operation);
when(cacheEvent.isGenerateCallbacks()).thenReturn(isGenerateCallbacks);
when(cacheEvent.getRawCallbackArgument())
.thenReturn(isCallbackArgumentNull ? null : mock(GatewaySenderEventCallbackArgument.class));

GatewaySenderEventImpl event = new GatewaySenderEventImpl(
EnumListenerEvent.AFTER_UPDATE_WITH_GENERATE_CALLBACKS, cacheEvent,
null, false, INCLUDE_LAST_EVENT);

final int numberOfParts = isCallbackArgumentNull ? 8 : 9;
assertThat(event.getNumberOfParts()).isEqualTo(numberOfParts);

final int action = isGenerateCallbacks ? 1 : 4;
assertThat(event.getAction()).isEqualTo(action);
}

public static class VersionAndExpectedInvocations {

private final KnownVersion version;
Expand Down
Loading

0 comments on commit 3eaeed8

Please sign in to comment.