OAK-8489: Reduce memory usage of in-memory commit
git-svn-id: https://svn.apache.org/repos/asf/jackrabbit/oak/trunk@1863640 13f79535-47bb-0310-9956-ffa450edef68
mreutegg committed Jul 23, 2019
1 parent e6d503d commit 1bb47f9
Showing 5 changed files with 130 additions and 12 deletions.
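The change replaces single, potentially very large createOrUpdate calls with batched calls whose size is capped by the new oak.documentMK.createOrUpdateBatchSize system property (default 1000). A minimal sketch of the pattern outside the Oak codebase; the BatchedUpdates helper is made up for illustration, while DocumentStore.createOrUpdate, Collection.NODES and Guava's Lists.partition are the APIs the diff itself uses:

import java.util.List;

import com.google.common.collect.Lists;

import org.apache.jackrabbit.oak.plugins.document.Collection;
import org.apache.jackrabbit.oak.plugins.document.DocumentStore;
import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp;

// Illustrative helper, not part of this commit: apply a large list of
// UpdateOps in fixed-size batches instead of a single call holding all of them.
class BatchedUpdates {

    static void apply(DocumentStore store, List<UpdateOp> updates, int batchSize) {
        for (List<UpdateOp> batch : Lists.partition(updates, batchSize)) {
            // each call keeps at most batchSize update operations and the
            // documents returned for them in memory
            List<NodeDocument> oldDocs = store.createOrUpdate(Collection.NODES, batch);
            // oldDocs can then be checked for conflicts, as Commit does below
        }
    }
}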
@@ -29,9 +29,7 @@

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.commons.json.JsopStream;
import org.apache.jackrabbit.oak.commons.json.JsopWriter;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
@@ -44,8 +42,8 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Lists.partition;
import static java.util.Collections.singletonList;
import static org.apache.jackrabbit.oak.commons.PathUtils.denotesRoot;
import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL;
import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;
import static org.apache.jackrabbit.oak.plugins.document.Document.MOD_COUNT;
@@ -331,7 +329,9 @@ private void applyToDocumentStore(RevisionVector baseBranchRevision)
}
}

rollback = new Rollback(revision, opLog, Utils.getIdFromPath(commitRootPath));
rollback = new Rollback(revision, opLog,
Utils.getIdFromPath(commitRootPath),
nodeStore.getCreateOrUpdateBatchSize());

for (Path p : bundledNodes.keySet()){
markChanged(p);
@@ -363,9 +363,12 @@ private void applyToDocumentStore(RevisionVector baseBranchRevision)
if (conditionalCommit(changedNodes, commitValue)) {
success = true;
} else {
List<NodeDocument> oldDocs = store.createOrUpdate(NODES, changedNodes);
checkConflicts(oldDocs, changedNodes);
checkSplitCandidate(oldDocs);
int batchSize = nodeStore.getCreateOrUpdateBatchSize();
for (List<UpdateOp> updates : partition(changedNodes, batchSize)) {
List<NodeDocument> oldDocs = store.createOrUpdate(NODES, updates);
checkConflicts(oldDocs, updates);
checkSplitCandidate(oldDocs);
}

// finally write the commit root (the commit root might be written
// twice, first to check if there was a conflict, and only then to
@@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterables.filter;
import static com.google.common.collect.Iterables.partition;
import static com.google.common.collect.Iterables.transform;
import static com.google.common.collect.Lists.reverse;
import static java.util.Collections.singletonList;
@@ -194,6 +195,13 @@ public final class DocumentNodeStore
*/
private int collisionGarbageBatchSize = Integer.getInteger("oak.documentMK.collisionGarbageBatchSize", 1000);

/**
* The number of updates to batch with a single call to
* {@link DocumentStore#createOrUpdate(Collection, List)}.
*/
private final int createOrUpdateBatchSize =
Integer.getInteger("oak.documentMK.createOrUpdateBatchSize", 1000);

/**
* The document store without potentially lease checking wrapper.
*/
@@ -1714,7 +1722,9 @@ RevisionVector reset(@NotNull RevisionVector branchHead,
new ResetDiff(previous.asTrunkRevision(), operations));
LOG.debug("reset: applying {} operations", operations.size());
// apply reset operations
store.createOrUpdate(NODES, new ArrayList<>(operations.values()));
for (List<UpdateOp> ops : partition(operations.values(), getCreateOrUpdateBatchSize())) {
store.createOrUpdate(NODES, ops);
}
}
store.findAndUpdate(NODES, rootOp);
// clean up in-memory branch data
@@ -2441,6 +2451,10 @@ RevisionVector getSweepRevisions() {
return sweepRevisions;
}

int getCreateOrUpdateBatchSize() {
return createOrUpdateBatchSize;
}

//-----------------------------< internal >---------------------------------

private BackgroundWriteStats backgroundWrite() {
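The batch size above is read from the oak.documentMK.createOrUpdateBatchSize system property into a final field when the DocumentNodeStore is constructed, so it has to be set before the store is built, which is what the tests further down do. A sketch of that ordering, assuming the standard DocumentNodeStoreBuilder factory; the BatchSizeConfig helper and the use of MemoryDocumentStore are illustrative:

import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStoreBuilder;
import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;

// Hypothetical helper: the property must be in place before build() runs,
// because DocumentNodeStore captures the value at construction time.
class BatchSizeConfig {

    static DocumentNodeStore buildWithBatchSize(int batchSize) {
        System.setProperty("oak.documentMK.createOrUpdateBatchSize",
                String.valueOf(batchSize));
        try {
            return DocumentNodeStoreBuilder.newDocumentNodeStoreBuilder()
                    .setDocumentStore(new MemoryDocumentStore())
                    .setAsyncDelay(0)
                    .build();
        } finally {
            // clearing the property afterwards does not affect the running
            // store; the value was already captured
            System.clearProperty("oak.documentMK.createOrUpdateBatchSize");
        }
    }
}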
@@ -23,6 +23,7 @@
import org.jetbrains.annotations.NotNull;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Lists.partition;
import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES;

/**
@@ -32,7 +33,7 @@
class Rollback {

static final Rollback FAILED = new Rollback(Revision.newRevision(0),
Collections.emptyList(), "") {
Collections.emptyList(), "", 1) {

@Override
void perform(@NotNull DocumentStore store) throws DocumentStoreException {
@@ -41,7 +42,7 @@ void perform(@NotNull DocumentStore store) throws DocumentStoreException {
};

static final Rollback NONE = new Rollback(Revision.newRevision(0),
Collections.emptyList(), "") {
Collections.emptyList(), "", 1) {

@Override
void perform(@NotNull DocumentStore store) throws DocumentStoreException {
@@ -54,19 +55,24 @@ void perform(@NotNull DocumentStore store) throws DocumentStoreException {

private final String commitRootId;

private final int batchSize;

/**
* Creates a new rollback for the given commit revision.
*
* @param revision the commit revision.
* @param changed the changes to revert.
* @param commitRootId the id of the commit root document.
* @param batchSize the batch size for the rollback operations.
*/
Rollback(@NotNull Revision revision,
@NotNull List<UpdateOp> changed,
@NotNull String commitRootId) {
@NotNull String commitRootId,
int batchSize) {
this.revision = revision;
this.changed = checkNotNull(changed);
this.commitRootId = checkNotNull(commitRootId);
this.batchSize = batchSize;
}

/**
@@ -89,7 +95,9 @@ void perform(@NotNull DocumentStore store) throws DocumentStoreException {
reverse.setNew(false);
reverseOps.add(reverse);
}
store.createOrUpdate(NODES, reverseOps);
for (List<UpdateOp> ops : partition(reverseOps, batchSize)) {
store.createOrUpdate(NODES, ops);
}
removeCollisionMarker(store, commitRootId);
}

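Rollback now carries the same batch size, so reverting a failed commit is bounded the same way as applying it. A sketch mirroring the batchSize() test added at the end of this commit; Rollback is package-private, so this only compiles inside org.apache.jackrabbit.oak.plugins.document, and the ids, the count and the batch size of 100 are made up:

import java.util.ArrayList;
import java.util.List;

import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;

// assumes the same package as Rollback; values are purely illustrative
class RollbackSketch {

    static void revertInBatches() {
        MemoryDocumentStore store = new MemoryDocumentStore();
        List<UpdateOp> changed = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            changed.add(new UpdateOp("id-" + i, false));
        }
        // reverts the changes with at most 100 UpdateOps per createOrUpdate call
        new Rollback(Revision.newRevision(1), changed, "commit-root-id", 100)
                .perform(store);
    }
}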
@@ -37,6 +37,7 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@@ -3976,6 +3977,72 @@ public void readOnlyOnEmptyDocumentStore() {
}
}

@Test
public void partitionedUpdates() throws Exception {
AtomicInteger maxBatchSize = new AtomicInteger(0);
DocumentStore store = new DocumentStoreWrapper(new MemoryDocumentStore()) {
@Override
public <T extends Document> List<T> createOrUpdate(Collection<T> collection,
List<UpdateOp> updateOps) {
maxBatchSize.set(Math.max(maxBatchSize.get(), updateOps.size()));
return super.createOrUpdate(collection, updateOps);
}
};
// set batch size to half the update limit
int batchSize = DocumentNodeStoreBuilder.UPDATE_LIMIT / 2;
System.setProperty("oak.documentMK.createOrUpdateBatchSize",
String.valueOf(batchSize));
try {
DocumentNodeStore ns = builderProvider.newBuilder()
.setAsyncDelay(0).setDocumentStore(store).build();
NodeBuilder builder = ns.getRoot().builder();
for (int i = 0; i < DocumentNodeStoreBuilder.UPDATE_LIMIT; i++) {
builder.child("c-" + i);
}
merge(ns, builder);
assertThat(maxBatchSize.get(), greaterThan(0));
assertThat(maxBatchSize.get(), lessThanOrEqualTo(batchSize));
} finally {
System.clearProperty("oak.documentMK.createOrUpdateBatchSize");
}
}

@Test
public void partitionedReset() {
AtomicInteger maxBatchSize = new AtomicInteger(0);
DocumentStore store = new DocumentStoreWrapper(new MemoryDocumentStore()) {
@Override
public <T extends Document> List<T> createOrUpdate(Collection<T> collection,
List<UpdateOp> updateOps) {
maxBatchSize.set(Math.max(maxBatchSize.get(), updateOps.size()));
return super.createOrUpdate(collection, updateOps);
}
};
// set batch size to half the update limit
int batchSize = DocumentNodeStoreBuilder.UPDATE_LIMIT / 2;
System.setProperty("oak.documentMK.createOrUpdateBatchSize",
String.valueOf(batchSize));
try {
DocumentNodeStore ns = builderProvider.newBuilder()
.setAsyncDelay(0).setDocumentStore(store).build();
DocumentNodeStoreBranch branch = ns.createBranch(ns.getRoot());
NodeBuilder builder = branch.getBase().builder();
for (int i = 0; i < DocumentNodeStoreBuilder.UPDATE_LIMIT * 2; i++) {
builder.child("c-" + i).setProperty("p", "a");
}
branch.setRoot(builder.getNodeState());
branch.persist();

maxBatchSize.set(0);
ns.reset(asDocumentNodeState(branch.getHead()).getRootRevision(),
asDocumentNodeState(branch.getBase()).getRootRevision().asBranchRevision(ns.getClusterId()));
assertThat(maxBatchSize.get(), greaterThan(0));
assertThat(maxBatchSize.get(), lessThanOrEqualTo(batchSize));
} finally {
System.clearProperty("oak.documentMK.createOrUpdateBatchSize");
}
}

private void getChildNodeCountTest(int numChildren,
Iterable<Long> maxValues,
Iterable<Long> expectedValues)
@@ -18,6 +18,7 @@
*/
package org.apache.jackrabbit.oak.plugins.document;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@@ -26,6 +27,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key;
@@ -41,7 +43,9 @@

import static org.apache.jackrabbit.oak.plugins.document.TestUtils.isFinalCommitRootUpdate;
import static org.apache.jackrabbit.oak.plugins.document.TestUtils.merge;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
@@ -129,6 +133,28 @@ public void rollbackNone() {
Rollback.NONE.perform(new MemoryDocumentStore());
}

@Test
public void batchSize() {
AtomicInteger maxBatchSize = new AtomicInteger(0);
DocumentStore store = new DocumentStoreWrapper(new MemoryDocumentStore()) {
@Override
public <T extends Document> List<T> createOrUpdate(Collection<T> collection,
List<UpdateOp> updateOps) {
maxBatchSize.set(Math.max(maxBatchSize.get(), updateOps.size()));
return super.createOrUpdate(collection, updateOps);
}
};
int batchSize = 100;
List<UpdateOp> updates = new ArrayList<>();
for (int i = 0; i < batchSize * 2; i++) {
updates.add(new UpdateOp("id-" + i, false));
}
new Rollback(Revision.newRevision(1),
updates, "id", batchSize).perform(store);
assertThat(maxBatchSize.get(), greaterThan(0));
assertThat(maxBatchSize.get(), lessThanOrEqualTo(batchSize));
}

private class TestStore extends MemoryDocumentStore {

final AtomicBoolean failCommitOnce = new AtomicBoolean();
