diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java index abe2b387beca4..4ec1ef5cb83a9 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java @@ -199,45 +199,51 @@ public synchronized boolean containsFunction(String tenant, String namespace, St * @throws IllegalStateException if we are not the leader * @throws IllegalArgumentException if the request is out of date. */ - public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete) + public void updateFunctionOnLeader(FunctionMetaData functionMetaData, boolean delete) throws IllegalStateException, IllegalArgumentException { - if (exclusiveLeaderProducer == null) { - throw new IllegalStateException("Not the leader"); - } boolean needsScheduling; - if (delete) { - needsScheduling = proccessDeregister(functionMetaData); - } else { - needsScheduling = processUpdate(functionMetaData); - } - byte[] toWrite; - if (workerConfig.getUseCompactedMetadataTopic()) { + synchronized (this) { + if (exclusiveLeaderProducer == null) { + throw new IllegalStateException("Not the leader"); + } + if (delete) { - toWrite = "".getBytes(); + needsScheduling = proccessDeregister(functionMetaData); } else { - toWrite = functionMetaData.toByteArray(); + needsScheduling = processUpdate(functionMetaData); } - } else { - Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder() - .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE) - .setFunctionMetaData(functionMetaData) - .setWorkerId(workerConfig.getWorkerId()) - .setRequestId(UUID.randomUUID().toString()) - .build(); - toWrite = serviceRequest.toByteArray(); - } - try { - TypedMessageBuilder builder = exclusiveLeaderProducer.newMessage() - .value(toWrite) - .property(versionTag, Long.toString(functionMetaData.getVersion())); + byte[] toWrite; if (workerConfig.getUseCompactedMetadataTopic()) { - builder = builder.key(FunctionCommon.getFullyQualifiedName(functionMetaData.getFunctionDetails())); + if (delete) { + toWrite = "".getBytes(); + } else { + toWrite = functionMetaData.toByteArray(); + } + } else { + Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder() + .setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE + : Request.ServiceRequest.ServiceRequestType.UPDATE) + .setFunctionMetaData(functionMetaData) + .setWorkerId(workerConfig.getWorkerId()) + .setRequestId(UUID.randomUUID().toString()) + .build(); + toWrite = serviceRequest.toByteArray(); } - lastMessageSeen = builder.send(); - } catch (Exception e) { - log.error("Could not write into Function Metadata topic", e); - throw new IllegalStateException("Internal Error updating function at the leader", e); + try { + TypedMessageBuilder builder = exclusiveLeaderProducer.newMessage() + .value(toWrite) + .property(versionTag, Long.toString(functionMetaData.getVersion())); + if (workerConfig.getUseCompactedMetadataTopic()) { + builder = builder.key(FunctionCommon.getFullyQualifiedName(functionMetaData.getFunctionDetails())); + } + lastMessageSeen = builder.send(); + } catch (Exception e) { + log.error("Could not write into Function Metadata topic", e); + throw new IllegalStateException("Internal Error updating function at the leader", e); + } + } + if (needsScheduling) { this.schedulerManager.schedule(); }