Skip to content

Commit

Permalink
Fix flaky tests: ElasticSearchClientTests (apache#12347)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi authored Oct 14, 2021
1 parent 508ef82 commit 77fb13e
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@
import java.util.concurrent.atomic.AtomicReference;

@Slf4j
public class ElasticSearchClient {
public class ElasticSearchClient implements AutoCloseable {

static final String[] malformedErrors = {
"mapper_parsing_exception",
Expand All @@ -129,7 +129,7 @@ public class ElasticSearchClient {
this.config = elasticSearchConfig;
this.configCallback = new ConfigCallback();
this.backoffRetry = new RandomExponentialRetry(elasticSearchConfig.getMaxRetryTimeInSec());
if (config.isBulkEnabled() == false) {
if (!config.isBulkEnabled()) {
bulkProcessor = null;
} else {
BulkProcessor.Builder builder = BulkProcessor.builder(
Expand Down Expand Up @@ -359,6 +359,7 @@ public void flush() {
bulkProcessor.flush();
}

@Override
public void close() {
try {
if (bulkProcessor != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,59 +81,59 @@ public void fail() {
@Test
public void testIndexDelete() throws Exception {
String index = "myindex-" + UUID.randomUUID();
ElasticSearchClient client = new ElasticSearchClient(new ElasticSearchConfig()
try (ElasticSearchClient client = new ElasticSearchClient(new ElasticSearchConfig()
.setElasticSearchUrl("http://" + container.getHttpHostAddress())
.setIndexName(index)
);
assertTrue(client.createIndexIfNeeded(index));
try {
MockRecord<GenericObject> mockRecord = new MockRecord<>();
client.indexDocument(mockRecord, Pair.of("1", "{ \"a\":1}"));
assertEquals(mockRecord.acked, 1);
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 1);

client.deleteDocument(mockRecord, "1");
assertEquals(mockRecord.acked, 2);
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 0);
} finally {
client.delete(index);
.setIndexName(index));) {
assertTrue(client.createIndexIfNeeded(index));
try {
MockRecord<GenericObject> mockRecord = new MockRecord<>();
client.indexDocument(mockRecord, Pair.of("1", "{ \"a\":1}"));
assertEquals(mockRecord.acked, 1);
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 1);

client.deleteDocument(mockRecord, "1");
assertEquals(mockRecord.acked, 2);
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 0);
} finally {
client.delete(index);
}
}
}

@Test
public void testIndexExists() throws IOException {
String index = "mynewindex-" + UUID.randomUUID();
ElasticSearchClient client = new ElasticSearchClient(new ElasticSearchConfig()
try (ElasticSearchClient client = new ElasticSearchClient(new ElasticSearchConfig()
.setElasticSearchUrl("http://" + container.getHttpHostAddress())
.setIndexName(index)
);
assertFalse(client.indexExists(index));
assertTrue(client.createIndexIfNeeded(index));
try {
assertTrue(client.indexExists(index));
assertFalse(client.createIndexIfNeeded(index));
} finally {
client.delete(index);
.setIndexName(index));) {
assertFalse(client.indexExists(index));
assertTrue(client.createIndexIfNeeded(index));
try {
assertTrue(client.indexExists(index));
assertFalse(client.createIndexIfNeeded(index));
} finally {
client.delete(index);
}
}
}

@Test
public void testTopicToIndexName() throws IOException {
ElasticSearchClient client = new ElasticSearchClient(new ElasticSearchConfig()
.setElasticSearchUrl("http://" + container.getHttpHostAddress())
);
assertEquals(client.topicToIndexName("data-ks1.table1"),"data-ks1.table1");
assertEquals(client.topicToIndexName("persistent://public/default/testesjson"), "testesjson");
assertEquals(client.topicToIndexName("default/testesjson"), "testesjson");
assertEquals(client.topicToIndexName(".testesjson"), ".testesjson");
assertEquals(client.topicToIndexName("TEST"), "test");

assertThrows(RuntimeException.class, () -> client.topicToIndexName("toto\\titi"));
assertThrows(RuntimeException.class, () -> client.topicToIndexName("_abc"));
assertThrows(RuntimeException.class, () -> client.topicToIndexName("-abc"));
assertThrows(RuntimeException.class, () -> client.topicToIndexName("+abc"));
try (ElasticSearchClient client = new ElasticSearchClient(new ElasticSearchConfig()
.setElasticSearchUrl("http://" + container.getHttpHostAddress())); ) {
assertEquals(client.topicToIndexName("data-ks1.table1"), "data-ks1.table1");
assertEquals(client.topicToIndexName("persistent://public/default/testesjson"), "testesjson");
assertEquals(client.topicToIndexName("default/testesjson"), "testesjson");
assertEquals(client.topicToIndexName(".testesjson"), ".testesjson");
assertEquals(client.topicToIndexName("TEST"), "test");

assertThrows(RuntimeException.class, () -> client.topicToIndexName("toto\\titi"));
assertThrows(RuntimeException.class, () -> client.topicToIndexName("_abc"));
assertThrows(RuntimeException.class, () -> client.topicToIndexName("-abc"));
assertThrows(RuntimeException.class, () -> client.topicToIndexName("+abc"));
}
}

@Test
Expand All @@ -144,18 +144,19 @@ public void testMalformedDocFails() throws Exception {
.setIndexName(index)
.setBulkEnabled(true)
.setMalformedDocAction(ElasticSearchConfig.MalformedDocAction.FAIL);
ElasticSearchClient client = new ElasticSearchClient(config);
MockRecord<GenericObject> mockRecord = new MockRecord<>();
client.bulkIndex(mockRecord, Pair.of("1","{\"a\":1}"));
client.bulkIndex(mockRecord, Pair.of("2","{\"a\":\"toto\"}"));
client.flush();
assertNotNull(client.irrecoverableError.get());
assertTrue(client.irrecoverableError.get().getMessage().contains("mapper_parsing_exception"));
assertEquals(mockRecord.acked, 1);
assertEquals(mockRecord.failed, 1);
assertThrows(Exception.class, () -> client.bulkIndex(mockRecord, Pair.of("3","{\"a\":3}")));
assertEquals(mockRecord.acked, 1);
assertEquals(mockRecord.failed, 2);
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
MockRecord<GenericObject> mockRecord = new MockRecord<>();
client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}"));
client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":\"toto\"}"));
client.flush();
assertNotNull(client.irrecoverableError.get());
assertTrue(client.irrecoverableError.get().getMessage().contains("mapper_parsing_exception"));
assertEquals(mockRecord.acked, 1);
assertEquals(mockRecord.failed, 1);
assertThrows(Exception.class, () -> client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}")));
assertEquals(mockRecord.acked, 1);
assertEquals(mockRecord.failed, 2);
}
}

@Test
Expand All @@ -166,14 +167,15 @@ public void testMalformedDocIgnore() throws Exception {
.setIndexName(index)
.setBulkEnabled(true)
.setMalformedDocAction(ElasticSearchConfig.MalformedDocAction.IGNORE);
ElasticSearchClient client = new ElasticSearchClient(config);
MockRecord<GenericObject> mockRecord = new MockRecord<>();
client.bulkIndex(mockRecord, Pair.of("1","{\"a\":1}"));
client.bulkIndex(mockRecord, Pair.of("2","{\"a\":\"toto\"}"));
client.flush();
assertNull(client.irrecoverableError.get());
assertEquals(mockRecord.acked, 1);
assertEquals(mockRecord.failed, 1);
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
MockRecord<GenericObject> mockRecord = new MockRecord<>();
client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}"));
client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":\"toto\"}"));
client.flush();
assertNull(client.irrecoverableError.get());
assertEquals(mockRecord.acked, 1);
assertEquals(mockRecord.failed, 1);
}
}

@Test
Expand All @@ -186,33 +188,35 @@ public void testBulkRetry() throws Exception {
.setMaxRetries(1000)
.setBulkActions(2)
.setRetryBackoffInMs(100)
.setBulkFlushIntervalInMs(10000);
ElasticSearchClient client = new ElasticSearchClient(config);
assertTrue(client.createIndexIfNeeded(index));
try {
MockRecord<GenericObject> mockRecord = new MockRecord<>();
client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}"));
client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":2}"));
assertEquals(mockRecord.acked, 2);
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 2);
// disabled, we want to have full control over flush() method
.setBulkFlushIntervalInMs(-1);

ChaosContainer<?> chaosContainer = new ChaosContainer<>(container.getContainerName(), "15s");
chaosContainer.start();
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
try {
assertTrue(client.createIndexIfNeeded(index));
MockRecord<GenericObject> mockRecord = new MockRecord<>();
client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}"));
client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":2}"));
assertEquals(mockRecord.acked, 2);
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 2);

client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}"));
Thread.sleep(5000L);
assertEquals(mockRecord.acked, 2);
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 2);
ChaosContainer<?> chaosContainer = new ChaosContainer<>(container.getContainerName(), "15s");
chaosContainer.start();

chaosContainer.stop();
client.flush();
assertEquals(mockRecord.acked, 3);
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 3);
} finally {
client.delete(index);
client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}"));
assertEquals(mockRecord.acked, 2);
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 2);

chaosContainer.stop();
client.flush();
assertEquals(mockRecord.acked, 3);
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 3);
} finally {
client.delete(index);
}
}
}

Expand All @@ -228,49 +232,50 @@ public void testBulkBlocking() throws Exception {
.setBulkConcurrentRequests(2)
.setRetryBackoffInMs(100)
.setBulkFlushIntervalInMs(10000);
ElasticSearchClient client = new ElasticSearchClient(config);
assertTrue(client.createIndexIfNeeded(index));
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
assertTrue(client.createIndexIfNeeded(index));

try {
MockRecord<GenericObject> mockRecord = new MockRecord<>();
for (int i = 1; i <= 5; i++) {
client.bulkIndex(mockRecord, Pair.of(Integer.toString(i), "{\"a\":" + i + "}"));
}
try {
MockRecord<GenericObject> mockRecord = new MockRecord<>();
for (int i = 1; i <= 5; i++) {
client.bulkIndex(mockRecord, Pair.of(Integer.toString(i), "{\"a\":" + i + "}"));
}

Awaitility.await().untilAsserted(() -> {
assertThat("acked record", mockRecord.acked, greaterThanOrEqualTo(4));
assertEquals(mockRecord.failed, 0);
assertThat("totalHits", client.totalHits(index), greaterThanOrEqualTo(4L));
});
client.flush();
Awaitility.await().untilAsserted(() -> {
assertEquals(mockRecord.acked, 5);
Awaitility.await().untilAsserted(() -> {
assertThat("acked record", mockRecord.acked, greaterThanOrEqualTo(4));
assertEquals(mockRecord.failed, 0);
assertThat("totalHits", client.totalHits(index), greaterThanOrEqualTo(4L));
});
client.flush();
Awaitility.await().untilAsserted(() -> {
assertEquals(mockRecord.acked, 5);
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 5);
});

ChaosContainer<?> chaosContainer = new ChaosContainer<>(container.getContainerName(), "30s");
chaosContainer.start();
Thread.sleep(1000L);

// 11th bulkIndex is blocking because we have 2 pending requests, and the 3rd request is blocked.
long start = System.currentTimeMillis();
for (int i = 6; i <= 15; i++) {
client.bulkIndex(mockRecord, Pair.of(Integer.toString(i), "{\"a\":" + i + "}"));
log.info("{} index {}", System.currentTimeMillis(), i);
}
long elapsed = System.currentTimeMillis() - start;
log.info("elapsed = {}", elapsed);
assertTrue(elapsed > 29000); // bulkIndex was blocking while elasticsearch was down or busy

Thread.sleep(1000L);
assertEquals(mockRecord.acked, 15);
assertEquals(mockRecord.failed, 0);
assertEquals(client.totalHits(index), 5);
});

ChaosContainer<?> chaosContainer = new ChaosContainer<>(container.getContainerName(), "30s");
chaosContainer.start();
Thread.sleep(1000L);

// 11th bulkIndex is blocking because we have 2 pending requests, and the 3rd request is blocked.
long start = System.currentTimeMillis();
for (int i = 6; i <= 15; i++) {
client.bulkIndex(mockRecord, Pair.of(Integer.toString(i), "{\"a\":" + i + "}"));
log.info("{} index {}", System.currentTimeMillis(), i);
assertEquals(client.records.size(), 0);

chaosContainer.stop();
} finally {
client.delete(index);
}
long elapsed = System.currentTimeMillis() - start;
log.info("elapsed = {}", elapsed);
assertTrue(elapsed > 29000); // bulkIndex was blocking while elasticsearch was down or busy

Thread.sleep(1000L);
assertEquals(mockRecord.acked, 15);
assertEquals(mockRecord.failed, 0);
assertEquals(client.records.size(), 0);

chaosContainer.stop();
} finally {
client.delete(index);
}
}

Expand Down

0 comments on commit 77fb13e

Please sign in to comment.