From 7c09f5ce649edcca0be792198d97573197c5a272 Mon Sep 17 00:00:00 2001 From: Renkai Ge Date: Fri, 8 Jan 2021 13:01:17 +0800 Subject: [PATCH] Configurable data source for offloaded messages (#8717) Fix issue: https://github.com/apache/pulsar/issues/8591 This PR include: * API change in command tools * Related implementation with tests * Related docs in cookbook By the way log4j dependency is removed for module `managed-ledger` because now the whole project use log4j2 as the default logger framework. --- .../mledger/impl/ManagedLedgerImpl.java | 14 +- .../src/main/proto/MLDataFormats.proto | 4 +- .../mledger/impl/OffloadPrefixReadTest.java | 125 +++++++++++++++--- .../mledger/impl/OffloadPrefixTest.java | 10 +- .../pulsar/broker/ServiceConfiguration.java | 26 ++-- .../broker/admin/AdminApiOffloadTest.java | 8 +- .../pulsar/broker/admin/NamespacesTest.java | 13 +- .../naming/ServiceConfigurationTest.java | 3 +- .../configurations/pulsar_broker_test.conf | 5 +- pulsar-broker/src/test/resources/logback.xml | 2 +- .../pulsar/admin/cli/PulsarAdminToolTest.java | 10 +- .../pulsar/admin/cli/CmdNamespaces.java | 40 ++++-- .../apache/pulsar/admin/cli/CmdTopics.java | 27 +++- .../common/policies/data/OffloadPolicies.java | 92 ++++++++++--- .../policies/data/OffloadPoliciesTest.java | 9 +- site2/docs/cookbooks-tiered-storage.md | 10 ++ 16 files changed, 312 insertions(+), 86 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index e295566792e73..7713545d56349 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -123,6 +123,7 @@ import org.apache.bookkeeper.mledger.util.Futures; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; +import org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import org.apache.pulsar.metadata.api.Stat; import org.slf4j.Logger; @@ -1692,7 +1693,18 @@ CompletableFuture getLedgerHandle(long ledgerId) { LedgerInfo info = ledgers.get(ledgerId); CompletableFuture openFuture = new CompletableFuture<>(); - if (info != null && info.hasOffloadContext() && info.getOffloadContext().getComplete()) { + + if (config.getLedgerOffloader() != null + && config.getLedgerOffloader().getOffloadPolicies() != null + && config.getLedgerOffloader().getOffloadPolicies() + .getManagedLedgerOffloadedReadPriority() == OffloadedReadPriority.BOOKKEEPER_FIRST + && info != null && info.hasOffloadContext() + && !info.getOffloadContext().getBookkeeperDeleted()) { + openFuture = bookKeeper.newOpenLedgerOp().withRecovery(!isReadOnly()).withLedgerId(ledgerId) + .withDigestType(config.getDigestType()).withPassword(config.getPassword()).execute(); + + } else if (info != null && info.hasOffloadContext() && info.getOffloadContext().getComplete()) { + UUID uid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb()); // TODO: improve this to load ledger offloader by driver name recorded in metadata Map offloadDriverMetadata = OffloadUtils.getOffloadDriverMetadata(info); diff --git a/managed-ledger/src/main/proto/MLDataFormats.proto b/managed-ledger/src/main/proto/MLDataFormats.proto index 8b1ecbf852087..b7dc5803cdfe9 100644 --- a/managed-ledger/src/main/proto/MLDataFormats.proto +++ b/managed-ledger/src/main/proto/MLDataFormats.proto @@ -48,8 +48,8 @@ message ManagedLedgerInfo { optional int64 timestamp = 4; optional OffloadContext offloadContext = 5; } - - repeated LedgerInfo ledgerInfo = 1; + + repeated LedgerInfo ledgerInfo = 1; // If present, it signals the managed ledger has been // terminated and this was the position of the last diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java index d3d24b2a05765..69011cac556b7 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixReadTest.java @@ -18,19 +18,20 @@ */ package org.apache.bookkeeper.mledger.impl; +import static org.apache.bookkeeper.mledger.impl.OffloadPrefixTest.assertEventuallyTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; - +import static org.testng.Assert.assertEquals; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; - import io.netty.buffer.ByteBuf; - import java.util.Arrays; import java.util.List; import java.util.Map; @@ -39,7 +40,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; - import org.apache.bookkeeper.client.api.DigestType; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; import org.apache.bookkeeper.client.api.LedgerEntries; @@ -52,10 +52,12 @@ import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; +import org.apache.bookkeeper.mledger.util.MockClock; import org.apache.bookkeeper.net.BookieId; -import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.pulsar.common.policies.data.OffloadPolicies; +import org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority; import org.testng.Assert; import org.testng.annotations.Test; @@ -69,53 +71,140 @@ public void testOffloadRead() throws Exception { config.setRetentionTime(10, TimeUnit.MINUTES); config.setRetentionSizeInMB(10); config.setLedgerOffloader(offloader); - ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config); + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger", config); for (int i = 0; i < 25; i++) { String content = "entry-" + i; ledger.addEntry(content.getBytes()); } - Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 3); + assertEquals(ledger.getLedgersInfoAsList().size(), 3); ledger.offloadPrefix(ledger.getLastConfirmedEntry()); - Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 3); - Assert.assertEquals(ledger.getLedgersInfoAsList().stream() - .filter(e -> e.getOffloadContext().getComplete()).count(), 2); + assertEquals(ledger.getLedgersInfoAsList().size(), 3); Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getComplete()); Assert.assertTrue(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getComplete()); + Assert.assertFalse(ledger.getLedgersInfoAsList().get(2).getOffloadContext().getComplete()); UUID firstLedgerUUID = new UUID(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidMsb(), - ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidLsb()); + ledger.getLedgersInfoAsList().get(0).getOffloadContext().getUidLsb()); UUID secondLedgerUUID = new UUID(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidMsb(), - ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidLsb()); + ledger.getLedgersInfoAsList().get(1).getOffloadContext().getUidLsb()); ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.earliest); int i = 0; for (Entry e : cursor.readEntries(10)) { - Assert.assertEquals(new String(e.getData()), "entry-" + i++); + assertEquals(new String(e.getData()), "entry-" + i++); } verify(offloader, times(1)) .readOffloaded(anyLong(), any(), anyMap()); verify(offloader).readOffloaded(anyLong(), eq(firstLedgerUUID), anyMap()); for (Entry e : cursor.readEntries(10)) { - Assert.assertEquals(new String(e.getData()), "entry-" + i++); + assertEquals(new String(e.getData()), "entry-" + i++); } verify(offloader, times(2)) - .readOffloaded(anyLong(), any(), anyMap()); + .readOffloaded(anyLong(), any(), anyMap()); verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID), anyMap()); for (Entry e : cursor.readEntries(5)) { - Assert.assertEquals(new String(e.getData()), "entry-" + i++); + assertEquals(new String(e.getData()), "entry-" + i++); } verify(offloader, times(2)) - .readOffloaded(anyLong(), any(), anyMap()); + .readOffloaded(anyLong(), any(), anyMap()); + } + + @Test + public void testBookkeeperFirstOffloadRead() throws Exception { + MockLedgerOffloader offloader = spy(new MockLedgerOffloader()); + MockClock clock = new MockClock(); + offloader.getOffloadPolicies() + .setManagedLedgerOffloadedReadPriority(OffloadedReadPriority.BOOKKEEPER_FIRST); + //delete after 5 minutes + offloader.getOffloadPolicies() + .setManagedLedgerOffloadDeletionLagInMillis(300000L); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(10); + config.setMinimumRolloverTime(0, TimeUnit.SECONDS); + config.setRetentionTime(10, TimeUnit.MINUTES); + config.setRetentionSizeInMB(10); + config.setLedgerOffloader(offloader); + config.setClock(clock); + + + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_bookkeeper_first_test_ledger", config); + + for (int i = 0; i < 25; i++) { + String content = "entry-" + i; + ledger.addEntry(content.getBytes()); + } + assertEquals(ledger.getLedgersInfoAsList().size(), 3); + + ledger.offloadPrefix(ledger.getLastConfirmedEntry()); + + assertEquals(ledger.getLedgersInfoAsList().size(), 3); + assertEquals(ledger.getLedgersInfoAsList().stream() + .filter(e -> e.getOffloadContext().getComplete()).count(), 2); + + LedgerInfo firstLedger = ledger.getLedgersInfoAsList().get(0); + Assert.assertTrue(firstLedger.getOffloadContext().getComplete()); + LedgerInfo secondLedger; + secondLedger = ledger.getLedgersInfoAsList().get(1); + Assert.assertTrue(secondLedger.getOffloadContext().getComplete()); + + UUID firstLedgerUUID = new UUID(firstLedger.getOffloadContext().getUidMsb(), + firstLedger.getOffloadContext().getUidLsb()); + UUID secondLedgerUUID = new UUID(secondLedger.getOffloadContext().getUidMsb(), + secondLedger.getOffloadContext().getUidLsb()); + + ManagedCursor cursor = ledger.newNonDurableCursor(PositionImpl.earliest); + int i = 0; + for (Entry e : cursor.readEntries(10)) { + Assert.assertEquals(new String(e.getData()), "entry-" + i++); + } + // For offloaded first and not deleted ledgers, they should be read from bookkeeper. + verify(offloader, never()) + .readOffloaded(anyLong(), any(), anyMap()); + + // Delete offladed message from bookkeeper + assertEventuallyTrue(() -> bkc.getLedgers().contains(firstLedger.getLedgerId())); + assertEventuallyTrue(() -> bkc.getLedgers().contains(secondLedger.getLedgerId())); + clock.advance(6, TimeUnit.MINUTES); + CompletableFuture promise = new CompletableFuture<>(); + ledger.internalTrimConsumedLedgers(promise); + promise.join(); + + // assert bk ledger is deleted + assertEventuallyTrue(() -> !bkc.getLedgers().contains(firstLedger.getLedgerId())); + assertEventuallyTrue(() -> !bkc.getLedgers().contains(secondLedger.getLedgerId())); + Assert.assertTrue(ledger.getLedgersInfoAsList().get(0).getOffloadContext().getBookkeeperDeleted()); + Assert.assertTrue(ledger.getLedgersInfoAsList().get(1).getOffloadContext().getBookkeeperDeleted()); + + for (Entry e : cursor.readEntries(10)) { + Assert.assertEquals(new String(e.getData()), "entry-" + i++); + } + + // Ledgers deleted from bookkeeper, now should read from offloader + verify(offloader, atLeastOnce()) + .readOffloaded(anyLong(), any(), anyMap()); + verify(offloader).readOffloaded(anyLong(), eq(secondLedgerUUID), anyMap()); + } + static class MockLedgerOffloader implements LedgerOffloader { ConcurrentHashMap offloads = new ConcurrentHashMap(); + + OffloadPolicies offloadPolicies = OffloadPolicies.create("S3", "", "", "", + null, null, + OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, + OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, + OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES, + OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS, + OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY); + + @Override public String getOffloadDriverName() { return "mock"; @@ -150,7 +239,7 @@ public CompletableFuture deleteOffloaded(long ledgerId, UUID uuid, @Override public OffloadPolicies getOffloadPolicies() { - return null; + return offloadPolicies; } @Override diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java index 807876ecfe444..85543c5421b9b 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/OffloadPrefixTest.java @@ -23,22 +23,19 @@ import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; - import com.google.common.collect.ImmutableSet; - import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.CompletionException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.BooleanSupplier; import java.util.stream.Collectors; - import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback; @@ -50,11 +47,9 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.commons.lang3.tuple.Pair; - import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.testng.annotations.Test; public class OffloadPrefixTest extends MockedBookKeeperTestCase { @@ -995,7 +990,8 @@ Set deletedOffloads() { OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, OffloadPolicies.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES, - OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS); + OffloadPolicies.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS, + OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY); @Override public String getOffloadDriverName() { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 14828357d172a..ea457abf9bf92 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -21,9 +21,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; - import io.netty.util.internal.PlatformDependent; - import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -35,14 +33,15 @@ import lombok.Setter; import org.apache.bookkeeper.client.api.DigestType; import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider; -import org.apache.pulsar.common.nar.NarClassLoader; -import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; -import org.apache.pulsar.common.policies.data.TopicType; -import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.configuration.Category; import org.apache.pulsar.common.configuration.FieldContext; import org.apache.pulsar.common.configuration.PulsarConfiguration; +import org.apache.pulsar.common.nar.NarClassLoader; import org.apache.pulsar.common.policies.data.BacklogQuota; +import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; +import org.apache.pulsar.common.policies.data.OffloadPolicies; +import org.apache.pulsar.common.policies.data.TopicType; +import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.sasl.SaslConstants; /** @@ -1459,17 +1458,22 @@ public class ServiceConfiguration implements PulsarConfiguration { + "Of course, this may degrade consumption throughput. Default is 10ms.") private int managedLedgerNewEntriesCheckDelayInMillis = 10; + @FieldContext(category = CATEGORY_STORAGE_ML, + doc = "Read priority when ledgers exists in both bookkeeper and the second layer storage.") + private String managedLedgerDataReadPriority = OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST + .getValue(); + /*** --- Load balancer --- ****/ @FieldContext( - category = CATEGORY_LOAD_BALANCER, - doc = "Enable load balancer" + category = CATEGORY_LOAD_BALANCER, + doc = "Enable load balancer" ) private boolean loadBalancerEnabled = true; @Deprecated @FieldContext( - category = CATEGORY_LOAD_BALANCER, - deprecated = true, - doc = "load placement strategy[weightedRandomSelection/leastLoadedServer] (only used by SimpleLoadManagerImpl)" + category = CATEGORY_LOAD_BALANCER, + deprecated = true, + doc = "load placement strategy[weightedRandomSelection/leastLoadedServer] (only used by SimpleLoadManagerImpl)" ) private String loadBalancerPlacementStrategy = "leastLoadedServer"; // weighted random selection diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java index 7aa027957f7b1..6644d8c957ce5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java @@ -27,14 +27,11 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; - import com.google.common.collect.Sets; - import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; - import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedLedgerInfo; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; @@ -165,10 +162,11 @@ public void testOffloadPolicies() throws Exception { String endpoint = "test-endpoint"; long offloadThresholdInBytes = 0; long offloadDeletionLagInMillis = 100L; + OffloadPolicies.OffloadedReadPriority priority = OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST; OffloadPolicies offload1 = OffloadPolicies.create( driver, region, bucket, endpoint, null, null, - 100, 100, offloadThresholdInBytes, offloadDeletionLagInMillis); + 100, 100, offloadThresholdInBytes, offloadDeletionLagInMillis, priority); admin.namespaces().setOffloadPolicies(namespaceName, offload1); OffloadPolicies offload2 = admin.namespaces().getOffloadPolicies(namespaceName); assertEquals(offload1, offload2); @@ -214,7 +212,7 @@ public void testTopicLevelOffloadPartitioned() throws Exception { Thread.sleep(2000); testOffload(true); } - + @Test public void testTopicLevelOffloadNonPartitioned() throws Exception { //wait for cache init diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index d6f9a5c8dca03..041539e725695 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -35,7 +35,6 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; - import com.google.common.collect.Lists; import com.google.common.collect.Sets; import java.lang.reflect.Field; @@ -1274,7 +1273,8 @@ public void testSetOffloadThreshold() throws Exception { OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, admin.namespaces().getOffloadThreshold(namespace), - pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs())); + pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(), + OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY)); ledgerConf.setLedgerOffloader(offloader); assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(), new Long(-1)); @@ -1289,7 +1289,8 @@ public void testSetOffloadThreshold() throws Exception { OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, admin.namespaces().getOffloadThreshold(namespace), - pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs())); + pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(), + OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY)); ledgerConf.setLedgerOffloader(offloader); assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(), new Long(100)); @@ -1303,7 +1304,8 @@ public void testSetOffloadThreshold() throws Exception { OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, admin.namespaces().getOffloadThreshold(namespace), - pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs())); + pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(), + OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY)); ledgerConf.setLedgerOffloader(offloader); assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(), new Long(-2)); @@ -1317,7 +1319,8 @@ public void testSetOffloadThreshold() throws Exception { OffloadPolicies.DEFAULT_MAX_BLOCK_SIZE_IN_BYTES, OffloadPolicies.DEFAULT_READ_BUFFER_SIZE_IN_BYTES, admin.namespaces().getOffloadThreshold(namespace), - pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs())); + pulsar.getConfiguration().getManagedLedgerOffloadDeletionLagMs(), + OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY)); ledgerConf.setLedgerOffloader(offloader); assertEquals(ledgerConf.getLedgerOffloader().getOffloadPolicies().getManagedLedgerOffloadThresholdInBytes(), new Long(-1)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java index 258c1234411c1..bc9e4bd01ff3d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/common/naming/ServiceConfigurationTest.java @@ -22,7 +22,6 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; - import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -31,7 +30,6 @@ import java.nio.charset.StandardCharsets; import java.util.Optional; import java.util.Properties; - import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; @@ -64,6 +62,7 @@ public void testInit() throws Exception { assertEquals(config.getDefaultNamespaceBundleSplitAlgorithm(), "topic_count_equally_divide"); assertEquals(config.getSupportedNamespaceBundleSplitAlgorithms().size(), 1); assertEquals(config.getMaxMessagePublishBufferSizeInMB(), -1); + assertEquals(config.getManagedLedgerDataReadPriority(), "bookkeeper-first"); } @Test diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf index 08f7a9c4b0543..a21d92d297c40 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf @@ -74,8 +74,9 @@ managedLedgerMaxEntriesPerLedger=50000 managedLedgerMinLedgerRolloverTimeMinutes=10 managedLedgerMaxLedgerRolloverTimeMinutes=240 managedLedgerCursorMaxEntriesPerLedger=50000 -managedLedgerCursorRolloverTimeInSeconds=14400 -loadBalancerEnabled=false +managedLedgerCursorRolloverTimeInSeconds = 14400 +managedLedgerDataReadPriority = bookkeeper-first +loadBalancerEnabled = false loadBalancerReportUpdateThresholdPercentage=10 loadBalancerReportUpdateMaxIntervalMinutes=15 loadBalancerHostUsageCheckIntervalMinutes=1 diff --git a/pulsar-broker/src/test/resources/logback.xml b/pulsar-broker/src/test/resources/logback.xml index bf3daa8ece904..f5735b0b796ec 100644 --- a/pulsar-broker/src/test/resources/logback.xml +++ b/pulsar-broker/src/test/resources/logback.xml @@ -19,7 +19,7 @@ --> - diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 68793baf94f41..d74184f09566b 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -544,11 +544,12 @@ public void namespaces() throws Exception { namespaces.run(split("clear-offload-deletion-lag myprop/clust/ns1")); verify(mockNamespaces).clearOffloadDeleteLag("myprop/clust/ns1"); - namespaces.run(split("set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -oae 10s")); + namespaces.run(split( + "set-offload-policies myprop/clust/ns1 -r test-region -d aws-s3 -b test-bucket -e http://test.endpoint -mbs 32M -rbs 5M -oat 10M -oae 10s -orp tiered-storage-first")); verify(mockNamespaces).setOffloadPolicies("myprop/clust/ns1", OffloadPolicies.create("aws-s3", "test-region", "test-bucket", "http://test.endpoint", null, null, 32 * 1024 * 1024, 5 * 1024 * 1024, - 10 * 1024 * 1024L, 10000L)); + 10 * 1024 * 1024L, 10000L, OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST)); namespaces.run(split("remove-offload-policies myprop/clust/ns1")); verify(mockNamespaces).removeOffloadPolicies("myprop/clust/ns1"); @@ -763,9 +764,10 @@ public void topics() throws Exception { cmdTopics.run(split("remove-offload-policies persistent://myprop/clust/ns1/ds1")); verify(mockTopics).removeOffloadPolicies("persistent://myprop/clust/ns1/ds1"); - cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r region -b bucket -e endpoint -m 8 -rb 9 -t 10")); + cmdTopics.run(split( + "set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r region -b bucket -e endpoint -m 8 -rb 9 -t 10 -orp tiered-storage-first")); OffloadPolicies offloadPolicies = OffloadPolicies.create("s3", "region", "bucket" - , "endpoint", null, null, 8, 9, 10L, null); + , "endpoint", null, null, 8, 9, 10L, null, OffloadPolicies.OffloadedReadPriority.TIERED_STORAGE_FIRST); verify(mockTopics).setOffloadPolicies("persistent://myprop/clust/ns1/ds1", offloadPolicies); cmdTopics.run(split("get-max-unacked-messages-on-consumer persistent://myprop/clust/ns1/ds1")); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index d7555713b2631..2065cd545c6a8 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -23,15 +23,15 @@ import com.beust.jcommander.Parameters; import com.beust.jcommander.converters.CommaParameterSplitter; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Sets; - import java.io.IOException; import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.concurrent.TimeUnit; - +import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.admin.cli.utils.IOUtils; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -42,11 +42,12 @@ import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BookieAffinityGroupData; import org.apache.pulsar.common.policies.data.BundlesData; -import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; +import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.OffloadPolicies; +import org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; @@ -1650,7 +1651,7 @@ private class SetOffloadPolicies extends CliCommand { @Parameter( names = {"--driver", "-d"}, description = "Driver to use to offload old data to long term storage, " + - "(Possible values: S3, aws-s3, google-cloud-storage)", + "(Possible values: S3, aws-s3, google-cloud-storage, filesystem, azureblob)", required = true) private String driver; @@ -1710,17 +1711,24 @@ private class SetOffloadPolicies extends CliCommand { required = false) private String offloadAfterThresholdStr; - private final String[] DRIVER_NAMES = {"S3", "aws-s3", "google-cloud-storage"}; + @Parameter( + names = {"--offloadedReadPriority", "-orp"}, + description = "read priority for offloaded messages", + required = false + ) + private String offloadReadPriorityStr; + + public final ImmutableList DRIVER_NAMES = OffloadPolicies.DRIVER_NAMES; public boolean driverSupported(String driver) { - return Arrays.stream(DRIVER_NAMES).anyMatch(d -> d.equalsIgnoreCase(driver)); + return DRIVER_NAMES.stream().anyMatch(d -> d.equalsIgnoreCase(driver)); } public boolean isS3Driver(String driver) { if (StringUtils.isEmpty(driver)) { return false; } - return driver.equalsIgnoreCase(DRIVER_NAMES[0]) || driver.equalsIgnoreCase(DRIVER_NAMES[1]); + return driver.equalsIgnoreCase(DRIVER_NAMES.get(0)) || driver.equalsIgnoreCase(DRIVER_NAMES.get(1)); } public boolean positiveCheck(String paramName, long value) { @@ -1744,7 +1752,7 @@ void run() throws PulsarAdminException { if (!driverSupported(driver)) { throw new ParameterException( "The driver " + driver + " is not supported, " + - "(Possible values: S3, aws-s3, google-cloud-storage)."); + "(Possible values: " + String.join(",", DRIVER_NAMES) + ")."); } if (isS3Driver(driver) && Strings.isNullOrEmpty(region) && Strings.isNullOrEmpty(endpoint)) { @@ -1788,10 +1796,24 @@ && maxValueCheck("OffloadAfterThreshold", offloadAfterThreshold, Long.MAX_VALUE) offloadAfterThresholdInBytes = offloadAfterThreshold; } } + OffloadedReadPriority offloadedReadPriority = OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY; + + if (this.offloadReadPriorityStr != null) { + try { + offloadedReadPriority = OffloadedReadPriority.fromString(this.offloadReadPriorityStr); + } catch (Exception e) { + throw new ParameterException("--offloadedReadPriority parameter must be one of " + + Arrays.stream(OffloadedReadPriority.values()) + .map(OffloadedReadPriority::toString) + .collect(Collectors.joining(",")) + + " but got: " + this.offloadReadPriorityStr, e); + } + } OffloadPolicies offloadPolicies = OffloadPolicies.create(driver, region, bucket, endpoint, awsId, awsSecret, maxBlockSizeInBytes, readBufferSizeInBytes, offloadAfterThresholdInBytes, - offloadAfterElapsedInMillis); + offloadAfterElapsedInMillis, offloadedReadPriority); + admin.namespaces().setOffloadPolicies(namespace, offloadPolicies); } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 8f55e3d1c250a..8488e171c6303 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -49,6 +50,7 @@ import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; import org.apache.pulsar.common.policies.data.OffloadPolicies; +import org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.PublishRate; @@ -1336,11 +1338,34 @@ private class SetOffloadPolicies extends CliCommand { , description = "ManagedLedger offload deletion lag in bytes") private Long offloadDeletionLagInMillis; + @Parameter( + names = {"--offloadedReadPriority", "-orp"}, + description = "read priority for offloaded messages", + required = false + ) + private String offloadReadPriorityStr; + @Override void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(params); + + OffloadedReadPriority offloadedReadPriority = OffloadPolicies.DEFAULT_OFFLOADED_READ_PRIORITY; + + if (this.offloadReadPriorityStr != null) { + try { + offloadedReadPriority = OffloadedReadPriority.fromString(this.offloadReadPriorityStr); + } catch (Exception e) { + throw new ParameterException("--offloadedReadPriority parameter must be one of " + + Arrays.stream(OffloadedReadPriority.values()) + .map(OffloadedReadPriority::toString) + .collect(Collectors.joining(",")) + + " but got: " + this.offloadReadPriorityStr, e); + } + } + OffloadPolicies offloadPolicies = OffloadPolicies.create(driver, region, bucket, endpoint, awsId, awsSecret, maxBlockSizeInBytes - , readBufferSizeInBytes, offloadThresholdInBytes, offloadDeletionLagInMillis); + , readBufferSizeInBytes, offloadThresholdInBytes, offloadDeletionLagInMillis, offloadedReadPriority); + admin.topics().setOffloadPolicies(persistentTopic, offloadPolicies); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java index da9db187517dd..6620081038534 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java @@ -20,6 +20,7 @@ import static org.apache.pulsar.common.util.FieldParser.value; import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableList; import java.io.Serializable; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; @@ -31,9 +32,12 @@ import java.util.List; import java.util.Objects; import java.util.Properties; +import java.util.stream.Collectors; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.common.classification.InterfaceAudience; +import org.apache.pulsar.common.classification.InterfaceStability; /** * Definition of the offload policies. @@ -42,9 +46,58 @@ @Data public class OffloadPolicies implements Serializable { + @InterfaceAudience.Public + @InterfaceStability.Stable + public enum OffloadedReadPriority { + /** + * For offloaded messages, readers will try to read from bookkeeper at first, + * if messages not exist at bookkeeper then read from offloaded storage. + */ + BOOKKEEPER_FIRST("bookkeeper-first"), + /** + * For offloaded messages, readers will try to read from offloaded storage first, + * even they are still exist in bookkeeper. + */ + TIERED_STORAGE_FIRST("tiered-storage-first"); + + private final String value; + + OffloadedReadPriority(String value) { + this.value = value; + } + + public boolean equalsName(String otherName) { + return value.equals(otherName); + } + + @Override + public String toString() { + return value; + } + + public static OffloadedReadPriority fromString(String str) { + for (OffloadedReadPriority value : OffloadedReadPriority.values()) { + if (value.value.equals(str)) { + return value; + } + } + + throw new IllegalArgumentException("--offloadedReadPriority parameter must be one of " + + Arrays.stream(OffloadedReadPriority.values()) + .map(OffloadedReadPriority::toString) + .collect(Collectors.joining(",")) + + " but got: " + str); + } + + public String getValue() { + return value; + } + } + private final static long serialVersionUID = 0L; private final static List CONFIGURATION_FIELDS; + static { CONFIGURATION_FIELDS = new ArrayList<>(); Class clazz = OffloadPolicies.class; @@ -60,9 +113,8 @@ public class OffloadPolicies implements Serializable { public final static int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024; // 1MB public final static int DEFAULT_OFFLOAD_MAX_THREADS = 2; public final static int DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS = 1; - final static String[] DRIVER_NAMES = { - "S3", "aws-s3", "google-cloud-storage", "filesystem", "azureblob", "aliyun-oss" - }; + public final static ImmutableList DRIVER_NAMES = ImmutableList + .of("S3", "aws-s3", "google-cloud-storage", "filesystem", "azureblob", "aliyun-oss"); public final static String DEFAULT_OFFLOADER_DIRECTORY = "./offloaders"; public final static Long DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES = null; public final static Long DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS = null; @@ -70,6 +122,7 @@ public class OffloadPolicies implements Serializable { public final static String OFFLOAD_THRESHOLD_NAME_IN_CONF_FILE = "managedLedgerOffloadAutoTriggerSizeThresholdBytes"; public final static String DELETION_LAG_NAME_IN_CONF_FILE = "managedLedgerOffloadDeletionLagMs"; + public final static OffloadedReadPriority DEFAULT_OFFLOADED_READ_PRIORITY = OffloadedReadPriority.TIERED_STORAGE_FIRST; // common config @Configuration @@ -84,6 +137,8 @@ public class OffloadPolicies implements Serializable { private Long managedLedgerOffloadThresholdInBytes = DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES; @Configuration private Long managedLedgerOffloadDeletionLagInMillis = DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS; + @Configuration + private OffloadedReadPriority managedLedgerOffloadedReadPriority = DEFAULT_OFFLOADED_READ_PRIORITY; // s3 config, set by service configuration or cli @Configuration @@ -137,7 +192,8 @@ public class OffloadPolicies implements Serializable { public static OffloadPolicies create(String driver, String region, String bucket, String endpoint, String credentialId, String credentialSecret, Integer maxBlockSizeInBytes, Integer readBufferSizeInBytes, - Long offloadThresholdInBytes, Long offloadDeletionLagInMillis) { + Long offloadThresholdInBytes, Long offloadDeletionLagInMillis, + OffloadedReadPriority readPriority) { OffloadPolicies offloadPolicies = new OffloadPolicies(); offloadPolicies.setManagedLedgerOffloadDriver(driver); offloadPolicies.setManagedLedgerOffloadThresholdInBytes(offloadThresholdInBytes); @@ -148,8 +204,9 @@ public static OffloadPolicies create(String driver, String region, String bucket offloadPolicies.setManagedLedgerOffloadServiceEndpoint(endpoint); offloadPolicies.setManagedLedgerOffloadMaxBlockSizeInBytes(maxBlockSizeInBytes); offloadPolicies.setManagedLedgerOffloadReadBufferSizeInBytes(readBufferSizeInBytes); + offloadPolicies.setManagedLedgerOffloadedReadPriority(readPriority); - if (driver.equalsIgnoreCase(DRIVER_NAMES[0]) || driver.equalsIgnoreCase(DRIVER_NAMES[1])) { + if (driver.equalsIgnoreCase(DRIVER_NAMES.get(0)) || driver.equalsIgnoreCase(DRIVER_NAMES.get(1))) { if (credentialId != null) { offloadPolicies.setS3ManagedLedgerOffloadRole(credentialId); } @@ -161,7 +218,7 @@ public static OffloadPolicies create(String driver, String region, String bucket offloadPolicies.setS3ManagedLedgerOffloadServiceEndpoint(endpoint); offloadPolicies.setS3ManagedLedgerOffloadMaxBlockSizeInBytes(maxBlockSizeInBytes); offloadPolicies.setS3ManagedLedgerOffloadReadBufferSizeInBytes(readBufferSizeInBytes); - } else if (driver.equalsIgnoreCase(DRIVER_NAMES[2])) { + } else if (driver.equalsIgnoreCase(DRIVER_NAMES.get(2))) { offloadPolicies.setGcsManagedLedgerOffloadRegion(region); offloadPolicies.setGcsManagedLedgerOffloadBucket(bucket); offloadPolicies.setGcsManagedLedgerOffloadMaxBlockSizeInBytes(maxBlockSizeInBytes); @@ -204,7 +261,7 @@ private void compatibleWithBrokerConfigFile(Properties properties) { } public boolean driverSupported() { - return Arrays.stream(DRIVER_NAMES).anyMatch(d -> d.equalsIgnoreCase(this.managedLedgerOffloadDriver)); + return DRIVER_NAMES.stream().anyMatch(d -> d.equalsIgnoreCase(this.managedLedgerOffloadDriver)); } public static String getSupportedDriverNames() { @@ -215,22 +272,22 @@ public boolean isS3Driver() { if (managedLedgerOffloadDriver == null) { return false; } - return managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES[0]) - || managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES[1]); + return managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES.get(0)) + || managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES.get(1)); } public boolean isGcsDriver() { if (managedLedgerOffloadDriver == null) { return false; } - return managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES[2]); + return managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES.get(2)); } public boolean isFileSystemDriver() { if (managedLedgerOffloadDriver == null) { return false; } - return managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES[3]); + return managedLedgerOffloadDriver.equalsIgnoreCase(DRIVER_NAMES.get(3)); } public boolean bucketValid() { @@ -253,6 +310,7 @@ public boolean bucketValid() { @Override public int hashCode() { return Objects.hash( + managedLedgerOffloadedReadPriority, managedLedgerOffloadDriver, managedLedgerOffloadMaxThreads, managedLedgerOffloadPrefetchRounds, @@ -288,17 +346,18 @@ public boolean equals(Object obj) { return false; } OffloadPolicies other = (OffloadPolicies) obj; - return Objects.equals(managedLedgerOffloadDriver, other.getManagedLedgerOffloadDriver()) + return Objects.equals(managedLedgerOffloadedReadPriority, other.getManagedLedgerOffloadedReadPriority()) + && Objects.equals(managedLedgerOffloadDriver, other.getManagedLedgerOffloadDriver()) && Objects.equals(managedLedgerOffloadMaxThreads, other.getManagedLedgerOffloadMaxThreads()) && Objects.equals(managedLedgerOffloadPrefetchRounds, other.getManagedLedgerOffloadPrefetchRounds()) && Objects.equals(managedLedgerOffloadThresholdInBytes, - other.getManagedLedgerOffloadThresholdInBytes()) + other.getManagedLedgerOffloadThresholdInBytes()) && Objects.equals(managedLedgerOffloadDeletionLagInMillis, - other.getManagedLedgerOffloadDeletionLagInMillis()) + other.getManagedLedgerOffloadDeletionLagInMillis()) && Objects.equals(s3ManagedLedgerOffloadRegion, other.getS3ManagedLedgerOffloadRegion()) && Objects.equals(s3ManagedLedgerOffloadBucket, other.getS3ManagedLedgerOffloadBucket()) && Objects.equals(s3ManagedLedgerOffloadServiceEndpoint, - other.getS3ManagedLedgerOffloadServiceEndpoint()) + other.getS3ManagedLedgerOffloadServiceEndpoint()) && Objects.equals(s3ManagedLedgerOffloadMaxBlockSizeInBytes, other.getS3ManagedLedgerOffloadMaxBlockSizeInBytes()) && Objects.equals(s3ManagedLedgerOffloadReadBufferSizeInBytes, @@ -328,6 +387,7 @@ public boolean equals(Object obj) { @Override public String toString() { return MoreObjects.toStringHelper(this) + .add("managedLedgerOffloadedReadPriority", managedLedgerOffloadedReadPriority) .add("managedLedgerOffloadDriver", managedLedgerOffloadDriver) .add("managedLedgerOffloadMaxThreads", managedLedgerOffloadMaxThreads) .add("managedLedgerOffloadPrefetchRounds", managedLedgerOffloadPrefetchRounds) @@ -358,7 +418,7 @@ public String toString() { public Properties toProperties() { Properties properties = new Properties(); - + setProperty(properties, "managedLedgerOffloadedReadPriority", this.getManagedLedgerOffloadedReadPriority()); setProperty(properties, "offloadersDirectory", this.getOffloadersDirectory()); setProperty(properties, "managedLedgerOffloadDriver", this.getManagedLedgerOffloadDriver()); setProperty(properties, "managedLedgerOffloadMaxThreads", diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java index d87887dfe9d02..a89409c0ec526 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.common.policies.data; import java.util.Properties; +import org.apache.pulsar.common.policies.data.OffloadPolicies.OffloadedReadPriority; import org.testng.Assert; import org.testng.annotations.Test; @@ -59,7 +60,8 @@ public void testS3Configuration() { maxBlockSizeInBytes, readBufferSizeInBytes, offloadThresholdInBytes, - offloadDeletionLagInMillis + offloadDeletionLagInMillis, + OffloadedReadPriority.TIERED_STORAGE_FIRST ); Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDriver(), driver); @@ -86,6 +88,7 @@ public void testGcsConfiguration() { final Integer readBufferSizeInBytes = 2 * M; final Long offloadThresholdInBytes = 0L; final Long offloadDeletionLagInMillis = 5 * MIN; + final OffloadedReadPriority readPriority = OffloadedReadPriority.TIERED_STORAGE_FIRST; OffloadPolicies offloadPolicies = OffloadPolicies.create( driver, @@ -97,7 +100,8 @@ public void testGcsConfiguration() { maxBlockSizeInBytes, readBufferSizeInBytes, offloadThresholdInBytes, - offloadDeletionLagInMillis + offloadDeletionLagInMillis, + readPriority ); Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDriver(), driver); @@ -107,6 +111,7 @@ public void testGcsConfiguration() { Assert.assertEquals(offloadPolicies.getGcsManagedLedgerOffloadReadBufferSizeInBytes(), readBufferSizeInBytes); Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(), offloadThresholdInBytes); Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(), offloadDeletionLagInMillis); + Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority(), readPriority); } @Test diff --git a/site2/docs/cookbooks-tiered-storage.md b/site2/docs/cookbooks-tiered-storage.md index 6dd2803363554..0263ec2596250 100644 --- a/site2/docs/cookbooks-tiered-storage.md +++ b/site2/docs/cookbooks-tiered-storage.md @@ -262,6 +262,16 @@ $ bin/pulsar-admin namespaces set-offload-threshold --size 10M my-tenant/my-name > Automatic offload runs when a new segment is added to a topic log. If you set the threshold on a namespace, but few messages are being produced to the topic, offload will not until the current segment is full. +## Configuring read priority for offloaded messages + +By default, once messages were offloaded to long term storage, brokers will read them from long term storage, but messages still exists in bookkeeper for a period depends on the administrator's configuration. For +messages exists in both bookkeeper and long term storage, if they are preferred to read from bookkeeper, you can use command to change this configuration. + +```bash +# default value for -orp is tiered-storage-first +$ bin/pulsar-admin namespaces set-offload-policies my-tenant/my-namespace -orp bookkeeper-first +$ bin/pulsar-admin topics set-offload-policies my-tenant/my-namespace/topic1 -orp bookkeeper-first +``` ## Triggering offload manually