Skip to content

Commit

Permalink
OAK-9209: Index lane elastic-async traverses repository
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/jackrabbit/oak/trunk@1882487 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
tihom88 committed Oct 14, 2020
1 parent b668c66 commit 7593d5d
Show file tree
Hide file tree
Showing 5 changed files with 298 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.jackrabbit.oak.spi.commit.SimpleCommitContext;
import org.apache.jackrabbit.oak.spi.commit.ValidatorProvider;
import org.apache.jackrabbit.oak.spi.commit.VisibleEditor;
import org.apache.jackrabbit.oak.spi.state.ChildNodeEntry;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
Expand Down Expand Up @@ -195,6 +196,13 @@ public class AsyncIndexUpdate implements Runnable, Closeable {
private final int cleanupIntervalMinutes
= Integer.getInteger("oak.async.checkpointCleanupIntervalMinutes", 5);

/**
* Setting this to true lead to lane execution (node traversal) even if there
* is no index assigned to this lane under /oak:index.
*/
private final boolean traverseNodesIfLaneNotPresentInIndex
= Boolean.getBoolean("oak.async.traverseNodesIfLaneNotPresentInIndex");

/**
* The time in minutes since the epoch when the last checkpoint cleanup ran.
*/
Expand Down Expand Up @@ -426,6 +434,9 @@ private void checkIfStopped() throws CommitFailedException {

@Override
public synchronized void run() {
if (!shouldProceed()){
return;
}
boolean permitAcquired = false;
try{
if (runPermit.tryAcquire()){
Expand All @@ -441,7 +452,6 @@ public synchronized void run() {
}
}


@Override
public void close() {
if (closed) {
Expand Down Expand Up @@ -613,6 +623,43 @@ private void runWhenPermitted() {
}
}

private boolean shouldProceed() {
NodeState asyncNode = store.getRoot().getChildNode(":async");
/*
If /:async node already have the lane(under consideration) info, we can proceed ahead, as
majorly this change is to stop repository traversal on very first run. If lane had already
traversed nodes in repository there is no point stopping this now.
*/
if (asyncNode.exists() && asyncNode.hasProperty(name)) {
return true;
}
return traverseNodesIfLaneNotPresentInIndex || isIndexWithLanePresent();
}

/**
*
* @return true if there is at least one index present under /oak:index with indexingLane in action.
*/
private boolean isIndexWithLanePresent() {
NodeState oakIndexNode = store.getRoot().getChildNode("oak:index");
if (!oakIndexNode.exists()) {
log.info("lane: {} - no indexes exist under /oak:index", name);
return false;
}
for (ChildNodeEntry childNodeEntry : oakIndexNode.getChildNodeEntries()) {
PropertyState async = childNodeEntry.getNodeState().getProperty("async");
if (async != null) {
for (String s : async.getValue(Type.STRINGS)) {
if (s.equals(name)) {
return true;
}
}
}
}
log.info("lane: {} not present for indexes under /oak:index", name);
return false;
}

private void markFailingIndexesAsCorrupt(NodeBuilder builder) {
for (Map.Entry<String, CorruptIndexInfo> index : corruptIndexHandler.getCorruptIndexData(name).entrySet()){
NodeBuilder indexBuilder = childBuilder(builder, index.getKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.jackrabbit.oak.plugins.index;

import java.util.Map;
import java.util.Properties;
import java.util.Set;

import com.google.common.collect.ImmutableSet;
Expand All @@ -29,6 +30,8 @@
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import static org.hamcrest.Matchers.containsInAnyOrder;
Expand All @@ -40,6 +43,18 @@ public class AsyncIndexInfoServiceImplTest {
private PropertyIndexEditorProvider provider = new PropertyIndexEditorProvider();

private AsyncIndexInfoServiceImpl service = new AsyncIndexInfoServiceImpl(store);
private Properties systemProperties;

@Before
public void setup(){
systemProperties =(Properties) System.getProperties().clone();
System.setProperty("oak.async.traverseNodesIfLaneNotPresentInIndex", "true");
}

@After
public void shutDown(){
System.setProperties(systemProperties);
}

@Test
public void names() throws Exception{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -94,6 +96,7 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import ch.qos.logback.classic.Level;
Expand All @@ -107,8 +110,17 @@ public class AsyncIndexUpdateTest {
private MetricStatisticsProvider statsProvider =
new MetricStatisticsProvider(ManagementFactory.getPlatformMBeanServer(),executor);

private Properties systemProperties;

@Before
public void setup(){
systemProperties =(Properties) System.getProperties().clone();
System.setProperty("oak.async.traverseNodesIfLaneNotPresentInIndex", "true");
}

@After
public void shutDown(){
System.setProperties(systemProperties);
statsProvider.close();
new ExecutorCloser(executor).close();
}
Expand Down Expand Up @@ -503,7 +515,6 @@ public void cpCleanupNoChanges() throws Exception {
checkpoints.size() == 1);
assertEquals(store.getRoot().getChildNode(ASYNC)
.getString("async"), checkpoints.iterator().next());

async.run();
assertEquals("Expecting no checkpoint changes",
checkpoints, store.listCheckpoints());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
Expand Down Expand Up @@ -87,6 +89,19 @@ public class IndexImporterTest {

private NodeStore store = new MemoryNodeStore();
private IndexEditorProvider provider = new PropertyIndexEditorProvider();
private Properties systemProperties;

@Before
public void setup(){
systemProperties =(Properties) System.getProperties().clone();
System.setProperty("oak.async.traverseNodesIfLaneNotPresentInIndex", "true");
}

@After
public void shutDown(){
System.setProperties(systemProperties);
}


@Test(expected = IllegalArgumentException.class)
public void importIndex_NoMeta() throws Exception{
Expand Down
Loading

0 comments on commit 7593d5d

Please sign in to comment.