Skip to content

Commit

Permalink
GEODE-8536: Allow limited retries when creating Lucene IndexWriter (apache#5553)
Browse files Browse the repository at this point in the history

Authored-by: Donal Evans <[email protected]>
  • Loading branch information
DonalEvans authored Oct 3, 2020
1 parent ae0d6bc commit eccd4f0
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import junitparams.Parameters;
import org.apache.commons.lang3.RandomStringUtils;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.Before;
Expand Down Expand Up @@ -111,7 +110,6 @@ private BucketRegion getFileAndChunkBucket() {
}

@Test
@Parameters()
public void lockedBucketShouldPreventPrimaryFromMoving() {
dataStore1.invoke(this::initDataStoreAndLuceneIndex);
dataStore1.invoke(() -> LuceneTestUtilities.pauseSender(getCache()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.lucene.internal;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;

import java.io.IOException;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import org.apache.geode.InternalGemFireError;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.lucene.LuceneQuery;
import org.apache.geode.cache.lucene.LuceneQueryException;
import org.apache.geode.cache.lucene.LuceneServiceProvider;
import org.apache.geode.cache.lucene.test.TestObject;
import org.apache.geode.internal.cache.PartitionedRegion;

/**
 * Integration tests verifying that {@link IndexRepositoryFactory} retries IndexWriter creation
 * when transient IOExceptions occur, and fails with an InternalGemFireError when the
 * exception is persistent.
 */
public class IndexRepositoryFactoryIntegrationTest {
  public Cache cache;
  public static final String INDEX_NAME = "testIndex";
  public static final String REGION_NAME = "testRegion";
  private IndexRepositoryFactory spyFactory;
  private LuceneQuery<Object, Object> luceneQuery;
  // The factory installed before this test ran. Saved so tearDown can restore it;
  // otherwise the spy leaks into every later test in the same JVM via the static field.
  private IndexRepositoryFactory originalFactory;

  @Before
  public void setUp() {
    cache = new CacheFactory().create();
    LuceneServiceProvider.get(cache).createIndexFactory().setFields("field1", "field2")
        .create(INDEX_NAME, REGION_NAME);

    Region<Object, Object> dataRegion =
        cache.createRegionFactory(RegionShortcut.PARTITION).create(REGION_NAME);

    dataRegion.put("A", new TestObject());

    spyFactory = spy(new IndexRepositoryFactory());
    // Swap in the spy so tests can control getIndexWriter(); remember the original for tearDown.
    originalFactory = PartitionedRepositoryManager.indexRepositoryFactory;
    PartitionedRepositoryManager.indexRepositoryFactory = spyFactory;

    luceneQuery = LuceneServiceProvider.get(cache).createLuceneQueryFactory()
        .create(INDEX_NAME, REGION_NAME, "hello", "field1");
  }

  @After
  public void tearDown() {
    // Undo the static-field mutation from setUp before closing the cache, so other tests
    // in this JVM see the real factory again.
    PartitionedRepositoryManager.indexRepositoryFactory = originalFactory;
    if (cache != null) {
      cache.close();
    }
  }

  @Test
  public void shouldRetryWhenIOExceptionEncounteredOnceDuringComputingRepository()
      throws IOException, LuceneQueryException {
    // To ensure that the specific bucket used in the query throws the IOException to trigger the
    // retry, throw once for every bucket in the region
    int timesToThrow = ((PartitionedRegion) cache.getRegion(REGION_NAME)).getTotalNumberOfBuckets();

    doAnswer(new Answer<Object>() {
      private int times = 0;

      @Override
      public Object answer(InvocationOnMock invocation) throws Throwable {
        if (times < timesToThrow) {
          times++;
          throw new IOException();
        }
        return invocation.callRealMethod();
      }
    }).when(spyFactory).getIndexWriter(any(), any());

    // The query should succeed despite each bucket's first attempt failing; a thrown
    // exception here fails the test.
    luceneQuery.findKeys();
  }

  @Test
  public void shouldThrowInternalGemfireErrorWhenIOExceptionEncounteredConsistentlyDuringComputingRepository()
      throws IOException {
    // Every attempt fails, so the retry budget is exhausted and the function layer wraps
    // the failure in an InternalGemFireError.
    doThrow(new IOException()).when(spyFactory).getIndexWriter(any(), any());

    assertThatThrownBy(luceneQuery::findKeys).isInstanceOf(FunctionException.class)
        .hasCauseInstanceOf(InternalGemFireError.class);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class IndexRepositoryFactory {
private static final Logger logger = LogService.getLogger();
public static final String FILE_REGION_LOCK_FOR_BUCKET_ID = "FileRegionLockForBucketId:";
public static final String APACHE_GEODE_INDEX_COMPLETE = "APACHE_GEODE_INDEX_COMPLETE";
protected static final int GET_INDEX_WRITER_MAX_ATTEMPTS = 200;

public IndexRepositoryFactory() {}

Expand Down Expand Up @@ -74,7 +75,8 @@ public IndexRepository computeIndexRepository(final Integer bucketId, LuceneSeri
* This is a util function just to not let computeIndexRepository be a huge chunk of code.
*/
protected IndexRepository finishComputingRepository(Integer bucketId, LuceneSerializer serializer,
PartitionedRegion userRegion, IndexRepository oldRepository, InternalLuceneIndex index) {
PartitionedRegion userRegion, IndexRepository oldRepository, InternalLuceneIndex index)
throws IOException {
LuceneIndexForPartitionedRegion indexForPR = (LuceneIndexForPartitionedRegion) index;
final PartitionedRegion fileRegion = indexForPR.getFileAndChunkRegion();
BucketRegion fileAndChunkBucket = getMatchingBucket(fileRegion, bucketId);
Expand Down Expand Up @@ -129,7 +131,7 @@ protected IndexRepository finishComputingRepository(Integer bucketId, LuceneSeri
} catch (IOException e) {
logger.warn("Exception thrown while constructing Lucene Index for bucket:" + bucketId
+ " for file region:" + fileAndChunkBucket.getFullPath(), e);
return null;
throw e;
} catch (CacheClosedException e) {
logger.info("CacheClosedException thrown while constructing Lucene Index for bucket:"
+ bucketId + " for file region:" + fileAndChunkBucket.getFullPath());
Expand All @@ -144,11 +146,34 @@ protected IndexRepository finishComputingRepository(Integer bucketId, LuceneSeri

/**
 * Creates the Lucene {@link IndexWriter} for the given bucket, retrying on IOException.
 *
 * IOExceptions can occur if the fileAndChunk region is being modified while the IndexWriter is
 * being initialized, so limited retries (up to {@code GET_INDEX_WRITER_MAX_ATTEMPTS}, with a
 * short sleep between attempts) are allowed to account for that timing window.
 *
 * @throws IOException if every attempt fails, or if the thread is interrupted while waiting
 *         to retry (the interrupt flag is restored before rethrowing)
 */
protected IndexWriter buildIndexWriter(int bucketId, BucketRegion fileAndChunkBucket,
    LuceneIndexForPartitionedRegion indexForPR) throws IOException {
  int attempts = 0;
  while (true) {
    // bucketTargetingMap handles partition resolver (via bucketId as callbackArg)
    Map<Object, Object> bucketTargetingMap = getBucketTargetingMap(fileAndChunkBucket, bucketId);
    RegionDirectory dir =
        new RegionDirectory(bucketTargetingMap, indexForPR.getFileSystemStats());
    IndexWriterConfig config = new IndexWriterConfig(indexForPR.getAnalyzer());
    try {
      attempts++;
      return getIndexWriter(dir, config);
    } catch (IOException e) {
      if (attempts >= GET_INDEX_WRITER_MAX_ATTEMPTS) {
        throw e;
      }
      logger.info("Encountered {} while attempting to get IndexWriter for index {}. Retrying...",
          e, indexForPR.getName());
      try {
        Thread.sleep(5);
      } catch (InterruptedException ignore) {
        // Restore the interrupt flag so callers can observe the interruption, and stop
        // retrying by propagating the most recent failure.
        Thread.currentThread().interrupt();
        throw e;
      }
    }
  }
}

/**
 * Creates a Lucene {@link IndexWriter} over the given directory with the given configuration.
 * Extracted as a protected seam so tests can stub or spy writer creation (e.g. to simulate
 * transient IOExceptions and exercise the retry logic in buildIndexWriter).
 */
protected IndexWriter getIndexWriter(RegionDirectory dir, IndexWriterConfig config)
    throws IOException {
  return new IndexWriter(dir, config);
}

Expand Down Expand Up @@ -186,8 +211,8 @@ private Object getValue(Region.Entry entry) {
return value;
}

protected Map getBucketTargetingMap(BucketRegion region, int bucketId) {
return new BucketTargetingMap(region, bucketId);
protected Map<Object, Object> getBucketTargetingMap(BucketRegion region, int bucketId) {
return new BucketTargetingMap<>(region, bucketId);
}

protected String getLockName(final BucketRegion fileAndChunkBucket) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package org.apache.geode.cache.lucene.internal;

import static org.apache.geode.cache.lucene.internal.IndexRepositoryFactory.GET_INDEX_WRITER_MAX_ATTEMPTS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -22,11 +23,13 @@
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;

import org.apache.lucene.index.IndexWriter;
import org.junit.Before;
import org.junit.Test;

Expand Down Expand Up @@ -77,7 +80,8 @@ public void setUp() {
}

@Test
public void finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFileAndChunkBucketIsNull() {
public void finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFileAndChunkBucketIsNull()
throws IOException {
doReturn(null).when(indexRepositoryFactory).getMatchingBucket(fileRegion, bucketId);

IndexRepository indexRepository = indexRepositoryFactory.finishComputingRepository(0,
Expand All @@ -87,7 +91,8 @@ public void finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFi
}

@Test
public void finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFileAndChunkBucketIsNotPrimary() {
public void finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFileAndChunkBucketIsNotPrimary()
throws IOException {
when(fileAndChunkBucketAdvisor.isPrimary()).thenReturn(false);

IndexRepository indexRepository = indexRepositoryFactory.finishComputingRepository(0,
Expand All @@ -97,7 +102,8 @@ public void finishComputingRepositoryShouldReturnNullAndCleanOldRepositoryWhenFi
}

@Test
public void finishComputingRepositoryShouldReturnOldRepositoryWhenNotNullAndNotClosed() {
public void finishComputingRepositoryShouldReturnOldRepositoryWhenNotNullAndNotClosed()
throws IOException {
when(oldRepository.isClosed()).thenReturn(false);
when(fileAndChunkBucketAdvisor.isPrimary()).thenReturn(true);

Expand All @@ -108,7 +114,8 @@ public void finishComputingRepositoryShouldReturnOldRepositoryWhenNotNullAndNotC
}

@Test
public void finishComputingRepositoryShouldReturnNullWhenLockCanNotBeAcquiredAndFileAndChunkBucketIsNotPrimary() {
public void finishComputingRepositoryShouldReturnNullWhenLockCanNotBeAcquiredAndFileAndChunkBucketIsNotPrimary()
throws IOException {
when(oldRepository.isClosed()).thenReturn(true);
when(fileAndChunkBucketAdvisor.isPrimary()).thenReturn(true).thenReturn(false);
when(distributedLockService.lock(any(), anyLong(), anyLong())).thenReturn(false);
Expand All @@ -119,17 +126,16 @@ public void finishComputingRepositoryShouldReturnNullWhenLockCanNotBeAcquiredAnd
}

@Test
public void finishComputingRepositoryShouldReturnNullAndReleaseLockWhenIOExceptionIsThrownWhileBuildingTheIndex()
public void finishComputingRepositoryShouldThrowExceptionAndReleaseLockWhenIOExceptionIsThrownWhileBuildingTheIndex()
throws IOException {
when(oldRepository.isClosed()).thenReturn(true);
when(fileAndChunkBucketAdvisor.isPrimary()).thenReturn(true);
when(distributedLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
doThrow(new IOException("Test Exception")).when(indexRepositoryFactory)
.buildIndexWriter(bucketId, fileAndChunkBucket, luceneIndex);

IndexRepository indexRepository = indexRepositoryFactory.finishComputingRepository(0,
serializer, userRegion, oldRepository, luceneIndex);
assertThat(indexRepository).isNull();
assertThatThrownBy(() -> indexRepositoryFactory.finishComputingRepository(0,
serializer, userRegion, oldRepository, luceneIndex)).isInstanceOf(IOException.class);
verify(distributedLockService).unlock(any());
}

Expand All @@ -146,4 +152,27 @@ public void finishComputingRepositoryShouldThrowExceptionAndReleaseLockWhenCache
userRegion, oldRepository, luceneIndex)).isInstanceOf(CacheClosedException.class);
verify(distributedLockService).unlock(any());
}

@Test
public void buildIndexWriterRetriesCreatingIndexWriterWhenIOExceptionEncountered()
    throws IOException {
  // First attempt fails with an IOException; the second attempt succeeds.
  IndexWriter expectedWriter = mock(IndexWriter.class);
  doThrow(new IOException()).doReturn(expectedWriter)
      .when(indexRepositoryFactory).getIndexWriter(any(), any());

  IndexWriter result =
      indexRepositoryFactory.buildIndexWriter(bucketId, fileAndChunkBucket, luceneIndex);

  assertThat(result).isEqualTo(expectedWriter);
  // Exactly two calls: the failing attempt plus the single successful retry.
  verify(indexRepositoryFactory, times(2)).getIndexWriter(any(), any());
}

@Test
public void buildIndexWriterThrowsExceptionWhenIOExceptionConsistentlyEncountered()
    throws IOException {
  // Every attempt fails, so once the retry budget is exhausted the last exception propagates.
  IOException persistentFailure = new IOException("Test exception");
  doThrow(persistentFailure).when(indexRepositoryFactory).getIndexWriter(any(), any());

  assertThatThrownBy(
      () -> indexRepositoryFactory.buildIndexWriter(bucketId, fileAndChunkBucket, luceneIndex))
          .isEqualTo(persistentFailure);

  // The factory must have tried exactly the maximum number of times before giving up.
  verify(indexRepositoryFactory, times(GET_INDEX_WRITER_MAX_ATTEMPTS)).getIndexWriter(any(),
      any());
}
}

0 comments on commit eccd4f0

Please sign in to comment.