Skip to content

Commit

Permalink
Allow function rebalance to be run periodically (apache#7449)
Browse files Browse the repository at this point in the history
Co-authored-by: Jerry Peng <[email protected]>
  • Loading branch information
jerrypeng and Jerry Peng authored Jul 7, 2020
1 parent ceecc78 commit 304685b
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 6 deletions.
2 changes: 2 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ schedulerClassName: "org.apache.pulsar.functions.worker.scheduler.RoundRobinSche
functionAssignmentTopicName: "assignments"
failureCheckFreqMs: 30000
rescheduleTimeoutMs: 60000
# frequency at which to check if cluster needs rebalancing (set to -1 to disable)
rebalanceCheckFreqSec: -1
initialBrokerReconnectMaxRetries: 60
assignmentWriteMaxRetries: 60
instanceLivenessCheckFreqMs: 30000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
doc = "The reschedule timeout of function assignment, in milliseconds"
)
private long rescheduleTimeoutMs;
@FieldContext(
category = CATEGORY_FUNC_RUNTIME_MNG,
doc = "The frequency to check whether the cluster needs rebalancing"
)
private long rebalanceCheckFreqSec;
@FieldContext(
category = CATEGORY_FUNC_RUNTIME_MNG,
doc = "The max number of retries for initial broker reconnects when function metadata manager"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ public class SchedulerManager implements AutoCloseable {
private MessageId lastMessageProduced = null;

private MessageId metadataTopicLastMessage = MessageId.earliest;
private Future<?> currentRebalanceFuture;
private AtomicBoolean rebalanceInProgess = new AtomicBoolean(false);

public SchedulerManager(WorkerConfig workerConfig,
PulsarClient pulsarClient,
Expand Down Expand Up @@ -222,9 +224,18 @@ public Future<?> schedule() {
return scheduleInternal(() -> invokeScheduler(), "Encountered error when invoking scheduler");
}

public Future<?> rebalance() {
private Future<?> rebalance() {
return scheduleInternal(() -> invokeRebalance(), "Encountered error when invoking rebalance");
}

public Future<?> rebalanceIfNotInprogress() {
if (rebalanceInProgess.compareAndSet(false, true)) {
currentRebalanceFuture = rebalance();
return currentRebalanceFuture;
} else {
throw new RebalanceInProgressException();
}
}

@VisibleForTesting
void invokeScheduler() {
Expand Down Expand Up @@ -273,7 +284,7 @@ void invokeScheduler() {
MessageId messageId = publishNewAssignment(newAssignment, false);

// Directly update in memory assignment cache since I am leader
log.info("Updating assignment: {}", assignment);
log.info("Updating assignment: {}", newAssignment);
functionRuntimeManager.processAssignment(newAssignment);
// update message id associated with current view of assignments map
lastMessageProduced = messageId;
Expand Down Expand Up @@ -358,7 +369,8 @@ private void invokeRebalance() {
// update message id associated with current view of assignments map
lastMessageProduced = messageId;
}
log.info("Total number of new assignments computed for rebalance: {}", rebalancedAssignments.size());
log.info("Rebalance - Total number of new assignments computed: {}", rebalancedAssignments.size());
rebalanceInProgess.set(false);
}

private void scheduleCompaction(ScheduledExecutorService executor, long scheduleFrequencySec) {
Expand Down Expand Up @@ -511,4 +523,7 @@ static String checkHeartBeatFunction(Instance funInstance) {
}
return null;
}

public static class RebalanceInProgressException extends RuntimeException {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,20 @@ public void start(URI dlogUri,
}
});

if (workerConfig.getRebalanceCheckFreqSec() > 0) {
clusterServiceCoordinator.addTask("rebalance-periodic-check",
workerConfig.getRebalanceCheckFreqSec() * 1000,
() -> {
try {
schedulerManager.rebalanceIfNotInprogress().get();
} catch (SchedulerManager.RebalanceInProgressException e) {
log.info("Scheduled for rebalance but rebalance is already in progress. Ignoring.");
} catch (Exception e) {
log.warn("Encountered error when running scheduled rebalance", e);
}
});
}

log.info("/** Starting Cluster Service Coordinator **/");
clusterServiceCoordinator.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.MembershipManager;
import org.apache.pulsar.functions.worker.SchedulerManager;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.WorkerUtils;

Expand Down Expand Up @@ -217,9 +218,9 @@ public void rebalance(final URI uri, final String clientRole) {
}

if (worker().getLeaderService().isLeader()) {
if (currentRebalanceFuture == null || currentRebalanceFuture.isDone()) {
currentRebalanceFuture = this.worker().getSchedulerManager().rebalance();
} else {
try {
worker().getSchedulerManager().rebalanceIfNotInprogress();
} catch (SchedulerManager.RebalanceInProgressException e) {
throw new RestException(Status.BAD_REQUEST, "Rebalance already in progress");
}
} else {
Expand Down

0 comments on commit 304685b

Please sign in to comment.