Skip to content

Commit

Permalink
GEODE-2404: Added support for destroying lucene indexes
Browse files Browse the repository at this point in the history
  • Loading branch information
boglesby committed Feb 27, 2017
1 parent c4a5ab2 commit 11521a8
Show file tree
Hide file tree
Showing 22 changed files with 1,068 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.geode.cache.wan.GatewayEventSubstitutionFilter;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
Expand Down Expand Up @@ -191,8 +192,16 @@ public boolean isMetaQueue() {
return ((AbstractGatewaySender) sender).getIsMetaQueue();
}

public void stop() {
if (this.sender.isRunning()) {
this.sender.stop();
}
}

public void destroy() {
GemFireCacheImpl gfci = (GemFireCacheImpl) ((AbstractGatewaySender) this.sender).getCache();
this.sender.destroy();
gfci.removeAsyncEventQueue(this);
}

public boolean isBucketSorted() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -809,8 +809,8 @@ public interface DataSerializableFixedID extends SerializationVersions {
public static final short LUCENE_ENTRY_SCORE = 2174;
public static final short LUCENE_TOP_ENTRIES = 2175;
public static final short LUCENE_TOP_ENTRIES_COLLECTOR = 2176;

public static final short WAIT_UNTIL_FLUSHED_FUNCTION_CONTEXT = 2177;
public static final short DESTROY_LUCENE_INDEX_MESSAGE = 2178;

// NOTE, codes > 65535 will take 4 bytes to serialize

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7674,6 +7674,10 @@ public class LocalizedStrings {
new StringId(6650,
"Caught the following exception attempting waitUntilFlushed and will return:");

public static final StringId LuceneService_INDEX_0_NOT_FOUND_IN_REGION_1 =
new StringId(6651, "Lucene index {0} was not found in region {1}.");
public static final StringId LuceneService_DESTROYED_INDEX_0_FROM_REGION_1 =
new StringId(6652, "Destroyed Lucene index {0} from region {1}.");
/** Testing strings, messageId 90000-99999 **/

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,18 @@ public void createIndex(String indexName, String regionPath,

/**
* Destroy the lucene index
*
* @param index index object
*
* @param indexName the name of the index to destroy
* @param regionPath the path of the region whose index to destroy
*/
public void destroyIndex(String indexName, String regionPath);

/**
* Destroy all the lucene indexes for the region
*
* @param regionPath The path of the region on which to destroy the indexes
*/
public void destroyIndex(LuceneIndex index);
public void destroyIndexes(String regionPath);

/**
* Get the lucene index object specified by region name and index name
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.lucene.internal;

import org.apache.geode.DataSerializer;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.lucene.LuceneServiceProvider;
import org.apache.geode.distributed.internal.*;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.logging.LogService;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;

public class DestroyLuceneIndexMessage extends PooledDistributionMessage
implements MessageWithReply {

private int processorId;

private String regionPath;

private String indexName;

private static final Logger logger = LogService.getLogger();

/* For serialization */
public DestroyLuceneIndexMessage() {}

protected DestroyLuceneIndexMessage(Collection recipients, int processorId, String regionPath,
String indexName) {
super();
setRecipients(recipients);
this.processorId = processorId;
this.regionPath = regionPath;
this.indexName = indexName;
}

@Override
protected void process(DistributionManager dm) {
ReplyException replyException = null;
try {
if (logger.isDebugEnabled()) {
logger.debug("DestroyLuceneIndexMessage: Destroying regionPath=" + this.regionPath
+ "; indexName=" + this.indexName);
}
try {
Cache cache = GemFireCacheImpl.getInstance();
LuceneServiceImpl impl = (LuceneServiceImpl) LuceneServiceProvider.get(cache);
impl.destroyIndex(this.indexName, this.regionPath, false);
if (logger.isDebugEnabled()) {
logger.debug("DestroyLuceneIndexMessage: Destroyed regionPath=" + this.regionPath
+ "; indexName=" + this.indexName);
}
} catch (Throwable e) {
replyException = new ReplyException(e);
if (logger.isDebugEnabled()) {
logger.debug(
"DestroyLuceneIndexMessage: Caught the following exception attempting to destroy indexName="
+ this.indexName + "; regionPath=" + this.regionPath + ":",
e);
}
}
} finally {
ReplyMessage replyMsg = new ReplyMessage();
replyMsg.setRecipient(getSender());
replyMsg.setProcessorId(this.processorId);
if (replyException != null) {
replyMsg.setException(replyException);
}
dm.putOutgoing(replyMsg);
}
}

@Override
public int getDSFID() {
return DESTROY_LUCENE_INDEX_MESSAGE;
}

@Override
public void toData(DataOutput out) throws IOException {
super.toData(out);
out.writeInt(this.processorId);
DataSerializer.writeString(this.regionPath, out);
DataSerializer.writeString(this.indexName, out);
}

@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
super.fromData(in);
this.processorId = in.readInt();
this.regionPath = DataSerializer.readString(in);
this.indexName = DataSerializer.readString(in);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,9 @@ public interface InternalLuceneIndex extends LuceneIndex {
*/
public void dumpFiles(String directory);

/**
* Destroy the index
*/
public void destroy(boolean initiator);

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package org.apache.geode.cache.lucene.internal;

import org.apache.geode.CancelException;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.FixedPartitionResolver;
Expand All @@ -24,6 +25,7 @@
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.cache.lucene.internal.directory.DumpDirectoryFiles;
Expand All @@ -36,15 +38,23 @@
import org.apache.geode.cache.lucene.internal.repository.serializer.HeterogeneousLuceneSerializer;
import org.apache.geode.cache.partition.PartitionListener;
import org.apache.geode.distributed.internal.DM;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.PartitionedRegion;

import java.util.Set;

/* wrapper of IndexWriter */
public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl {
protected Region<String, File> fileRegion;
protected Region<ChunkKey, byte[]> chunkRegion;
protected final FileSystemStats fileSystemStats;

public static final String FILES_REGION_SUFFIX = ".files";
public static final String CHUNKS_REGION_SUFFIX = ".chunks";

public LuceneIndexForPartitionedRegion(String indexName, String regionPath, Cache cache) {
super(indexName, regionPath, cache);

Expand Down Expand Up @@ -123,7 +133,7 @@ Region createFileRegion(final RegionShortcut regionShortCut, final String fileRe
}

public String createFileRegionName() {
return LuceneServiceImpl.getUniqueIndexName(indexName, regionPath) + ".files";
return LuceneServiceImpl.getUniqueIndexRegionName(indexName, regionPath, FILES_REGION_SUFFIX);
}

boolean chunkRegionExists(String chunkRegionName) {
Expand All @@ -139,7 +149,7 @@ Region<ChunkKey, byte[]> createChunkRegion(final RegionShortcut regionShortCut,
}

public String createChunkRegionName() {
return LuceneServiceImpl.getUniqueIndexName(indexName, regionPath) + ".chunks";
return LuceneServiceImpl.getUniqueIndexRegionName(indexName, regionPath, CHUNKS_REGION_SUFFIX);
}

private PartitionAttributesFactory configureLuceneRegionAttributesFactory(
Expand Down Expand Up @@ -192,4 +202,99 @@ public void dumpFiles(final String directory) {
.withArgs(new String[] {directory, indexName}).execute(DumpDirectoryFiles.ID);
results.getResult();
}

@Override
public void destroy(boolean initiator) {
if (logger.isDebugEnabled()) {
logger.debug("Destroying index regionPath=" + regionPath + "; indexName=" + indexName
+ "; initiator=" + initiator);
}

// Invoke super destroy to remove the extension
super.destroy(initiator);

// Destroy the AsyncEventQueue
PartitionedRegion pr = (PartitionedRegion) getDataRegion();
destroyAsyncEventQueue(pr);

// Destroy the chunk region (colocated with the file region)
// localDestroyRegion can't be used because locally destroying regions is not supported on
// colocated regions
if (!chunkRegion.isDestroyed()) {
chunkRegion.destroyRegion();
if (logger.isDebugEnabled()) {
logger.debug("Destroyed chunkRegion=" + chunkRegion.getName());
}
}

// Destroy the file region (colocated with the application region)
// localDestroyRegion can't be used because locally destroying regions is not supported on
// colocated regions
if (!fileRegion.isDestroyed()) {
fileRegion.destroyRegion();
if (logger.isDebugEnabled()) {
logger.debug("Destroyed fileRegion=" + fileRegion.getName());
}
}

// Destroy index on remote members if necessary
if (initiator) {
destroyOnRemoteMembers(pr);
}

if (logger.isDebugEnabled()) {
logger.debug("Destroyed index regionPath=" + regionPath + "; indexName=" + indexName
+ "; initiator=" + initiator);
}
}

private void destroyAsyncEventQueue(PartitionedRegion pr) {
String aeqId = LuceneServiceImpl.getUniqueIndexName(indexName, regionPath);

// Get the AsyncEventQueue
AsyncEventQueueImpl aeq = (AsyncEventQueueImpl) cache.getAsyncEventQueue(aeqId);

// Stop the AsyncEventQueue (this stops the AsyncEventQueue's underlying GatewaySender)
aeq.stop();

// Remove the id from the dataRegion's AsyncEventQueue ids
// Note: The region may already have been destroyed by a remote member
if (!pr.isDestroyed()) {
pr.getAttributesMutator().removeAsyncEventQueueId(aeqId);
}

// Destroy the aeq (this also removes it from the GemFireCacheImpl)
aeq.destroy();
if (logger.isDebugEnabled()) {
logger.debug("Destroyed aeqId=" + aeqId);
}
}

private void destroyOnRemoteMembers(PartitionedRegion pr) {
DM dm = pr.getDistributionManager();
Set<InternalDistributedMember> recipients = pr.getRegionAdvisor().adviseDataStore();
if (!recipients.isEmpty()) {
if (logger.isDebugEnabled()) {
logger.debug("LuceneIndexForPartitionedRegion: About to send destroy message recipients="
+ recipients);
}
ReplyProcessor21 processor = new ReplyProcessor21(dm, recipients);
DestroyLuceneIndexMessage message = new DestroyLuceneIndexMessage(recipients,
processor.getProcessorId(), regionPath, indexName);
dm.putOutgoing(message);
if (logger.isDebugEnabled()) {
logger.debug("LuceneIndexForPartitionedRegion: Sent message recipients=" + recipients);
}
try {
processor.waitForReplies();
} catch (ReplyException e) {
if (!(e.getCause() instanceof CancelException)) {
throw e;
}
} catch (InterruptedException e) {
dm.getCancelCriterion().checkCancelInProgress(e);
Thread.currentThread().interrupt();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.geode.internal.cache.extension.Extension;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
Expand Down Expand Up @@ -203,6 +204,21 @@ protected void addExtension(LocalRegion dataRegion) {
dataRegion.getExtensionPoint().addExtension(creation);
}

public void destroy(boolean initiator) {
// Find and delete the appropriate extension
Extension extensionToDelete = null;
for (Extension extension : getDataRegion().getExtensionPoint().getExtensions()) {
LuceneIndexCreation index = (LuceneIndexCreation) extension;
if (index.getName().equals(indexName)) {
extensionToDelete = extension;
break;
}
}
if (extensionToDelete != null) {
getDataRegion().getExtensionPoint().removeExtension(extensionToDelete);
}
}

protected <K, V> Region<K, V> createRegion(final String regionName,
final RegionAttributes<K, V> attributes) {
// Create InternalRegionArguments to set isUsedForMetaRegion true to suppress xml generation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,6 @@ public void dumpFiles(String directory) {
return;
}

@Override
public void destroy(boolean initiator) {}
}
Loading

0 comments on commit 11521a8

Please sign in to comment.