Skip to content

Commit

Permalink
GEODE-2402: Write to the lucene region buckets using a callback argument
Browse files Browse the repository at this point in the history
Adding a callback argument when writing to the file and chunk regions.
The file and chunk regions now have a partition listener to
route the put to the correct bucket.

The reason for all of this is that in some cases, the core code can can
send a message that only includes the PR id and the key. We need want
the core to be able to resolve the correct bucket from just those
things, which requires having the PartitionListener that uses the
callback argument.

Added a test of putting to the file and chunk regions during GII, which
is the case where the core code sends a message that includes only the
PR id and the key.
  • Loading branch information
upthewaterspout committed Feb 17, 2017
1 parent a8c6543 commit 8ce8e43
Show file tree
Hide file tree
Showing 19 changed files with 499 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3947,7 +3947,7 @@ public int getDSFID() {
}
}

public static abstract class GIITestHook implements Runnable {
public static abstract class GIITestHook implements Runnable, Serializable {
final private GIITestHookType type;
final private String region_name;
volatile public boolean isRunning;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public Set<Region.Entry> entrySet() {
}

public Set<Region.Entry> entries(boolean recursive) {
return this.proxy.new PREntriesSet(getBucketSet());
return this.proxy.entries(getBucketSet());
}

public Collection values() {
Expand All @@ -101,7 +101,7 @@ public Collection values() {
}

public Set keys() {
return this.proxy.new KeysSet(getBucketSet());
return this.proxy.keySet(getBucketSet());
}

public Set keySet() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,7 @@ LocalizedStrings.LocalRegion_INITIALIZATION_FAILED_FOR_REGION_0, getFullPath()),
return newRegion;
}

public final void create(Object key, Object value, Object aCallbackArgument)
public void create(Object key, Object value, Object aCallbackArgument)
throws TimeoutException, EntryExistsException, CacheWriterException {
long startPut = CachePerfStats.getStatTime();
@Released
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6099,6 +6099,10 @@ public Set entries(boolean recursive) {
return Collections.unmodifiableSet(new PREntriesSet());
}

public Set<Region.Entry> entries(Set<Integer> bucketIds) {
return new PREntriesSet(bucketIds);
}

/**
* Set view of entries. This currently extends the keySet iterator and performs individual
* getEntry() operations using the keys
Expand Down Expand Up @@ -6164,6 +6168,13 @@ public Set keySet(boolean allowTombstones) {
return Collections.unmodifiableSet(new KeysSet(allowTombstones));
}

/**
* Get a keyset of the given buckets
*/
public Set keySet(Set<Integer> bucketSet) {
return new KeysSet(bucketSet);
}

public Set keysWithoutCreatesForTests() {
checkReadiness();
Set<Integer> availableBuckets = new HashSet<Integer>();
Expand Down
2 changes: 2 additions & 0 deletions geode-lucene/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ dependencies {
//Lucene test framework.
testCompile 'org.apache.lucene:lucene-test-framework:' + project.'lucene.version'
testCompile 'org.apache.lucene:lucene-codecs:' + project.'lucene.version'
testCompile 'com.pholser:junit-quickcheck-core:' + project.'junit-quickcheck.version'
testCompile 'com.pholser:junit-quickcheck-generators:' + project.'junit-quickcheck.version'
testCompile files(project(':geode-core').sourceSets.test.output)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
package org.apache.geode.cache.lucene.internal;

import java.io.IOException;
import java.util.Map;

import org.apache.geode.cache.lucene.internal.directory.RegionDirectory;
import org.apache.geode.cache.lucene.internal.partition.BucketTargetingMap;
import org.apache.geode.cache.lucene.internal.repository.IndexRepository;
import org.apache.geode.cache.lucene.internal.repository.IndexRepositoryImpl;
import org.apache.geode.cache.lucene.internal.repository.serializer.LuceneSerializer;
Expand All @@ -41,8 +43,11 @@ public IndexRepository computeIndexRepository(final Integer bucketId, LuceneSeri
LuceneIndexImpl index, PartitionedRegion userRegion, final IndexRepository oldRepository)
throws IOException {
LuceneIndexForPartitionedRegion indexForPR = (LuceneIndexForPartitionedRegion) index;
BucketRegion fileBucket = getMatchingBucket(indexForPR.getFileRegion(), bucketId);
BucketRegion chunkBucket = getMatchingBucket(indexForPR.getChunkRegion(), bucketId);
final PartitionedRegion fileRegion = indexForPR.getFileRegion();
final PartitionedRegion chunkRegion = indexForPR.getChunkRegion();

BucketRegion fileBucket = getMatchingBucket(fileRegion, bucketId);
BucketRegion chunkBucket = getMatchingBucket(chunkRegion, bucketId);
BucketRegion dataBucket = getMatchingBucket(userRegion, bucketId);
boolean success = false;
if (fileBucket == null || chunkBucket == null) {
Expand All @@ -51,7 +56,7 @@ public IndexRepository computeIndexRepository(final Integer bucketId, LuceneSeri
}
return null;
}
if (!fileBucket.getBucketAdvisor().isPrimary()) {
if (!chunkBucket.getBucketAdvisor().isPrimary()) {
if (oldRepository != null) {
oldRepository.cleanup();
}
Expand All @@ -68,15 +73,15 @@ public IndexRepository computeIndexRepository(final Integer bucketId, LuceneSeri
DistributedLockService lockService = getLockService();
String lockName = getLockName(bucketId, fileBucket);
while (!lockService.lock(lockName, 100, -1)) {
if (!fileBucket.getBucketAdvisor().isPrimary()) {
if (!chunkBucket.getBucketAdvisor().isPrimary()) {
return null;
}
}

final IndexRepository repo;
try {
RegionDirectory dir =
new RegionDirectory(fileBucket, chunkBucket, indexForPR.getFileSystemStats());
RegionDirectory dir = new RegionDirectory(getBucketTargetingMap(fileBucket, bucketId),
getBucketTargetingMap(chunkBucket, bucketId), indexForPR.getFileSystemStats());
IndexWriterConfig config = new IndexWriterConfig(indexForPR.getAnalyzer());
IndexWriter writer = new IndexWriter(dir, config);
repo = new IndexRepositoryImpl(fileBucket, writer, serializer, indexForPR.getIndexStats(),
Expand All @@ -95,8 +100,12 @@ public IndexRepository computeIndexRepository(final Integer bucketId, LuceneSeri

}

private Map getBucketTargetingMap(BucketRegion region, int bucketId) {
return new BucketTargetingMap(region, bucketId);
}

private String getLockName(final Integer bucketId, final BucketRegion fileBucket) {
return FILE_REGION_LOCK_FOR_BUCKET_ID + fileBucket.getFullPath() + bucketId;
return FILE_REGION_LOCK_FOR_BUCKET_ID + fileBucket.getFullPath();
}

private DistributedLockService getLockService() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.FixedPartitionResolver;
import org.apache.geode.cache.PartitionAttributes;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.PartitionResolver;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionShortcut;
Expand All @@ -28,6 +30,8 @@
import org.apache.geode.cache.lucene.internal.filesystem.ChunkKey;
import org.apache.geode.cache.lucene.internal.filesystem.File;
import org.apache.geode.cache.lucene.internal.filesystem.FileSystemStats;
import org.apache.geode.cache.lucene.internal.partition.BucketTargetingFixedResolver;
import org.apache.geode.cache.lucene.internal.partition.BucketTargetingResolver;
import org.apache.geode.cache.lucene.internal.repository.RepositoryManager;
import org.apache.geode.cache.lucene.internal.repository.serializer.HeterogeneousLuceneSerializer;
import org.apache.geode.cache.partition.PartitionListener;
Expand Down Expand Up @@ -143,9 +147,18 @@ private PartitionAttributesFactory configureLuceneRegionAttributesFactory(
PartitionAttributes<?, ?> dataRegionAttributes) {
attributesFactory.setTotalNumBuckets(dataRegionAttributes.getTotalNumBuckets());
attributesFactory.setRedundantCopies(dataRegionAttributes.getRedundantCopies());
attributesFactory.setPartitionResolver(getPartitionResolver(dataRegionAttributes));
return attributesFactory;
}

private PartitionResolver getPartitionResolver(PartitionAttributes dataRegionAttributes) {
if (dataRegionAttributes.getPartitionResolver() instanceof FixedPartitionResolver) {
return new BucketTargetingFixedResolver();
} else {
return new BucketTargetingResolver();
}
}

protected <K, V> Region<K, V> createRegion(final String regionName,
final RegionShortcut regionShortCut, final String colocatedWithRegionName,
final PartitionAttributes partitionAttributes, final RegionAttributes regionAttributes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,23 @@

package org.apache.geode.cache.lucene.internal.directory;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;

import org.apache.geode.cache.lucene.internal.filesystem.ChunkKey;
import org.apache.geode.cache.lucene.internal.filesystem.File;
import org.apache.geode.cache.lucene.internal.filesystem.FileSystem;
import org.apache.geode.cache.lucene.internal.filesystem.FileSystemStats;
import org.apache.lucene.store.BaseDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.apache.lucene.store.SingleInstanceLockFactory;

import org.apache.geode.cache.lucene.internal.filesystem.ChunkKey;
import org.apache.geode.cache.lucene.internal.filesystem.File;
import org.apache.geode.cache.lucene.internal.filesystem.FileSystem;
import org.apache.geode.cache.lucene.internal.filesystem.FileSystemStats;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;

/**
* An implementation of Directory that stores data in geode regions.
Expand All @@ -49,8 +47,8 @@ public class RegionDirectory extends BaseDirectory {
* Create a region directory with a given file and chunk region. These regions may be bucket
* regions or they may be replicated regions.
*/
public RegionDirectory(ConcurrentMap<String, File> fileRegion,
ConcurrentMap<ChunkKey, byte[]> chunkRegion, FileSystemStats stats) {
public RegionDirectory(Map<String, File> fileRegion, Map<ChunkKey, byte[]> chunkRegion,
FileSystemStats stats) {
super(new SingleInstanceLockFactory());
fs = new FileSystem(fileRegion, chunkRegion, stats);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

/**
Expand All @@ -38,8 +39,8 @@
public class FileSystem {
private static final Logger logger = LogService.getLogger();

private final ConcurrentMap<String, File> fileRegion;
private final ConcurrentMap<ChunkKey, byte[]> chunkRegion;
private final Map<String, File> fileRegion;
private final Map<ChunkKey, byte[]> chunkRegion;

static final int CHUNK_SIZE = 1024 * 1024; // 1 MB
private final FileSystemStats stats;
Expand All @@ -54,8 +55,8 @@ public class FileSystem {
* @param fileRegion the region to store metadata about the files
* @param chunkRegion the region to store actual file data.
*/
public FileSystem(ConcurrentMap<String, File> fileRegion,
ConcurrentMap<ChunkKey, byte[]> chunkRegion, FileSystemStats stats) {
public FileSystem(Map<String, File> fileRegion, Map<ChunkKey, byte[]> chunkRegion,
FileSystemStats stats) {
this.fileRegion = fileRegion;
this.chunkRegion = chunkRegion;
this.stats = stats;
Expand Down Expand Up @@ -188,11 +189,11 @@ void updateFile(File file) {
fileRegion.put(file.getName(), file);
}

public ConcurrentMap<String, File> getFileRegion() {
public Map<String, File> getFileRegion() {
return fileRegion;
}

public ConcurrentMap<ChunkKey, byte[]> getChunkRegion() {
public Map<ChunkKey, byte[]> getChunkRegion() {
return chunkRegion;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.lucene.internal.partition;

import org.apache.geode.cache.EntryOperation;
import org.apache.geode.cache.FixedPartitionAttributes;
import org.apache.geode.cache.FixedPartitionResolver;
import org.apache.geode.cache.PartitionResolver;
import org.apache.geode.internal.cache.FixedPartitionAttributesImpl;
import org.apache.geode.internal.cache.PartitionedRegion;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

/**
* A partition resolver that expects the actual bucket id to be the callback argument of all
* operations. The partition resolver reverse engineers the fixed partition name and bucket number
* from the target partitioning.
*
* This is a bit messy, mostly because there's no good way to get the FixedPartition from the actual
* bucket id without iterating over all of the fixed partitions.
*/
public class BucketTargetingFixedResolver implements FixedPartitionResolver {

@Override
public Object getRoutingObject(final EntryOperation opDetails) {
int targetBucketId = (Integer) opDetails.getCallbackArgument();
final Map.Entry<String, Integer[]> targetPartition = getFixedPartition(opDetails);

return targetBucketId - targetPartition.getValue()[0];
}

@Override
public String getName() {
return getClass().getName();
}

@Override
public void close() {

}

@Override
public String getPartitionName(final EntryOperation opDetails,
@Deprecated final Set targetPartitions) {
final Map.Entry<String, Integer[]> targetPartition = getFixedPartition(opDetails);
return targetPartition.getKey();
}

protected Map.Entry<String, Integer[]> getFixedPartition(final EntryOperation opDetails) {
PartitionedRegion region = (PartitionedRegion) opDetails.getRegion();
int targetBucketId = (Integer) opDetails.getCallbackArgument();
Map<String, Integer[]> partitions = region.getPartitionsMap();

return partitions.entrySet().stream().filter(entry -> withinPartition(entry, targetBucketId))
.findFirst().get();
}

private boolean withinPartition(Map.Entry<String, Integer[]> entry, int bucketId) {
int startingBucket = entry.getValue()[0];
int endingBucket = startingBucket + entry.getValue()[1];
return startingBucket <= bucketId && bucketId < endingBucket;
}
}
Loading

0 comments on commit 8ce8e43

Please sign in to comment.