Skip to content

Commit

Permalink
Log scheduler stats for Pulsar Functions (apache#7474)
Browse files Browse the repository at this point in the history
Co-authored-by: Jerry Peng <[email protected]>
  • Loading branch information
jerrypeng and Jerry Peng authored Jul 8, 2020
1 parent eaf268c commit cc2c203
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,6 @@ private void addAssignment(Assignment assignment) {
}

private void startFunctionInstance(Assignment assignment) {
log.info("infos: {}", functionRuntimeInfos.getAll());
String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
FunctionRuntimeInfo functionRuntimeInfo = _getFunctionRuntimeInfo(fullyQualifiedInstanceId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ public void checkFailures(FunctionMetaDataManager functionMetaDataManager,
// check unassigned
Collection<Function.Instance> needSchedule = new LinkedList<>();
Collection<Function.Assignment> needRemove = new LinkedList<>();
Map<String, Integer> numRemoved = new HashMap<>();
for (Map.Entry<Function.Instance, Long> entry : this.unsignedFunctionDurations.entrySet()) {
Function.Instance instance = entry.getKey();
long unassignedDurationMs = entry.getValue();
Expand All @@ -217,6 +218,12 @@ public void checkFailures(FunctionMetaDataManager functionMetaDataManager,
Function.Assignment assignment = assignmentMap.get(FunctionCommon.getFullyQualifiedInstanceId(instance));
if (assignment != null) {
needRemove.add(assignment);

Integer count = numRemoved.get(assignment.getWorkerId());
if (count == null) {
count = 0;
}
numRemoved.put(assignment.getWorkerId(), count + 1);
}
triggerScheduler = true;
}
Expand All @@ -225,7 +232,8 @@ public void checkFailures(FunctionMetaDataManager functionMetaDataManager,
functionRuntimeManager.removeAssignments(needRemove);
}
if (triggerScheduler) {
log.info("Functions that need scheduling/rescheduling: {}", needSchedule);
log.info("Failure check - Total number of instances that need to be scheduled/rescheduled: {} | Number of unassigned instances that need to be scheduled: {} | Number of instances on dead workers that need to be reassigned {}",
needSchedule.size(), needSchedule.size() - needRemove.size(), numRemoved);
schedulerManager.schedule();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@
*/
package org.apache.pulsar.functions.worker;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.Builder;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -34,6 +38,7 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.Assignment;
Expand Down Expand Up @@ -239,7 +244,8 @@ public Future<?> rebalanceIfNotInprogress() {

@VisibleForTesting
void invokeScheduler() {

long startTime = System.nanoTime();

Set<String> currentMembership = membershipManager.getCurrentMembership()
.stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toSet());

Expand All @@ -248,11 +254,13 @@ void invokeScheduler() {
Map<String, Map<String, Assignment>> workerIdToAssignments = functionRuntimeManager
.getCurrentAssignments();

// initialize stats collection
SchedulerStats schedulerStats = new SchedulerStats(workerIdToAssignments, currentMembership);

//delete assignments of functions and instances that don't exist anymore
Iterator<Map.Entry<String, Map<String, Assignment>>> it = workerIdToAssignments.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, Map<String, Assignment>> workerIdToAssignmentEntry = it.next();
String workerId = workerIdToAssignmentEntry.getKey();
Map<String, Assignment> functionMap = workerIdToAssignmentEntry.getValue();

// remove instances that don't exist anymore
Expand All @@ -268,6 +276,8 @@ void invokeScheduler() {
functionRuntimeManager.deleteAssignment(fullyQualifiedInstanceId);
// update message id associated with current view of assignments map
lastMessageProduced = messageId;
// update stats
schedulerStats.removedAssignment(assignment);
}
return deleted;
});
Expand All @@ -288,6 +298,8 @@ void invokeScheduler() {
functionRuntimeManager.processAssignment(newAssignment);
// update message id associated with current view of assignments map
lastMessageProduced = messageId;
//update stats
schedulerStats.updatedAssignment(newAssignment);
}
if (functionMap.isEmpty()) {
it.remove();
Expand Down Expand Up @@ -331,16 +343,26 @@ void invokeScheduler() {
functionRuntimeManager.processAssignment(assignment);
// update message id associated with current view of assignments map
lastMessageProduced = messageId;
// update stats
schedulerStats.newAssignment(assignment);
}

log.info("Schedule summary - execution time: {} sec | total unassigned: {} | stats: {}\n{}",
(System.nanoTime() - startTime) / Math.pow(10, 9),
unassignedInstances.getLeft().size(), schedulerStats.getSummary(), schedulerStats);
}

private void invokeRebalance() {
long startTime = System.nanoTime();

Set<String> currentMembership = membershipManager.getCurrentMembership()
.stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toSet());

Map<String, Map<String, Assignment>> workerIdToAssignments = functionRuntimeManager.getCurrentAssignments();

// initialize stats collection
SchedulerStats schedulerStats = new SchedulerStats(workerIdToAssignments, currentMembership);

// filter out assignments of workers that are not currently in the active membership
List<Assignment> currentAssignments = workerIdToAssignments
.entrySet()
Expand Down Expand Up @@ -368,8 +390,13 @@ private void invokeRebalance() {
functionRuntimeManager.processAssignment(assignment);
// update message id associated with current view of assignments map
lastMessageProduced = messageId;
// update stats
schedulerStats.newAssignment(assignment);
}
log.info("Rebalance - Total number of new assignments computed: {}", rebalancedAssignments.size());

log.info("Rebalance summary - execution time: {} sec | stats: {}\n{}",
(System.nanoTime() - startTime) / Math.pow(10, 9), schedulerStats.getSummary(), schedulerStats);

rebalanceInProgess.set(false);
}

Expand Down Expand Up @@ -526,4 +553,117 @@ static String checkHeartBeatFunction(Instance funInstance) {

public static class RebalanceInProgressException extends RuntimeException {
}

private static class SchedulerStats {

@Builder
@Data
private static class WorkerStats {
private int originalNumAssignments;
private int finalNumAssignments;
private int instancesAdded;
private int instancesRemoved;
private int instancesUpdated;
private boolean alive;
}

private Map<String, WorkerStats> workerStatsMap = new HashMap<>();

private Map<String, String> instanceToWorkerId = new HashMap<>();

public SchedulerStats(Map<String, Map<String, Assignment>> workerIdToAssignments, Set<String> workers) {

for(String workerId : workers) {
WorkerStats.WorkerStatsBuilder workerStats = WorkerStats.builder().alive(true);
Map<String, Assignment> assignmentMap = workerIdToAssignments.get(workerId);
if (assignmentMap != null) {
workerStats.originalNumAssignments(assignmentMap.size());
workerStats.finalNumAssignments(assignmentMap.size());

for (String fullyQualifiedInstanceId : assignmentMap.keySet()) {
instanceToWorkerId.put(fullyQualifiedInstanceId, workerId);
}
} else {
workerStats.originalNumAssignments(0);
workerStats.finalNumAssignments(0);
}

workerStatsMap.put(workerId, workerStats.build());
}

// workers with assignments that are dead
for (Map.Entry<String, Map<String, Assignment>> entry : workerIdToAssignments.entrySet()) {
String workerId = entry.getKey();
Map<String, Assignment> assignmentMap = entry.getValue();
if (!workers.contains(workerId)) {
WorkerStats workerStats = WorkerStats.builder()
.alive(false)
.originalNumAssignments(assignmentMap.size())
.finalNumAssignments(assignmentMap.size())
.build();
workerStatsMap.put(workerId, workerStats);
}
}
}

public void removedAssignment(Assignment assignment) {
String workerId = assignment.getWorkerId();
WorkerStats stats = workerStatsMap.get(workerId);
Preconditions.checkNotNull(stats);

stats.instancesRemoved++;
stats.finalNumAssignments--;
}

public void newAssignment(Assignment assignment) {
String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
String newWorkerId = assignment.getWorkerId();
String oldWorkerId = instanceToWorkerId.get(fullyQualifiedInstanceId);
if (oldWorkerId != null) {
WorkerStats oldWorkerStats = workerStatsMap.get(oldWorkerId);
Preconditions.checkNotNull(oldWorkerStats);

oldWorkerStats.instancesRemoved++;
oldWorkerStats.finalNumAssignments--;
}

WorkerStats newWorkerStats = workerStatsMap.get(newWorkerId);
Preconditions.checkNotNull(newWorkerStats);

newWorkerStats.instancesAdded++;
newWorkerStats.finalNumAssignments++;
}

public void updatedAssignment(Assignment assignment) {
String workerId = assignment.getWorkerId();
WorkerStats stats = workerStatsMap.get(workerId);
Preconditions.checkNotNull(stats);

stats.instancesUpdated++;
}

public String getSummary() {
int totalAdded = 0;
int totalUpdated = 0;
int totalRemoved = 0;

for (Map.Entry<String, WorkerStats> entry : workerStatsMap.entrySet()) {
WorkerStats workerStats = entry.getValue();
totalAdded += workerStats.instancesAdded;
totalUpdated += workerStats.instancesUpdated;
totalRemoved += workerStats.instancesRemoved;
}

return String.format("{\"Added\": %d, \"Updated\": %d, \"removed\": %d}", totalAdded, totalUpdated, totalRemoved);
}

@Override
public String toString() {
try {
return ObjectMapperFactory.getThreadLocal().writerWithDefaultPrettyPrinter().writeValueAsString(workerStatsMap);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}
}
Loading

0 comments on commit cc2c203

Please sign in to comment.