Skip to content

Commit

Permalink
Renamed DestinationName into TopicName (apache#1280)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Feb 26, 2018
1 parent 1a1a6e3 commit 5fc4d53
Show file tree
Hide file tree
Showing 124 changed files with 2,172 additions and 2,182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
Expand Down Expand Up @@ -100,20 +100,20 @@ private long getNumberOfEntries(Range<PositionImpl> range,

public PersistentOfflineTopicStats getEstimatedUnloadedTopicBacklog(ManagedLedgerFactoryImpl factory,
String managedLedgerName) throws Exception {
return estimateUnloadedTopicBacklog(factory, DestinationName.get("persistent://" + managedLedgerName));
return estimateUnloadedTopicBacklog(factory, TopicName.get("persistent://" + managedLedgerName));
}

public PersistentOfflineTopicStats estimateUnloadedTopicBacklog(ManagedLedgerFactoryImpl factory,
DestinationName dn) throws Exception {
String managedLedgerName = dn.getPersistenceNamingEncoding();
TopicName topicName) throws Exception {
String managedLedgerName = topicName.getPersistenceNamingEncoding();
long numberOfEntries = 0;
long totalSize = 0;
final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers = new ConcurrentSkipListMap<>();
final PersistentOfflineTopicStats offlineTopicStats = new PersistentOfflineTopicStats(managedLedgerName,
brokerName);

// calculate total managed ledger size and number of entries without loading the topic
readLedgerMeta(factory, dn, ledgers);
readLedgerMeta(factory, topicName, ledgers);
for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : ledgers.values()) {
numberOfEntries += ls.getEntries();
totalSize += ls.getSize();
Expand All @@ -128,15 +128,15 @@ public PersistentOfflineTopicStats estimateUnloadedTopicBacklog(ManagedLedgerFac
}

// calculate per cursor message backlog
calculateCursorBacklogs(factory, dn, ledgers, offlineTopicStats);
calculateCursorBacklogs(factory, topicName, ledgers, offlineTopicStats);
offlineTopicStats.statGeneratedAt.setTime(System.currentTimeMillis());

return offlineTopicStats;
}

private void readLedgerMeta(final ManagedLedgerFactoryImpl factory, final DestinationName dn,
private void readLedgerMeta(final ManagedLedgerFactoryImpl factory, final TopicName topicName,
final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers) throws Exception {
String managedLedgerName = dn.getPersistenceNamingEncoding();
String managedLedgerName = topicName.getPersistenceNamingEncoding();
MetaStore store = factory.getMetaStore();
BookKeeper bk = factory.getBookKeeper();
final CountDownLatch mlMetaCounter = new CountDownLatch(1);
Expand Down Expand Up @@ -206,14 +206,14 @@ public void operationFailed(ManagedLedgerException.MetaStoreException e) {
}
}

private void calculateCursorBacklogs(final ManagedLedgerFactoryImpl factory, final DestinationName dn,
private void calculateCursorBacklogs(final ManagedLedgerFactoryImpl factory, final TopicName topicName,
final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers,
final PersistentOfflineTopicStats offlineTopicStats) throws Exception {

if (ledgers.size() == 0) {
return;
}
String managedLedgerName = dn.getPersistenceNamingEncoding();
String managedLedgerName = topicName.getPersistenceNamingEncoding();
MetaStore store = factory.getMetaStore();
BookKeeper bk = factory.getBookKeeper();
final CountDownLatch allCursorsCounter = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;

Expand All @@ -48,47 +48,46 @@ public interface AuthorizationProvider extends Closeable {
void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException;

/**
* Check if the specified role has permission to send messages to the specified fully qualified destination name.
* Check if the specified role has permission to send messages to the specified fully qualified topic name.
*
* @param destination
* the fully qualified destination name associated with the destination.
* @param topicName
* the fully qualified topic name associated with the topic.
* @param role
* the app id used to send messages to the destination.
* the app id used to send messages to the topic.
*/
CompletableFuture<Boolean> canProduceAsync(DestinationName destination, String role,
CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
AuthenticationDataSource authenticationData);

/**
* Check if the specified role has permission to receive messages from the specified fully qualified destination
* name.
* Check if the specified role has permission to receive messages from the specified fully qualified topic name.
*
* @param destination
* the fully qualified destination name associated with the destination.
* @param topicName
* the fully qualified topic name associated with the topic.
* @param role
* the app id used to receive messages from the destination.
* the app id used to receive messages from the topic.
* @param subscription
* the subscription name defined by the client
*/
CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role,
CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
AuthenticationDataSource authenticationData, String subscription);

/**
* Check whether the specified role can perform a lookup for the specified destination.
* Check whether the specified role can perform a lookup for the specified topic.
*
* For that the caller needs to have producer or consumer permission.
*
* @param destination
* @param topicName
* @param role
* @return
* @throws Exception
*/
CompletableFuture<Boolean> canLookupAsync(DestinationName destination, String role,
CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role,
AuthenticationDataSource authenticationData);

/**
*
*
* Grant authorization-action permission on a namespace to the given client
*
*
* @param namespace
* @param actions
* @param role
Expand All @@ -104,8 +103,8 @@ CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAc

/**
* Grant authorization-action permission on a topic to the given client
*
* @param topicname
*
* @param topicName
* @param role
* @param authDataJson
* additional authdata in json format
Expand All @@ -114,7 +113,7 @@ CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAc
* IllegalArgumentException when namespace not found<br/>
* IllegalStateException when failed to grant permission
*/
CompletableFuture<Void> grantPermissionAsync(DestinationName topicname, Set<AuthAction> actions, String role,
CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions, String role,
String authDataJson);

}
Loading

0 comments on commit 5fc4d53

Please sign in to comment.