From 1bb47f9349948f62136e61698859d57c7f0fa283 Mon Sep 17 00:00:00 2001 From: Marcel Reutegger Date: Tue, 23 Jul 2019 13:03:27 +0000 Subject: [PATCH] OAK-8489: Reduce memory usage of in-memory commit git-svn-id: https://svn.apache.org/repos/asf/jackrabbit/oak/trunk@1863640 13f79535-47bb-0310-9956-ffa450edef68 --- .../oak/plugins/document/Commit.java | 17 +++-- .../plugins/document/DocumentNodeStore.java | 16 ++++- .../oak/plugins/document/Rollback.java | 16 +++-- .../document/DocumentNodeStoreTest.java | 67 +++++++++++++++++++ .../oak/plugins/document/RollbackTest.java | 26 +++++++ 5 files changed, 130 insertions(+), 12 deletions(-) diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java index 9a5a49232fe..b080990e14a 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java @@ -29,9 +29,7 @@ import com.google.common.base.Function; import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import org.apache.jackrabbit.oak.commons.PathUtils; import org.apache.jackrabbit.oak.commons.json.JsopStream; import org.apache.jackrabbit.oak.commons.json.JsopWriter; import org.apache.jackrabbit.oak.plugins.document.util.Utils; @@ -44,8 +42,8 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.collect.Iterables.filter; import static com.google.common.collect.Iterables.transform; +import static com.google.common.collect.Lists.partition; import static java.util.Collections.singletonList; -import static org.apache.jackrabbit.oak.commons.PathUtils.denotesRoot; import static org.apache.jackrabbit.oak.plugins.document.Collection.JOURNAL; import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES; import static org.apache.jackrabbit.oak.plugins.document.Document.MOD_COUNT; @@ -331,7 +329,9 @@ private void applyToDocumentStore(RevisionVector baseBranchRevision) } } - rollback = new Rollback(revision, opLog, Utils.getIdFromPath(commitRootPath)); + rollback = new Rollback(revision, opLog, + Utils.getIdFromPath(commitRootPath), + nodeStore.getCreateOrUpdateBatchSize()); for (Path p : bundledNodes.keySet()){ markChanged(p); @@ -363,9 +363,12 @@ private void applyToDocumentStore(RevisionVector baseBranchRevision) if (conditionalCommit(changedNodes, commitValue)) { success = true; } else { - List oldDocs = store.createOrUpdate(NODES, changedNodes); - checkConflicts(oldDocs, changedNodes); - checkSplitCandidate(oldDocs); + int batchSize = nodeStore.getCreateOrUpdateBatchSize(); + for (List updates : partition(changedNodes, batchSize)) { + List oldDocs = store.createOrUpdate(NODES, updates); + checkConflicts(oldDocs, updates); + checkSplitCandidate(oldDocs); + } // finally write the commit root (the commit root might be written // twice, first to check if there was a conflict, and only then to diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java index 28f87b2ef8c..6c01c13ff70 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.Iterables.filter; +import static com.google.common.collect.Iterables.partition; import static com.google.common.collect.Iterables.transform; import static com.google.common.collect.Lists.reverse; import static java.util.Collections.singletonList; @@ -194,6 +195,13 @@ public final class DocumentNodeStore */ private int collisionGarbageBatchSize = Integer.getInteger("oak.documentMK.collisionGarbageBatchSize", 1000); + /** + * The number of updates to batch with a single call to + * {@link DocumentStore#createOrUpdate(Collection, List)}. + */ + private final int createOrUpdateBatchSize = + Integer.getInteger("oak.documentMK.createOrUpdateBatchSize", 1000); + /** * The document store without potentially lease checking wrapper. */ @@ -1714,7 +1722,9 @@ RevisionVector reset(@NotNull RevisionVector branchHead, new ResetDiff(previous.asTrunkRevision(), operations)); LOG.debug("reset: applying {} operations", operations.size()); // apply reset operations - store.createOrUpdate(NODES, new ArrayList<>(operations.values())); + for (List ops : partition(operations.values(), getCreateOrUpdateBatchSize())) { + store.createOrUpdate(NODES, ops); + } } store.findAndUpdate(NODES, rootOp); // clean up in-memory branch data @@ -2441,6 +2451,10 @@ RevisionVector getSweepRevisions() { return sweepRevisions; } + int getCreateOrUpdateBatchSize() { + return createOrUpdateBatchSize; + } + //-----------------------------< internal >--------------------------------- private BackgroundWriteStats backgroundWrite() { diff --git a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Rollback.java b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Rollback.java index e932351e113..aec32a54ddb 100644 --- a/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Rollback.java +++ b/oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Rollback.java @@ -23,6 +23,7 @@ import org.jetbrains.annotations.NotNull; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.collect.Lists.partition; import static org.apache.jackrabbit.oak.plugins.document.Collection.NODES; /** @@ -32,7 +33,7 @@ class Rollback { static final Rollback FAILED = new Rollback(Revision.newRevision(0), - Collections.emptyList(), "") { + Collections.emptyList(), "", 1) { @Override void perform(@NotNull DocumentStore store) throws DocumentStoreException { @@ -41,7 +42,7 @@ void perform(@NotNull DocumentStore store) throws DocumentStoreException { }; static final Rollback NONE = new Rollback(Revision.newRevision(0), - Collections.emptyList(), "") { + Collections.emptyList(), "", 1) { @Override void perform(@NotNull DocumentStore store) throws DocumentStoreException { @@ -54,19 +55,24 @@ void perform(@NotNull DocumentStore store) throws DocumentStoreException { private final String commitRootId; + private final int batchSize; + /** * Creates a new rollback for the given commit revision. * * @param revision the commit revision. * @param changed the changes to revert. * @param commitRootId the id of the commit root document. + * @param batchSize the batch size for the rollback operations. */ Rollback(@NotNull Revision revision, @NotNull List changed, - @NotNull String commitRootId) { + @NotNull String commitRootId, + int batchSize) { this.revision = revision; this.changed = checkNotNull(changed); this.commitRootId = checkNotNull(commitRootId); + this.batchSize = batchSize; } /** @@ -89,7 +95,9 @@ void perform(@NotNull DocumentStore store) throws DocumentStoreException { reverse.setNew(false); reverseOps.add(reverse); } - store.createOrUpdate(NODES, reverseOps); + for (List ops : partition(reverseOps, batchSize)) { + store.createOrUpdate(NODES, ops); + } removeCollisionMarker(store, commitRootId); } diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java index 033c12811fa..49fa02a61ca 100644 --- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStoreTest.java @@ -37,6 +37,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -3976,6 +3977,72 @@ public void readOnlyOnEmptyDocumentStore() { } } + @Test + public void partitionedUpdates() throws Exception { + AtomicInteger maxBatchSize = new AtomicInteger(0); + DocumentStore store = new DocumentStoreWrapper(new MemoryDocumentStore()) { + @Override + public List createOrUpdate(Collection collection, + List updateOps) { + maxBatchSize.set(Math.max(maxBatchSize.get(), updateOps.size())); + return super.createOrUpdate(collection, updateOps); + } + }; + // set batch size to half the update limit + int batchSize = DocumentNodeStoreBuilder.UPDATE_LIMIT / 2; + System.setProperty("oak.documentMK.createOrUpdateBatchSize", + String.valueOf(batchSize)); + try { + DocumentNodeStore ns = builderProvider.newBuilder() + .setAsyncDelay(0).setDocumentStore(store).build(); + NodeBuilder builder = ns.getRoot().builder(); + for (int i = 0; i < DocumentNodeStoreBuilder.UPDATE_LIMIT; i++) { + builder.child("c-" + i); + } + merge(ns, builder); + assertThat(maxBatchSize.get(), greaterThan(0)); + assertThat(maxBatchSize.get(), lessThanOrEqualTo(batchSize)); + } finally { + System.clearProperty("oak.documentMK.createOrUpdateBatchSize"); + } + } + + @Test + public void partitionedReset() { + AtomicInteger maxBatchSize = new AtomicInteger(0); + DocumentStore store = new DocumentStoreWrapper(new MemoryDocumentStore()) { + @Override + public List createOrUpdate(Collection collection, + List updateOps) { + maxBatchSize.set(Math.max(maxBatchSize.get(), updateOps.size())); + return super.createOrUpdate(collection, updateOps); + } + }; + // set batch size to half the update limit + int batchSize = DocumentNodeStoreBuilder.UPDATE_LIMIT / 2; + System.setProperty("oak.documentMK.createOrUpdateBatchSize", + String.valueOf(batchSize)); + try { + DocumentNodeStore ns = builderProvider.newBuilder() + .setAsyncDelay(0).setDocumentStore(store).build(); + DocumentNodeStoreBranch branch = ns.createBranch(ns.getRoot()); + NodeBuilder builder = branch.getBase().builder(); + for (int i = 0; i < DocumentNodeStoreBuilder.UPDATE_LIMIT * 2; i++) { + builder.child("c-" + i).setProperty("p", "a"); + } + branch.setRoot(builder.getNodeState()); + branch.persist(); + + maxBatchSize.set(0); + ns.reset(asDocumentNodeState(branch.getHead()).getRootRevision(), + asDocumentNodeState(branch.getBase()).getRootRevision().asBranchRevision(ns.getClusterId())); + assertThat(maxBatchSize.get(), greaterThan(0)); + assertThat(maxBatchSize.get(), lessThanOrEqualTo(batchSize)); + } finally { + System.clearProperty("oak.documentMK.createOrUpdateBatchSize"); + } + } + private void getChildNodeCountTest(int numChildren, Iterable maxValues, Iterable expectedValues) diff --git a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/RollbackTest.java b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/RollbackTest.java index 505d7de6bb6..d2b63bb72c5 100644 --- a/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/RollbackTest.java +++ b/oak-store-document/src/test/java/org/apache/jackrabbit/oak/plugins/document/RollbackTest.java @@ -18,6 +18,7 @@ */ package org.apache.jackrabbit.oak.plugins.document; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -26,6 +27,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.jackrabbit.oak.api.CommitFailedException; import org.apache.jackrabbit.oak.plugins.document.UpdateOp.Key; @@ -41,7 +43,9 @@ import static org.apache.jackrabbit.oak.plugins.document.TestUtils.isFinalCommitRootUpdate; import static org.apache.jackrabbit.oak.plugins.document.TestUtils.merge; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; @@ -129,6 +133,28 @@ public void rollbackNone() { Rollback.NONE.perform(new MemoryDocumentStore()); } + @Test + public void batchSize() { + AtomicInteger maxBatchSize = new AtomicInteger(0); + DocumentStore store = new DocumentStoreWrapper(new MemoryDocumentStore()) { + @Override + public List createOrUpdate(Collection collection, + List updateOps) { + maxBatchSize.set(Math.max(maxBatchSize.get(), updateOps.size())); + return super.createOrUpdate(collection, updateOps); + } + }; + int batchSize = 100; + List updates = new ArrayList<>(); + for (int i = 0; i < batchSize * 2; i++) { + updates.add(new UpdateOp("id-" + i, false)); + } + new Rollback(Revision.newRevision(1), + updates, "id", batchSize).perform(store); + assertThat(maxBatchSize.get(), greaterThan(0)); + assertThat(maxBatchSize.get(), lessThanOrEqualTo(batchSize)); + } + private class TestStore extends MemoryDocumentStore { final AtomicBoolean failCommitOnce = new AtomicBoolean();