Skip to content

Commit

Permalink
add stats entry for active consumer name and endpoint to get leader o…
Browse files Browse the repository at this point in the history
…f functions cluster (apache#2234)

* add stats entry for active consumer name and endpoint to get leader of functions cluster

* fixing bug in stats

* removing line
  • Loading branch information
jerrypeng authored and merlimat committed Jul 26, 2018
1 parent 8e62a46 commit 10ec02e
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,20 @@ public Response getCluster() {
return functions.getCluster();
}

@GET
@ApiOperation(
value = "Fetches info about the leader node of the Pulsar cluster running Pulsar Functions",
response = WorkerInfo.class
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")

})
@Path("/cluster/leader")
public WorkerInfo getClusterLeader() {
return functions.getClusterLeader();
}

@GET
@ApiOperation(
value = "Fetches information about which Pulsar Functions are assigned to which Pulsar clusters",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,12 @@ public SubscriptionStats getStats() {
}

subStats.type = getType();
if (dispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
Consumer activeConsumer = ((PersistentDispatcherSingleActiveConsumer) dispatcher).getActiveConsumer();
if (activeConsumer != null) {
subStats.activeConsumerName = activeConsumer.consumerName();
}
}
if (SubType.Shared.equals(subStats.type)) {
if (dispatcher instanceof PersistentDispatcherMultipleConsumers) {
subStats.unackedMessages = ((PersistentDispatcherMultipleConsumers) dispatcher)
Expand All @@ -640,6 +646,7 @@ public SubscriptionStats getStats() {
}
subStats.msgBacklog = getNumberOfEntriesInBacklog();
subStats.msgRateExpired = expiryMonitor.getMessageExpiryRate();

return subStats;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,12 @@ public class SubscriptionStats {
/** Number of unacknowledged messages for the subscription */
public long unackedMessages;

/** whether this subscription is Exclusive or Shared or Failover */
/** Whether this subscription is Exclusive or Shared or Failover */
public SubType type;

/** The name of the consumer that is active for single active consumer subscriptions i.e. failover or exclusive */
public String activeConsumerName;

/** Total rate of messages expired on this subscription. msg/s */
public double msgRateExpired;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,31 @@ public List<WorkerInfo> getCurrentMembership() {
return workerIds;
}

public WorkerInfo getLeader() {
TopicStats topicStats = null;
PulsarAdmin pulsarAdmin = this.getPulsarAdminClient();
try {
topicStats = pulsarAdmin.topics().getStats(this.workerConfig.getClusterCoordinationTopic());
} catch (PulsarAdminException e) {
log.error("Failed to get status of coordinate topic {}",
this.workerConfig.getClusterCoordinationTopic(), e);
throw new RuntimeException(e);
}

String activeConsumerName = topicStats.subscriptions.get(COORDINATION_TOPIC_SUBSCRIPTION).activeConsumerName;
WorkerInfo leader = null;
for (ConsumerStats consumerStats : topicStats.subscriptions
.get(COORDINATION_TOPIC_SUBSCRIPTION).consumers) {
if (consumerStats.consumerName.equals(activeConsumerName)) {
leader = WorkerInfo.parseFrom(consumerStats.metadata.get(WORKER_IDENTIFIER));
}
}
if (leader == null) {
log.warn("Failed to determine leader in functions cluster");
}
return leader;
}

@Override
public void close() throws PulsarClientException {
consumer.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,24 @@ public Response getCluster() {
return Response.status(Status.OK).entity(new Gson().toJson(members)).build();
}

public WorkerInfo getClusterLeader() {
if (!isWorkerServiceAvailable()) {
throw new WebApplicationException(
Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData("Function worker service is not avaialable")).build());
}

MembershipManager membershipManager = worker().getMembershipManager();
WorkerInfo leader = membershipManager.getLeader();

if (leader == null) {
throw new WebApplicationException(
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
.entity(new ErrorData("Leader cannot be determined")).build());}

return leader;
}

public Response getAssignments() {

if (!isWorkerServiceAvailable()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ public Response getCluster() {
return functions.getCluster();
}

@GET
@Path("/cluster/leader")
@Produces(MediaType.APPLICATION_JSON)
public WorkerInfo getClusterLeader() {
return functions.getClusterLeader();
}

@GET
@Path("/assignments")
public Response getAssignments() {
Expand Down

0 comments on commit 10ec02e

Please sign in to comment.