Removed usage of sample/standalone/ns1 namespaces in standalone (ap…
merlimat authored May 3, 2022
1 parent 40d7169 commit ab2e6a8
Showing 17 changed files with 63 additions and 103 deletions.
8 changes: 4 additions & 4 deletions bin/pulsar-managed-ledger-admin
@@ -195,7 +195,7 @@ mlPath : str
managed-ledger path
eg:
-print-managed-ledger --zkServer localhost:2181 --managedLedgerPath sample/standalone/ns1/persistent/test
+print-managed-ledger --zkServer localhost:2181 --managedLedgerPath public/default/persistent/test
'''
def printManagedLedgerCommand(zk, mlPath):
print(getManagedLedgerInfo(zk, mlPath))
@@ -213,7 +213,7 @@ cursorName : str
managed-cursor path
eg:
-print-cursor --zkServer localhost:2181 --managedLedgerPath sample/standalone/ns1/persistent/test --cursorName s1
+print-cursor --zkServer localhost:2181 --managedLedgerPath public/default/persistent/test --cursorName s1
'''
def printManagedCursorCommand(zk, mlPath, cursorName):
try:
@@ -236,7 +236,7 @@ mlPath : str
deleteLedgerIds : str
comma separated deleting ledger-ids (eg: 123,124)
eg:
-delete-managed-ledger-ids --zkServer localhost:2181 --managedLedgerPath sample/standalone/ns1/persistent/test --ledgerIds 3
+delete-managed-ledger-ids --zkServer localhost:2181 --managedLedgerPath public/default/persistent/test --ledgerIds 3
'''
def deleteMLLedgerIdsCommand(zk, mlPath, deleteLedgerIds):
try:
@@ -266,7 +266,7 @@ markDeletePosition: str
markDeletePosition combination of <ledgerId>:<entryId> (eg. 123:1)
eg:
-update-mark-delete-cursor --zkServer localhost:2181 --managedLedgerPath sample/standalone/ns1/persistent/test --cursorName s1 --cursorMarkDelete 0:1
+update-mark-delete-cursor --zkServer localhost:2181 --managedLedgerPath public/default/persistent/test --cursorName s1 --cursorMarkDelete 0:1
'''
def updateMarkDeleteOfCursorCommand(zk, mlPath, cursorName, markDeletePosition):
try:
24 changes: 12 additions & 12 deletions faq.md
@@ -94,8 +94,8 @@ There is regex subscription coming up in Pulsar 2.0. See [PIP-13](https://github
### Does Pulsar have, or plan to have, a concept of log compaction where only the latest message with the same key will be kept?
Yes, see [PIP-14](https://github.com/apache/pulsar/wiki/PIP-14:-Topic-compaction) for more details.

-### When I use an exclusive subscription to a partitioned topic, is the subscription attached to the "whole topic" or to a "topic partition"?
-On a partitioned topic, you can use all three supported subscription types (exclusive, failover, shared), the same as with non-partitioned topics.
+### When I use an exclusive subscription to a partitioned topic, is the subscription attached to the "whole topic" or to a "topic partition"?
+On a partitioned topic, you can use all three supported subscription types (exclusive, failover, shared), the same as with non-partitioned topics.
The “subscription” concept is roughly similar to a “consumer-group” in Kafka. You can have multiple of them in the same topic, with different names.

If you use “exclusive”, a consumer will try to consume from all partitions, or fail if any partition is already being consumed.
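
As a concrete illustration, here is a minimal Java client sketch of these subscription types on a partitioned topic (topic and subscription names are made up; the builder API is the standard Pulsar Java client):

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

public class SubscriptionTypesExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Exclusive: this consumer takes all partitions; a second consumer
        // on the same subscription would fail to subscribe.
        Consumer<byte[]> exclusive = client.newConsumer()
                .topic("persistent://public/default/my-partitioned-topic")
                .subscriptionName("sub-exclusive")
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe();

        // Failover: the Kafka-like mode; partitions are assigned across
        // consumers, and a standby takes over when one fails.
        Consumer<byte[]> failover = client.newConsumer()
                .topic("persistent://public/default/my-partitioned-topic")
                .subscriptionName("sub-failover")
                .subscriptionType(SubscriptionType.Failover)
                .subscribe();

        exclusive.close();
        failover.close();
        client.close();
    }
}
```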
@@ -105,7 +105,7 @@ The mode similar to Kafka is “failover” subscription. In this case, you have
### What is the proxy component?
It’s a component that was introduced recently. Essentially it’s a stateless proxy that speaks the Pulsar binary protocol. The motivation is to avoid (or overcome the impossibility of) direct connections between clients and brokers.

----
+---

## Usage and Configuration
### Can I manually change the number of bundles after creating namespaces?
@@ -119,7 +119,7 @@ Yes, you can use the cli tool `bin/pulsar-admin persistent unsubscribe $TOPIC -s

### How are subscription modes set? Can I create new subscriptions over the WebSocket API?
Yes, you can set most of the producer/consumer configuration options over WebSocket by passing them as HTTP query parameters, like:
-`ws://localhost:8080/ws/consumer/persistent/sample/standalone/ns1/my-topic/my-sub?subscriptionType=Shared`
+`ws://localhost:8080/ws/consumer/persistent/public/default/my-topic/my-sub?subscriptionType=Shared`

see [the doc](http://pulsar.apache.org/docs/latest/clients/WebSocket/#RunningtheWebSocketservice-1fhsvp).
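
For illustration, here is a sketch of a WebSocket consumer against that URL using only the JDK 11 `java.net.http` client. Note that a real consumer must also send back an acknowledgment JSON (`{"messageId": ...}`) for each message, which is omitted here:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.util.concurrent.CompletionStage;

public class WebSocketConsumerExample {
    public static void main(String[] args) throws Exception {
        String url = "ws://localhost:8080/ws/consumer/persistent/public/default/my-topic/my-sub"
                + "?subscriptionType=Shared";

        WebSocket ws = HttpClient.newHttpClient()
                .newWebSocketBuilder()
                .buildAsync(URI.create(url), new WebSocket.Listener() {
                    @Override
                    public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
                        // Each frame is a JSON envelope with the payload, messageId, properties...
                        System.out.println("received: " + data);
                        webSocket.request(1); // ask for the next frame
                        return null;
                    }
                })
                .join();

        Thread.sleep(30_000); // consume for a while
        ws.sendClose(WebSocket.NORMAL_CLOSURE, "done").join();
    }
}
```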

@@ -153,7 +153,7 @@ There is currently no "infinite" retention, other than setting to very high valu
The key is that you should use different subscriptions for each consumer. Each subscription is completely independent from others.

### The default when creating a consumer, is it to "tail" from "now" on the topic, or from the "last acknowledged" or something else?
-So when you spin up a consumer, it will try to subscribe to the topic; if the subscription doesn't exist, a new one will be created and positioned at the end of the topic ("now").
+So when you spin up a consumer, it will try to subscribe to the topic; if the subscription doesn't exist, a new one will be created and positioned at the end of the topic ("now").

Once you reconnect, the subscription will still be there and it will be positioned on the last acknowledged messages from the previous session.
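
If tailing from "now" is not what you want for a brand-new subscription, the Java client lets you override the initial position; a minimal sketch (names illustrative):

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;

public class InitialPositionExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // By default a brand-new subscription starts at the end of the topic ("now");
        // Earliest makes it start from the first available message instead.
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://public/default/my-topic")
                .subscriptionName("my-sub")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        consumer.close();
        client.close();
    }
}
```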

@@ -190,16 +190,16 @@ What’s your use case for timeout on the `receiveAsync()`? Could that be achiev
### Why do we choose to use bookkeeper to store consumer offset instead of zookeeper? I mean, what are the benefits?
ZooKeeper is a “consensus” system that, while it exposes a key/value interface, is not meant to support a large volume of writes per second.

-ZK is not a “horizontally scalable” system, because every node receives every transaction and keeps the whole data set. Effectively, ZK is based on a single “log” that is replicated consistently across the participants.
+ZK is not a “horizontally scalable” system, because every node receives every transaction and keeps the whole data set. Effectively, ZK is based on a single “log” that is replicated consistently across the participants.

-The max throughput we have observed on a well-configured ZK on good hardware was around ~10K writes/s. If you want to do more than that, you would have to shard it.
+The max throughput we have observed on a well-configured ZK on good hardware was around ~10K writes/s. If you want to do more than that, you would have to shard it.

To store consumers' cursor positions, we potentially need to write a large number of updates per second. Typically we persist the cursor every 1 second, though the rate is configurable; if you want to reduce the amount of potential duplicates, you can increase the persistence frequency.

With BookKeeper it’s very efficient to have a large throughput across a huge number of different “logs”. In our case, we use 1 log per cursor, and it becomes feasible to persist every single cursor update.

### I'm facing an issue using `.receiveAsync` that seems to be related to `UnAckedMessageTracker` and `PartitionedConsumerImpl`. We are consuming messages with `receiveAsync`, doing an instant `acknowledgeAsync` when a message is received; after that the process delays its next execution. In this scenario we are consuming a lot more (repeated) messages than the number of messages produced. We are using partitioned topics with `setAckTimeout` 30 seconds, and I believe this issue could be related to `PartitionedConsumerImpl` because the same test on a non-partitioned topic does not generate any repeated messages.
-PartitionedConsumer is composed of a set of regular consumers, one per partition. To have a single `receive()` abstraction, messages from all partitions are then pushed into a shared queue.
+PartitionedConsumer is composed of a set of regular consumers, one per partition. To have a single `receive()` abstraction, messages from all partitions are then pushed into a shared queue.

The thing is that the unacked message tracker works at the partition level. So when the timeout happens, it’s able to request redelivery for the messages and clear them from the queue when that happens,
but if the messages were already pushed into the shared queue, the “clearing” part will not happen.
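
One mitigation, until the tracking is fixed at the right level, is to give the tracker more headroom and to ack as promptly as possible; a hedged sketch (topic, subscription, and timeout values are illustrative):

```java
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;

public class AckTimeoutExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // A longer ack timeout leaves more headroom before the tracker
        // triggers redelivery of messages already sitting in the shared queue.
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("persistent://public/default/my-partitioned-topic")
                .subscriptionName("my-sub")
                .ackTimeout(120, TimeUnit.SECONDS) // rather than 30s
                .subscribe();

        Message<byte[]> msg = consumer.receive();
        consumer.acknowledgeAsync(msg); // ack promptly to keep the tracker small

        consumer.close();
        client.close();
    }
}
```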
@@ -229,8 +229,8 @@ A final option is to check the topic stats. This is a tiny bit involved, because
There’s not currently an option for “infinite” (though it sounds like a good idea! Maybe we could use `-1` for that). The only option now is to use INT_MAX for `retentionTimeInMinutes` and LONG_MAX for `retentionSizeInMB`. It’s not “infinite”, but 4085 years of retention should probably be enough!
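
Through the Java admin client that looks roughly like the sketch below (namespace illustrative; this assumes the `RetentionPolicies(int minutes, int sizeInMB)` constructor, so `Integer.MAX_VALUE` stands in for both maxima):

```java
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.RetentionPolicies;

public class RetentionExample {
    public static void main(String[] args) throws Exception {
        PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build();

        // "Practically infinite": Integer.MAX_VALUE minutes is roughly 4085 years.
        // RetentionPolicies(retentionTimeInMinutes, retentionSizeInMB)
        admin.namespaces().setRetention("public/default",
                new RetentionPolicies(Integer.MAX_VALUE, Integer.MAX_VALUE));

        admin.close();
    }
}
```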

### Is there a profiling option in Pulsar, so that we can break down the time spent in every stage? For instance, message A stays in the queue 1ms, bk write time 2ms (the interval between sending to bk and receiving the ack from bk), and so on.
-There are latency stats at different stages: in the client (e.g., reported every minute in the info logs),
-in the broker (accessible through the broker metrics), and finally in the bookies, where there are several different latency metrics.
+There are latency stats at different stages: in the client (e.g., reported every minute in the info logs),
+in the broker (accessible through the broker metrics), and finally in the bookies, where there are several different latency metrics.

In the broker there’s just the write latency on BK, because there is no other queuing involved in the write path.

@@ -242,7 +242,7 @@ you can create reader with `MessageId.earliest`
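
A minimal sketch of such a reader with the standard Java client (topic name illustrative):

```java
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;

public class EarliestReaderExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Start reading from the earliest available message in the topic.
        Reader<byte[]> reader = client.newReader()
                .topic("persistent://public/default/my-topic")
                .startMessageId(MessageId.earliest)
                .create();

        while (reader.hasMessageAvailable()) {
            Message<byte[]> msg = reader.readNext();
            System.out.println(new String(msg.getData()));
        }

        reader.close();
        client.close();
    }
}
```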
Yes, the broker performs authentication and authorization ("auth & auth") while creating producers/consumers, and this information is present under namespace policies. So, if auth is enabled, the broker does the validation.

### From what I’ve seen so far, it seems that I’d instead want to do a partitioned topic when I want a firehose/mix of data, and shuffle that firehose into specific topics per entity when I’d have more discrete consumers. Is that accurate?
-Precisely, you can use either approach, and even combine them, depending on what is more convenient for the use case. The general traits to choose one or the other are:
+Precisely, you can use either approach, and even combine them, depending on what is more convenient for the use case. The general traits to choose one or the other are:

- Partitions -> Maintain a single “logical” topic but scale throughput to multiple machines. Also, ability to consume in order for a “partition” of the keys. In general, consumers are assigned a partition (and thus a subset of keys) without specifying anything.

Expand All @@ -258,7 +258,7 @@ Main difference: a reader can be used when manually managing the offset/messageI


### Hey, question on routing mode for partitioned topics. What is the default configuration and what is used in the Kafka adaptor?
-The default is to use the hash of the key on a message. If the message has no key, the producer will use a “default” partition (picks 1 random partition and uses it for all the messages it publishes).
+The default is to use the hash of the key on a message. If the message has no key, the producer will use a “default” partition (picks 1 random partition and uses it for all the messages it publishes).

This is to maintain the same ordering guarantee when no partitions are there: per-producer ordering.
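
A small sketch of both behaviors with the Java producer (topic name illustrative):

```java
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

public class RoutingModeExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Producer<byte[]> producer = client.newProducer()
                .topic("persistent://public/default/my-partitioned-topic")
                .create();

        // Keyed message: routed to a partition by the hash of the key.
        producer.newMessage().key("device-42").value("hello".getBytes()).send();

        // Un-keyed message: sent to this producer's single "default" partition,
        // which preserves per-producer ordering.
        producer.newMessage().value("world".getBytes()).send();

        producer.close();
        client.close();
    }
}
```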

Expand Down
79 changes: 21 additions & 58 deletions pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
@@ -24,20 +24,20 @@
import com.google.common.collect.Sets;
import java.io.File;
import java.nio.file.Paths;
-import java.util.List;
+import java.util.Collections;
import java.util.Optional;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.logging.log4j.LogManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resources.NamespaceResources;
+import org.apache.pulsar.broker.resources.TenantResources;
import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
-import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.service.WorkerServiceLoader;
@@ -301,18 +301,11 @@ public void start() throws Exception {

        admin = broker.getAdminClient();

-        ClusterData clusterData = ClusterData.builder()
-                .serviceUrl(broker.getWebServiceAddress())
-                .serviceUrlTls(broker.getWebServiceAddressTls())
-                .brokerServiceUrl(broker.getBrokerServiceUrl())
-                .brokerServiceUrlTls(broker.getBrokerServiceUrlTls())
-                .build();
-        createSampleNameSpace(clusterData, cluster);
-
        //create default namespace
-        createNameSpace(cluster, TopicName.PUBLIC_TENANT, TopicName.PUBLIC_TENANT + "/" + TopicName.DEFAULT_NAMESPACE);
+        createNameSpace(cluster, TopicName.PUBLIC_TENANT,
+                NamespaceName.get(TopicName.PUBLIC_TENANT, TopicName.DEFAULT_NAMESPACE));
        //create pulsar system namespace
-        createNameSpace(cluster, SYSTEM_NAMESPACE.getTenant(), SYSTEM_NAMESPACE.toString());
+        createNameSpace(cluster, SYSTEM_NAMESPACE.getTenant(), SYSTEM_NAMESPACE);
        if (config.isTransactionCoordinatorEnabled()) {
            NamespaceResources.PartitionedTopicResources partitionedTopicResources =
                    broker.getPulsarResources().getNamespaceResources().getPartitionedTopicResources();
@@ -327,52 +320,22 @@ public void start() throws Exception {
        log.debug("--- setup completed ---");
    }

-    private void createNameSpace(String cluster, String publicTenant, String defaultNamespace) {
-        try {
-            if (!admin.tenants().getTenants().contains(publicTenant)) {
-                admin.tenants().createTenant(publicTenant,
-                        TenantInfo.builder()
-                                .adminRoles(Sets.newHashSet(config.getSuperUserRoles()))
-                                .allowedClusters(Sets.newHashSet(cluster))
-                                .build());
-            }
-            if (!admin.namespaces().getNamespaces(publicTenant).contains(defaultNamespace)) {
-                admin.namespaces().createNamespace(defaultNamespace);
-                admin.namespaces().setNamespaceReplicationClusters(
-                        defaultNamespace, Sets.newHashSet(config.getClusterName()));
-            }
-        } catch (PulsarAdminException e) {
-            log.info(e.getMessage(), e);
-        }
-    }
+    private void createNameSpace(String cluster, String publicTenant, NamespaceName ns) throws Exception {
+        TenantResources tr = broker.getPulsarResources().getTenantResources();
+        NamespaceResources nsr = broker.getPulsarResources().getNamespaceResources();

-    private void createSampleNameSpace(ClusterData clusterData, String cluster) {
-        // Create a sample namespace
-        final String tenant = "sample";
-        final String globalCluster = "global";
-        final String namespace = tenant + "/ns1";
-        try {
-            List<String> clusters = admin.clusters().getClusters();
-            if (!clusters.contains(cluster)) {
-                admin.clusters().createCluster(cluster, clusterData);
-            } else {
-                admin.clusters().updateCluster(cluster, clusterData);
-            }
-            // Create marker for "global" cluster
-            if (!clusters.contains(globalCluster)) {
-                admin.clusters().createCluster(globalCluster, ClusterData.builder().build());
-            }
-
-            if (!admin.tenants().getTenants().contains(tenant)) {
-                admin.tenants().createTenant(tenant,
-                        new TenantInfoImpl(Sets.newHashSet(config.getSuperUserRoles()), Sets.newHashSet(cluster)));
-            }
+        if (!tr.tenantExists(publicTenant)) {
+            tr.createTenant(publicTenant,
+                    TenantInfo.builder()
+                            .adminRoles(Sets.newHashSet(config.getSuperUserRoles()))
+                            .allowedClusters(Sets.newHashSet(cluster))
+                            .build());
+        }

-            if (!admin.namespaces().getNamespaces(tenant).contains(namespace)) {
-                admin.namespaces().createNamespace(namespace);
-            }
-        } catch (PulsarAdminException e) {
-            log.warn(e.getMessage(), e);
+        if (!nsr.namespaceExists(ns)) {
+            Policies nsp = new Policies();
+            nsp.replication_clusters = Collections.singleton(config.getClusterName());
+            nsr.createPolicies(ns, nsp);
        }
    }

@@ -1457,8 +1457,8 @@ public static Multimap<String, Metric> parseMetrics(String metrics) {
// Example of lines are
// jvm_threads_current{cluster="standalone",} 203.0
// or
-// pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1",
-// topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897
+// pulsar_subscriptions_count{cluster="standalone", namespace="public/default",
+// topic="persistent://public/default/test-2"} 0.0 1517945780897
Pattern pattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s([+-]?[\\d\\w\\.-]+)(\\s(\\d+))?$");
Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");

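
As an aside, the two `Pattern`s above can be exercised directly; this self-contained sketch (class name mine) parses the sample line from the comment and prints its parts:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MetricsLineParseExample {
    public static void main(String[] args) {
        // The patterns quoted from the test code above; the variant in the next
        // file only differs by making the {...} tag section optional.
        Pattern pattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s([+-]?[\\d\\w\\.-]+)(\\s(\\d+))?$");
        Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");

        String line = "pulsar_subscriptions_count{cluster=\"standalone\", "
                + "namespace=\"public/default\", "
                + "topic=\"persistent://public/default/test-2\"} 0.0 1517945780897";

        Matcher m = pattern.matcher(line);
        if (m.matches()) {
            System.out.println("name      = " + m.group(1)); // pulsar_subscriptions_count
            System.out.println("value     = " + m.group(3)); // 0.0
            System.out.println("timestamp = " + m.group(5)); // 1517945780897
            Matcher tags = tagsPattern.matcher(m.group(2));
            while (tags.find()) {
                System.out.println("tag " + tags.group(1) + " = " + tags.group(2));
            }
        }
    }
}
```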
@@ -73,8 +73,8 @@ public static Map<String, Metric> parseMetrics(String metrics) {
// Example of lines are
// jvm_threads_current{cluster="standalone",} 203.0
// or
-// pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1",
-// topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897
+// pulsar_subscriptions_count{cluster="standalone", namespace="public/default",
+// topic="persistent://public/default/test-2"} 0.0 1517945780897
Pattern pattern = Pattern.compile("^(\\w+)(\\{[^\\}]+\\})?\\s([+-]?[\\d\\w\\.-]+)(\\s(\\d+))?$");
Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
Arrays.asList(metrics.split("\n")).forEach(line -> {
4 changes: 2 additions & 2 deletions pulsar-client-cpp/docs/MainPage.md
@@ -111,7 +111,7 @@ $ make
Client client("pulsar://localhost:6650");

Consumer consumer;
-Result result = client.subscribe("persistent://sample/standalone/ns1/my-topic", "my-subscribtion-name", consumer);
+Result result = client.subscribe("persistent://public/default/my-topic", "my-subscribtion-name", consumer);
if (result != ResultOk) {
LOG_ERROR("Failed to subscribe: " << result);
return -1;
@@ -136,7 +136,7 @@ client.close();
Client client("pulsar://localhost:6650");
Producer producer;
-Result result = client.createProducer("persistent://sample/standalone/ns1/my-topic", producer);
+Result result = client.createProducer("persistent://public/default/my-topic", producer);
if (result != ResultOk) {
LOG_ERROR("Error creating producer: " << result);
return -1;
8 changes: 3 additions & 5 deletions pulsar-client-cpp/pulsar-test-service-start.sh
@@ -74,12 +74,10 @@ $PULSAR_DIR/bin/pulsar-admin clusters create \
--broker-url pulsar://localhost:6650/ \
--broker-url-secure pulsar+ssl://localhost:6651/

# Create "public" tenant
$PULSAR_DIR/bin/pulsar-admin tenants create public -r "anonymous" -c "standalone"
# Update "public" tenant
$PULSAR_DIR/bin/pulsar-admin tenants update public -r "anonymous" -c "standalone"

# Create "public/default" with no auth required
$PULSAR_DIR/bin/pulsar-admin namespaces create public/default \
--clusters standalone
# Update "public/default" with no auth required
$PULSAR_DIR/bin/pulsar-admin namespaces grant-permission public/default \
--actions produce,consume \
--role "anonymous"
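
For reference, the same grant through the Java admin API would look roughly like this sketch (service URL illustrative):

```java
import java.util.EnumSet;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.AuthAction;

public class GrantPermissionExample {
    public static void main(String[] args) throws Exception {
        PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build();

        // Same effect as:
        //   pulsar-admin namespaces grant-permission public/default \
        //       --actions produce,consume --role "anonymous"
        admin.namespaces().grantPermissionOnNamespace("public/default", "anonymous",
                EnumSet.of(AuthAction.produce, AuthAction.consume));

        admin.close();
    }
}
```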
@@ -35,7 +35,7 @@
public class DefaultSchemasTest {
private PulsarClient client;

-    private static final String TEST_TOPIC = "persistent://sample/standalone/ns1/test-topic";
+    private static final String TEST_TOPIC = "test-topic";

@BeforeClass
public void setup() throws PulsarClientException {
@@ -75,7 +75,7 @@
@Slf4j
public class PulsarSinkTest {

-    private static final String TOPIC = "persistent://sample/standalone/ns1/test_result";
+    private static final String TOPIC = "test_result";

public static class TestSerDe implements SerDe<String> {
