Skip to content

Commit

Permalink
Beef up 'indexRandom' by running index requests concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
s1monw committed Sep 16, 2013
1 parent 8f7e3c8 commit a6f14eb
Showing 1 changed file with 69 additions and 8 deletions.
77 changes: 69 additions & 8 deletions src/test/java/org/elasticsearch/AbstractSharedClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.base.Joiner;
import com.google.common.collect.Iterators;
import org.apache.lucene.util.AbstractRandomizedTest.IntegrationTests;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ShardOperationFailedException;
Expand Down Expand Up @@ -55,10 +56,13 @@
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndexTemplateMissingException;
import org.elasticsearch.rest.RestStatus;
import org.hamcrest.Matchers;
import org.junit.*;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
Expand Down Expand Up @@ -396,22 +400,79 @@ public void indexRandom(String index, boolean forceRefresh, IndexRequestBuilder.
Random random = getRandom();
List<IndexRequestBuilder> list = Arrays.asList(builders);
Collections.shuffle(list, random);
for (IndexRequestBuilder indexRequestBuilder : list) {
indexRequestBuilder.execute().actionGet();
if (rarely()) {
final CopyOnWriteArrayList<Throwable> errors = new CopyOnWriteArrayList<Throwable>();
List<CountDownLatch> latches = new ArrayList<CountDownLatch>();
if (frequently()) {
logger.info("Index [{}] docs async: [{}]", list.size(), true);
final CountDownLatch latch = new CountDownLatch(list.size());
latches.add(latch);
for (IndexRequestBuilder indexRequestBuilder : list) {
indexRequestBuilder.execute(new LatchedActionListener<IndexResponse>(latch, errors));
if (rarely()) {
client().admin().indices().prepareRefresh(index).execute().get();
} else if (rarely()) {
client().admin().indices().prepareFlush(index).execute().get();
} else if (rarely()) {
client().admin().indices().prepareOptimize(index).setMaxNumSegments(between(1, 10)).setFlush(random.nextBoolean()).execute().get();
if (rarely()) {
client().admin().indices().prepareRefresh(index).execute(new LatchedActionListener<RefreshResponse>(newLatch(latches), errors));
} else if (rarely()) {
client().admin().indices().prepareFlush(index).execute(new LatchedActionListener<FlushResponse>(newLatch(latches), errors));
} else if (rarely()) {
client().admin().indices().prepareOptimize(index).setMaxNumSegments(between(1, 10)).setFlush(random.nextBoolean()).execute(new LatchedActionListener<OptimizeResponse>(newLatch(latches), errors));
}
}
}

} else {
logger.info("Index [{}] docs async: [{}]", list.size(), false);
for (IndexRequestBuilder indexRequestBuilder : list) {
indexRequestBuilder.execute().actionGet();
if (rarely()) {
if (rarely()) {
client().admin().indices().prepareRefresh(index).execute(new LatchedActionListener<RefreshResponse>(newLatch(latches), errors));
} else if (rarely()) {
client().admin().indices().prepareFlush(index).execute(new LatchedActionListener<FlushResponse>(newLatch(latches), errors));
} else if (rarely()) {
client().admin().indices().prepareOptimize(index).setMaxNumSegments(between(1, 10)).setFlush(random.nextBoolean()).execute(new LatchedActionListener<OptimizeResponse>(newLatch(latches), errors));
}
}
}
}
for (CountDownLatch countDownLatch : latches) {
countDownLatch.await();
}
assertThat(errors, Matchers.emptyIterable());
if (forceRefresh) {
assertNoFailures(client().admin().indices().prepareRefresh(index).execute().get());
}
}

private static final CountDownLatch newLatch(List<CountDownLatch> latches) {
CountDownLatch l = new CountDownLatch(1);
latches.add(l);
return l;
}

private static class LatchedActionListener<Response> implements ActionListener<Response> {
private final CountDownLatch latch;
private final CopyOnWriteArrayList<Throwable> errors;

public LatchedActionListener(CountDownLatch latch, CopyOnWriteArrayList<Throwable> errors) {
this.latch = latch;
this.errors = errors;
}

@Override
public void onResponse(Response response) {
latch.countDown();
}

@Override
public void onFailure(Throwable e) {
try {
errors.add(e);
} finally {
latch.countDown();
}
}

}

public void clearScroll(String... scrollIds) {
ClearScrollResponse clearResponse = client().prepareClearScroll()
Expand Down

0 comments on commit a6f14eb

Please sign in to comment.