Skip to content

Commit

Permalink
GEODE-10281: Fix WAN data inconsistency (apache#7665)
Browse files Browse the repository at this point in the history
  • Loading branch information
jvarenina authored Jul 7, 2022
1 parent bb93789 commit ac00f3c
Show file tree
Hide file tree
Showing 3 changed files with 393 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ public boolean shouldBeConflated() {
// If the message is an update, it may be conflatable. If it is a
// create, destroy, invalidate or destroy-region, it is not conflatable.
// Only updates are conflated.
return isUpdate();
return isUpdate() && !isConcurrencyConflict();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.stream.Stream;

import junitparams.Parameters;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;

import org.apache.geode.cache.Operation;
import org.apache.geode.cache.TransactionId;
Expand All @@ -61,18 +63,16 @@
import org.apache.geode.internal.serialization.VersionedDataOutputStream;
import org.apache.geode.internal.util.BlobHelper;
import org.apache.geode.test.fake.Fakes;
import org.apache.geode.test.junit.runners.GeodeParamsRunner;

@RunWith(GeodeParamsRunner.class)
public class GatewaySenderEventImplTest {

private GemFireCacheImpl cache;

@Rule
public TestName testName = new TestName();
private String testName;

@Before
public void setUpGemFire() {
@BeforeEach
public void setUpGemFire(TestInfo testInfo) {
testName = testInfo.getDisplayName();
createCache();
}

Expand Down Expand Up @@ -110,8 +110,8 @@ public void versionedFromData() throws IOException, ClassNotFoundException {
assertThat(gatewaySenderEvent.getTransactionId()).isNotNull();
}

@Test
@Parameters(method = "getVersionsAndExpectedInvocations")
@ParameterizedTest
@MethodSource("getVersionsAndExpectedInvocations")
public void testSerializingDataFromCurrentVersionToOldVersion(VersionAndExpectedInvocations vaei)
throws IOException {
GatewaySenderEventImpl gatewaySenderEvent = spy(GatewaySenderEventImpl.class);
Expand All @@ -129,8 +129,8 @@ public void testSerializingDataFromCurrentVersionToOldVersion(VersionAndExpected
any());
}

@Test
@Parameters(method = "getVersionsAndExpectedInvocations")
@ParameterizedTest
@MethodSource("getVersionsAndExpectedInvocations")
public void testDeserializingDataFromOldVersionToCurrentVersion(
VersionAndExpectedInvocations vaei)
throws IOException, ClassNotFoundException {
Expand All @@ -151,18 +151,17 @@ public void testDeserializingDataFromOldVersionToCurrentVersion(
any());
}

private VersionAndExpectedInvocations[] getVersionsAndExpectedInvocations() {
return new VersionAndExpectedInvocations[] {
new VersionAndExpectedInvocations(GEODE_1_8_0, 1, 0, 0),
new VersionAndExpectedInvocations(GEODE_1_13_0, 1, 1, 0),
new VersionAndExpectedInvocations(GEODE_1_14_0, 1, 1, 1)
};
private static Stream<Arguments> getVersionsAndExpectedInvocations() {
return Stream.of(
Arguments.of(new VersionAndExpectedInvocations(GEODE_1_8_0, 1, 0, 0)),
Arguments.of(new VersionAndExpectedInvocations(GEODE_1_13_0, 1, 1, 0)),
Arguments.of(new VersionAndExpectedInvocations(GEODE_1_14_0, 1, 1, 1)));
}

@Test
public void testEquality() throws Exception {
LocalRegion region = mock(LocalRegion.class);
when(region.getFullPath()).thenReturn(testName.getMethodName() + "_region");
when(region.getFullPath()).thenReturn(testName + "_region");
when(region.getCache()).thenReturn(cache);
Object event = ParallelGatewaySenderHelper.createGatewaySenderEvent(region, Operation.CREATE,
"key1", "value1", 0, 0, 0, 0);
Expand Down Expand Up @@ -209,7 +208,7 @@ public void testEquality() throws Exception {
assertThat(event).isNotEqualTo(eventDifferentValue);

LocalRegion region2 = mock(LocalRegion.class);
when(region2.getFullPath()).thenReturn(testName.getMethodName() + "_region2");
when(region2.getFullPath()).thenReturn(testName + "_region2");
when(region2.getCache()).thenReturn(cache);
Object eventDifferentRegion =
ParallelGatewaySenderHelper.createGatewaySenderEvent(region2, Operation.CREATE,
Expand All @@ -221,7 +220,7 @@ public void testEquality() throws Exception {
public void testSerialization() throws Exception {
// Set up test
LocalRegion region = mock(LocalRegion.class);
when(region.getFullPath()).thenReturn(testName.getMethodName() + "_region");
when(region.getFullPath()).thenReturn(testName + "_region");
when(region.getCache()).thenReturn(cache);
TXId txId = new TXId(cache.getMyId(), 0);
when(region.getTXId()).thenReturn(txId);
Expand Down Expand Up @@ -348,12 +347,13 @@ private EntryEventImpl mockEntryEventImpl(final TransactionId transactionId) {
return cacheEvent;
}

@Parameters({"true, true", "true, false", "false, false"})
@ParameterizedTest
@CsvSource({"true,true", "true,false", "false,false"})
public void testCreation_WithAfterUpdateWithGenerateCallbacks(boolean isGenerateCallbacks,
boolean isCallbackArgumentNull)
throws IOException {
InternalRegion region = mock(InternalRegion.class);
when(region.getFullPath()).thenReturn(testName.getMethodName() + "_region");
InternalRegion region = mock(LocalRegion.class);
when(region.getFullPath()).thenReturn(testName + "_region");

Operation operation = mock(Operation.class);
when(operation.isLocalLoad()).thenReturn(true);
Expand All @@ -377,6 +377,37 @@ public void testCreation_WithAfterUpdateWithGenerateCallbacks(boolean isGenerate
assertThat(event.getAction()).isEqualTo(action);
}

@Test
public void testShouldNotBeConflatedCreate() throws IOException {
final EntryEventImpl cacheEvent = mockEntryEventImpl(mock(TransactionId.class));

final GatewaySenderEventImpl gatewaySenderEvent =
new GatewaySenderEventImpl(EnumListenerEvent.AFTER_CREATE, cacheEvent, null, INCLUDE);

assertThat(gatewaySenderEvent.shouldBeConflated()).isFalse();
}

@Test
public void testShouldBeConflatedUpdate() throws IOException {
final EntryEventImpl cacheEvent = mockEntryEventImpl(mock(TransactionId.class));

final GatewaySenderEventImpl gatewaySenderEvent =
new GatewaySenderEventImpl(EnumListenerEvent.AFTER_UPDATE, cacheEvent, null, INCLUDE);

assertThat(gatewaySenderEvent.shouldBeConflated()).isTrue();
}

@Test
public void testShouldNotBeConflatedUpdateConcurrentConflict() throws IOException {
final EntryEventImpl cacheEvent = mockEntryEventImpl(mock(TransactionId.class));
when(cacheEvent.isConcurrencyConflict()).thenReturn(true);

final GatewaySenderEventImpl gatewaySenderEvent =
new GatewaySenderEventImpl(EnumListenerEvent.AFTER_UPDATE, cacheEvent, null, INCLUDE);

assertThat(gatewaySenderEvent.shouldBeConflated()).isFalse();
}

public static class VersionAndExpectedInvocations {

private final KnownVersion version;
Expand Down
Loading

0 comments on commit ac00f3c

Please sign in to comment.