Skip to content

Commit

Permalink
OAK-8593: Enable a transient cluster-node to connect as invisible to …
Browse files Browse the repository at this point in the history
…oak discovery

Added an 'Invisible' flag in ClusterInfo
DocumentFixtureProvider in oak-run-commons now adds 'invisible' by default

git-svn-id: https://svn.apache.org/repos/asf/jackrabbit/oak/trunk@1867571 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
amit-jain committed Sep 26, 2019
1 parent a4f0393 commit a6f2df0
Show file tree
Hide file tree
Showing 16 changed files with 652 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ static DocumentNodeStore configureDocumentMk(Options options,
if (readOnly) {
builder.setReadOnlyMode();
}
builder.setClusterInvisible(true);

int cacheSize = docStoreOpts.getCacheSize();
if (cacheSize != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
package org.apache.jackrabbit.oak.run.cli;

import java.io.IOException;
import java.util.List;

import joptsimple.OptionParser;
import org.apache.jackrabbit.oak.plugins.document.ClusterNodeInfoDocument;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStoreBuilder;
import org.apache.jackrabbit.oak.plugins.document.MongoUtils;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
Expand All @@ -34,6 +37,7 @@

import static java.util.Collections.emptyMap;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand All @@ -54,6 +58,7 @@ public void documentNodeStore() throws Exception{
builder.setChildNode("foo");
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
assertNotNull(fixture.getBlobStore());
assertClusterInvisible(store);
}
}

Expand All @@ -75,4 +80,11 @@ private Options createMongoOptions() throws IOException {
opts.parseAndConfigure(parser, new String[] {MongoUtils.URL});
return opts;
}

/**
 * Asserts that the first cluster node info document found in the document
 * store backing {@code store} is flagged as invisible.
 *
 * @param store the node store to inspect; it is cast to
 *              {@link DocumentNodeStore} to reach its document store
 */
private void assertClusterInvisible(NodeStore store) {
    List<ClusterNodeInfoDocument> clusterInfos =
            ClusterNodeInfoDocument.all(((DocumentNodeStore) store).getDocumentStore());
    assertNotNull(clusterInfos);
    // Fail with a readable assertion instead of an IndexOutOfBoundsException
    // from get(0) when no cluster node info document exists.
    assertTrue("no cluster node info document found", !clusterInfos.isEmpty());
    assertTrue(clusterInfos.get(0).isInvisible());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ static RecoverLockState fromString(String state) {
*/
private static final String READ_WRITE_MODE_KEY = "readWriteMode";

/**
* Key for invisible flag
*/
public static final String INVISIBLE = "invisible";

/**
* The unique machine id (the MAC address if available).
*/
Expand Down Expand Up @@ -339,8 +344,14 @@ static RecoverLockState fromString(String state) {
*/
private LeaseFailureHandler leaseFailureHandler;

/**
* Flag to indicate this node is invisible to cluster view and thus recovery.
*/
private boolean invisible;


private ClusterNodeInfo(int id, DocumentStore store, String machineId,
String instanceId, boolean newEntry) {
String instanceId, boolean newEntry, boolean invisible) {
this.id = id;
this.startTime = getCurrentTime();
this.leaseEndTime = this.startTime +leaseTime;
Expand All @@ -349,6 +360,7 @@ private ClusterNodeInfo(int id, DocumentStore store, String machineId,
this.machineId = machineId;
this.instanceId = instanceId;
this.newEntry = newEntry;
this.invisible = invisible;
}

void setLeaseCheckMode(@NotNull LeaseCheckMode mode) {
Expand All @@ -371,6 +383,10 @@ String getInstanceId() {
return instanceId;
}

/**
 * Tells whether this cluster node is flagged as invisible.
 *
 * @return the value of the {@code invisible} flag of this instance
 */
boolean isInvisible() {
    return this.invisible;
}

/**
* Create a cluster node info instance to be utilized for read only access
* to underlying store.
Expand All @@ -379,7 +395,7 @@ String getInstanceId() {
* @return the cluster node info
*/
public static ClusterNodeInfo getReadOnlyInstance(DocumentStore store) {
return new ClusterNodeInfo(0, store, MACHINE_ID, WORKING_DIR, true) {
return new ClusterNodeInfo(0, store, MACHINE_ID, WORKING_DIR, true, true) {
@Override
public void dispose() {
}
Expand All @@ -406,6 +422,26 @@ public void setLeaseFailureHandler(LeaseFailureHandler leaseFailureHandler) {}
};
}

/**
 * Get or create a cluster node info instance for the store. This overload
 * delegates to the variant taking an additional {@code invisible} argument
 * and always passes {@code false} for it.
 *
 * @param store the document store (for the lease)
 * @param recoveryHandler the recovery handler to call for a clusterId with
 *                        an expired lease.
 * @param machineId the machine id (null for MAC address)
 * @param instanceId the instance id (null for current working directory)
 * @param configuredClusterId the configured cluster id (or 0 for dynamic assignment)
 * @return the cluster node info
 */
public static ClusterNodeInfo getInstance(DocumentStore store,
                                          RecoveryHandler recoveryHandler,
                                          String machineId,
                                          String instanceId,
                                          int configuredClusterId) {
    // callers of this overload do not ask for the invisible flag
    final boolean invisible = false;
    return getInstance(store, recoveryHandler, machineId, instanceId,
            configuredClusterId, invisible);
}

/**
* Get or create a cluster node info instance for the store.
*
Expand All @@ -421,7 +457,8 @@ public static ClusterNodeInfo getInstance(DocumentStore store,
RecoveryHandler recoveryHandler,
String machineId,
String instanceId,
int configuredClusterId) {
int configuredClusterId,
boolean invisible) {
// defaults for machineId and instanceID
if (machineId == null) {
machineId = MACHINE_ID;
Expand All @@ -434,7 +471,7 @@ public static ClusterNodeInfo getInstance(DocumentStore store,
for (int i = 0; i < retries; i++) {
Map.Entry<ClusterNodeInfo, Long> suggestedClusterNode =
createInstance(store, recoveryHandler, machineId,
instanceId, configuredClusterId, i == 0);
instanceId, configuredClusterId, i == 0, invisible);
ClusterNodeInfo clusterNode = suggestedClusterNode.getKey();
Long currentStartTime = suggestedClusterNode.getValue();
String key = String.valueOf(clusterNode.id);
Expand All @@ -446,6 +483,7 @@ public static ClusterNodeInfo getInstance(DocumentStore store,
update.set(INFO_KEY, clusterNode.toString());
update.set(STATE, ACTIVE.name());
update.set(OAK_VERSION_KEY, OAK_VERSION);
update.set(INVISIBLE, invisible);

ClusterNodeInfoDocument before = null;
final boolean success;
Expand Down Expand Up @@ -485,7 +523,8 @@ private static Map.Entry<ClusterNodeInfo, Long> createInstance(DocumentStore sto
String machineId,
String instanceId,
int configuredClusterId,
boolean waitForLease) {
boolean waitForLease,
boolean invisible) {

long now = getCurrentTime();
int maxId = 0;
Expand Down Expand Up @@ -544,7 +583,7 @@ private static Map.Entry<ClusterNodeInfo, Long> createInstance(DocumentStore sto
&& iId.equals(instanceId)) {
boolean worthRetrying = waitForLeaseExpiry(store, doc, leaseEnd, machineId, instanceId);
if (worthRetrying) {
return createInstance(store, recoveryHandler, machineId, instanceId, configuredClusterId, false);
return createInstance(store, recoveryHandler, machineId, instanceId, configuredClusterId, false, invisible);
}
}

Expand Down Expand Up @@ -579,7 +618,7 @@ private static Map.Entry<ClusterNodeInfo, Long> createInstance(DocumentStore sto

// create a candidate. those with matching machine and instance id
// are preferred, then the one with the lowest clusterId.
candidates.add(new ClusterNodeInfo(id, store, mId, iId, false));
candidates.add(new ClusterNodeInfo(id, store, mId, iId, false, invisible));
startTimes.put(id, doc.getStartTime());
}

Expand All @@ -596,19 +635,21 @@ private static Map.Entry<ClusterNodeInfo, Long> createInstance(DocumentStore sto
clusterNodeId = maxId + 1;
}
// No usable existing entry found so create a new entry
candidates.add(new ClusterNodeInfo(clusterNodeId, store, machineId, instanceId, true));
candidates.add(new ClusterNodeInfo(clusterNodeId, store, machineId, instanceId, true, invisible));
}

// use the best candidate
ClusterNodeInfo info = candidates.first();
// and replace with an info matching the current machine and instance id
info = new ClusterNodeInfo(info.id, store, machineId, instanceId, info.newEntry);
info = new ClusterNodeInfo(info.id, store, machineId, instanceId, info.newEntry, invisible);
return new AbstractMap.SimpleImmutableEntry<>(info, startTimes.get(info.getId()));
}

private static void logClusterIdAcquired(ClusterNodeInfo clusterNode,
ClusterNodeInfoDocument before) {
String type = clusterNode.newEntry ? "new" : "existing";
type = clusterNode.invisible ? (type + " (invisible)") : type;

String machineInfo = clusterNode.machineId;
String instanceInfo = clusterNode.instanceId;
if (before != null) {
Expand Down Expand Up @@ -654,7 +695,7 @@ private static boolean waitForLeaseExpiry(DocumentStore store, ClusterNodeInfoDo
// check state of cluster node info
ClusterNodeInfoDocument reread = store.find(Collection.CLUSTER_NODES, key);
if (reread == null) {
LOG.info("Cluster node info " + key + ": gone; continueing.");
LOG.info("Cluster node info " + key + ": gone; continuing.");
return true;
} else {
Long newLeaseEnd = (Long) reread.get(LEASE_END_KEY);
Expand Down Expand Up @@ -1098,6 +1139,7 @@ public void dispose() {
UpdateOp update = new UpdateOp("" + id, true);
update.set(LEASE_END_KEY, null);
update.set(STATE, null);
update.set(INVISIBLE, false);
store.createOrUpdate(Collection.CLUSTER_NODES, update);
state = NONE;
}
Expand All @@ -1114,7 +1156,8 @@ public String toString() {
"leaseCheckMode: " + leaseCheckMode.name() + ",\n" +
"state: " + state + ",\n" +
"oakVersion: " + OAK_VERSION + ",\n" +
"formatVersion: " + DocumentNodeStore.VERSION;
"formatVersion: " + DocumentNodeStore.VERSION + ",\n" +
"invisible: " + invisible;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,4 +157,14 @@ private RecoverLockState getRecoveryState(){
public String getLastWrittenRootRev() {
return (String) get(ClusterNodeInfo.LAST_WRITTEN_ROOT_REV_KEY);
}

/**
 * Checks the {@link ClusterNodeInfo#INVISIBLE} entry of this document.
 *
 * @return {@code true} if the entry is present and set to {@code true};
 *         {@code false} if it is absent or set to {@code false}.
 */
public boolean isInvisible() {
    Boolean flag = (Boolean) get(ClusterNodeInfo.INVISIBLE);
    if (flag == null) {
        // no entry stored for this document: treat as visible
        return false;
    }
    return flag;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public String getConfig() {
public List<Map<String, String>> getClientInfo() {
ArrayList<Map<String, String>> list = new ArrayList<Map<String, String>>();
for (ClusterNodeInfoDocument doc : ClusterNodeInfoDocument.all(documentNodeStore.getDocumentStore())) {
if (!doc.isActive()) {
if (!doc.isActive() || doc.isInvisible()) {
continue;
}
Object broadcastId = doc.get(DynamicBroadcastConfig.ID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,19 +372,21 @@ private boolean checkView() {

for (Iterator<ClusterNodeInfoDocument> it = allClusterNodes.iterator(); it.hasNext();) {
ClusterNodeInfoDocument clusterNode = it.next();
allNodeIds.put(clusterNode.getClusterId(), clusterNode);
if (clusterNode.isBeingRecovered()) {
recoveringNodes.put(clusterNode.getClusterId(), clusterNode);
} else if (!clusterNode.isActive()) {
if (hasBacklog(clusterNode)) {
backlogNodes.put(clusterNode.getClusterId(), clusterNode);
if (!clusterNode.isInvisible()) {
allNodeIds.put(clusterNode.getClusterId(), clusterNode);
if (clusterNode.isBeingRecovered()) {
recoveringNodes.put(clusterNode.getClusterId(), clusterNode);
} else if (!clusterNode.isActive()) {
if (hasBacklog(clusterNode)) {
backlogNodes.put(clusterNode.getClusterId(), clusterNode);
} else {
inactiveNoBacklogNodes.put(clusterNode.getClusterId(), clusterNode);
}
} else if (clusterNode.getLeaseEndTime() < System.currentTimeMillis()) {
activeButTimedOutNodes.put(clusterNode.getClusterId(), clusterNode);
} else {
inactiveNoBacklogNodes.put(clusterNode.getClusterId(), clusterNode);
activeNotTimedOutNodes.put(clusterNode.getClusterId(), clusterNode);
}
} else if (clusterNode.getLeaseEndTime() < System.currentTimeMillis()) {
activeButTimedOutNodes.put(clusterNode.getClusterId(), clusterNode);
} else {
activeNotTimedOutNodes.put(clusterNode.getClusterId(), clusterNode);
}
}

Expand Down Expand Up @@ -471,7 +473,8 @@ private Revision getLastKnownRevision(int clusterNodeId) {
return null;
}

private boolean hasBacklog(ClusterNodeInfoDocument clusterNode) {
/** package access only for testing **/
boolean hasBacklog(ClusterNodeInfoDocument clusterNode) {
if (logger.isTraceEnabled()) {
logger.trace("hasBacklog: start. clusterNodeId: {}", clusterNode.getClusterId());
}
Expand Down Expand Up @@ -653,7 +656,7 @@ private void wakeupBackgroundWorker(WakeupReason wakeupReason) {
* background-read has finished - as it could be waiting for a crashed
* node's recovery to finish - which it can only do by checking the
* lastKnownRevision of the crashed instance - and that check is best done
* after the background read is just finished (it could optinoally do that
* after the background read is just finished (it could optionally do that
* just purely time based as well, but going via a listener is more timely,
* that's why this approach has been chosen).
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ public DocumentNodeStore(DocumentNodeStoreBuilder<?> builder) {
} else {
clusterNodeInfo = ClusterNodeInfo.getInstance(nonLeaseCheckingStore,
new RecoveryHandlerImpl(nonLeaseCheckingStore, clock, lastRevSeeker),
null, null, cid);
null, null, cid, builder.isClusterInvisible());
checkRevisionAge(nonLeaseCheckingStore, clusterNodeInfo, clock);
}
this.clusterId = clusterNodeInfo.getId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.jackrabbit.oak.plugins.document.util.StringValue;
import org.apache.jackrabbit.oak.spi.blob.AbstractBlobStore;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
import org.apache.jackrabbit.oak.spi.gc.LoggingGCMonitor;
Expand Down Expand Up @@ -151,6 +150,7 @@ public class DocumentNodeStoreBuilder<T extends DocumentNodeStoreBuilder<T>> {
private GCMonitor gcMonitor = new LoggingGCMonitor(
LoggerFactory.getLogger(VersionGarbageCollector.class));
private Predicate<Path> nodeCachePredicate = Predicates.alwaysTrue();
private boolean clusterInvisible;

/**
* @return a new {@link DocumentNodeStoreBuilder}.
Expand Down Expand Up @@ -336,6 +336,18 @@ public T setClusterId(int clusterId) {
return thisBuilder();
}

/**
 * Controls whether the cluster node is flagged as invisible to the
 * discovery lite service. Unless this method is called the flag is
 * {@code false}, i.e. the cluster node is visible.
 *
 * @param invisible {@code true} to flag the cluster node as invisible,
 *                  {@code false} to keep it visible
 * @return this
 * @see DocumentDiscoveryLiteService
 */
public T setClusterInvisible(boolean invisible) {
    clusterInvisible = invisible;
    return thisBuilder();
}

public T setCacheSegmentCount(int cacheSegmentCount) {
this.cacheSegmentCount = cacheSegmentCount;
return thisBuilder();
Expand All @@ -350,6 +362,10 @@ public int getClusterId() {
return clusterId;
}

/**
 * Returns the invisible flag configured on this builder.
 *
 * @return the value last passed to {@link #setClusterInvisible(boolean)},
 *         or {@code false} if it was never set
 */
public boolean isClusterInvisible() {
    return this.clusterInvisible;
}

/**
* Set the maximum delay to write the last revision to the root node. By
* default 1000 (meaning 1 second) is used.
Expand Down
Loading

0 comments on commit a6f2df0

Please sign in to comment.