Skip to content

Commit

Permalink
Functions metadata compaction (apache#7377)
Browse files Browse the repository at this point in the history
* Function workers re-direct call update requests to the leader

* Fixed test

* tests pass

* Working version

* Fix test

* Short circuit update

* Fix test

* Fix test

* Fix tests

* Added one more catch

* Added one more catch

* Seperated internal and external errors

* Fix test

* Address feedback

* Do not expose updateOnLeader to functions

* hide api

* hide api

* removed duplicate comments

* Do leadership changes in function metadata manager

* make the function sync

* Added more comments

* Throw error

* Changed name

* address comments

* Deleted unused classes

* Rework metadata manager

* Working

* Fix test

* A better way for test

* Address feedback

* Added an option to compact function metadata topic

* Address feedback

* Incorporate feedback

Co-authored-by: Sanjeev Kulkarni <[email protected]>
  • Loading branch information
srkukarni and Sanjeev Kulkarni authored Jul 1, 2020
1 parent d06a52c commit 3d94553
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
doc = "The pulsar topic used for storing function metadata"
)
private String functionMetadataTopicName;
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "Should the metadata topic be compacted?"
)
private Boolean useCompactedMetadataTopic = false;
@FieldContext(
category = CATEGORY_FUNC_METADATA_MNG,
doc = "The web service url for function workers"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,26 @@ public static String getFullyQualifiedName(String tenant, String namespace, Stri
return String.format("%s/%s/%s", tenant, namespace, functionName);
}

public static String extractTenantFromFullyQualifiedName(String fqfn) {
return extractFromFullyQualifiedName(fqfn, 0);
}

public static String extractNamespaceFromFullyQualifiedName(String fqfn) {
return extractFromFullyQualifiedName(fqfn, 1);
}

public static String extractNameFromFullyQualifiedName(String fqfn) {
return extractFromFullyQualifiedName(fqfn, 2);
}

private static String extractFromFullyQualifiedName(String fqfn, int index) {
String[] parts = fqfn.split("/");
if (parts.length >= 3) {
return parts[index];
}
throw new RuntimeException("Invalid Fully Qualified Function Name " + fqfn);
}

public static Class<?> getTypeArg(String className, Class<?> funClass, ClassLoader classLoader)
throws ClassNotFoundException {
Class<?> loadedClass = classLoader.loadClass(className);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.proto.Request;
import org.apache.pulsar.functions.utils.FunctionCommon;

import java.io.IOException;
import java.util.Collection;
Expand Down Expand Up @@ -68,7 +69,10 @@ public class FunctionMetaDataManager implements AutoCloseable {
// Note that this variable serves a double duty. A non-null value
// implies we are the leader, while a null value means we are not the leader
private Producer exclusiveLeaderProducer;
private MessageId lastMessageSeen = MessageId.earliest;
@Getter
private volatile MessageId lastMessageSeen = MessageId.earliest;

private static final String versionTag = "version";

@Getter
private CompletableFuture<Void> isInitialized = new CompletableFuture<>();
Expand Down Expand Up @@ -206,14 +210,30 @@ public synchronized void updateFunctionOnLeader(FunctionMetaData functionMetaDat
} else {
needsScheduling = processUpdate(functionMetaData);
}
Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
.setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
.setFunctionMetaData(functionMetaData)
.setWorkerId(workerConfig.getWorkerId())
.setRequestId(UUID.randomUUID().toString())
.build();
byte[] toWrite;
if (workerConfig.getUseCompactedMetadataTopic()) {
if (delete) {
toWrite = "".getBytes();
} else {
toWrite = functionMetaData.toByteArray();
}
} else {
Request.ServiceRequest serviceRequest = Request.ServiceRequest.newBuilder()
.setServiceRequestType(delete ? Request.ServiceRequest.ServiceRequestType.DELETE : Request.ServiceRequest.ServiceRequestType.UPDATE)
.setFunctionMetaData(functionMetaData)
.setWorkerId(workerConfig.getWorkerId())
.setRequestId(UUID.randomUUID().toString())
.build();
toWrite = serviceRequest.toByteArray();
}
try {
lastMessageSeen = exclusiveLeaderProducer.send(serviceRequest.toByteArray());
TypedMessageBuilder builder = exclusiveLeaderProducer.newMessage()
.value(toWrite)
.property(versionTag, Long.toString(functionMetaData.getVersion()));
if (workerConfig.getUseCompactedMetadataTopic()) {
builder = builder.key(FunctionCommon.getFullyQualifiedName(functionMetaData.getFunctionDetails()));
}
lastMessageSeen = builder.send();
} catch (Exception e) {
log.error("Could not write into Function Metadata topic", e);
throw new IllegalStateException("Internal Error updating function at the leader", e);
Expand Down Expand Up @@ -290,26 +310,48 @@ public synchronized void giveupLeadership() {
*/
public void processMetaDataTopicMessage(Message<byte[]> message) throws IOException {
try {
Request.ServiceRequest serviceRequest = Request.ServiceRequest.parseFrom(message.getData());
if (log.isDebugEnabled()) {
log.debug("Received Service Request: {}", serviceRequest);
}
switch (serviceRequest.getServiceRequestType()) {
case UPDATE:
this.processUpdate(serviceRequest.getFunctionMetaData());
break;
case DELETE:
this.proccessDeregister(serviceRequest.getFunctionMetaData());
break;
default:
log.warn("Received request with unrecognized type: {}", serviceRequest);
if (workerConfig.getUseCompactedMetadataTopic()) {
processCompactedMetaDataTopicMessage(message);
} else {
processUncompactedMetaDataTopicMessage(message);
}
} catch (IllegalArgumentException e) {
// Its ok. Nothing much we can do about it
}
lastMessageSeen = message.getMessageId();
}

private void processUncompactedMetaDataTopicMessage(Message<byte[]> message) throws IOException {
Request.ServiceRequest serviceRequest = Request.ServiceRequest.parseFrom(message.getData());
if (log.isDebugEnabled()) {
log.debug("Received Service Request: {}", serviceRequest);
}
switch (serviceRequest.getServiceRequestType()) {
case UPDATE:
this.processUpdate(serviceRequest.getFunctionMetaData());
break;
case DELETE:
this.proccessDeregister(serviceRequest.getFunctionMetaData());
break;
default:
log.warn("Received request with unrecognized type: {}", serviceRequest);
}
}

private void processCompactedMetaDataTopicMessage(Message<byte[]> message) throws IOException {
long version = Long.valueOf(message.getProperty(versionTag));
String tenant = FunctionCommon.extractTenantFromFullyQualifiedName(message.getKey());
String namespace = FunctionCommon.extractNamespaceFromFullyQualifiedName(message.getKey());
String functionName = FunctionCommon.extractNameFromFullyQualifiedName(message.getKey());
if (message.getData() == null || message.getData().length == 0) {
// this is a delete message
this.proccessDeregister(tenant, namespace, functionName, version);
} else {
FunctionMetaData functionMetaData = FunctionMetaData.parseFrom(message.getData());
this.processUpdate(functionMetaData);
}
}

/**
* Private methods for internal use. Should not be used outside of this class
*/
Expand All @@ -336,25 +378,29 @@ private boolean containsFunctionMetaData(String tenant, String namespace, String

@VisibleForTesting
synchronized boolean proccessDeregister(FunctionMetaData deregisterRequestFs) throws IllegalArgumentException {

String functionName = deregisterRequestFs.getFunctionDetails().getName();
String tenant = deregisterRequestFs.getFunctionDetails().getTenant();
String namespace = deregisterRequestFs.getFunctionDetails().getNamespace();
return proccessDeregister(tenant, namespace, functionName, deregisterRequestFs.getVersion());
}

synchronized boolean proccessDeregister(String tenant, String namespace,
String functionName, long version) throws IllegalArgumentException {

boolean needsScheduling = false;

log.debug("Process deregister request: {}", deregisterRequestFs);
log.debug("Process deregister request: {}/{}/{}/{}", tenant, namespace, functionName, version);

// Check if we still have this function. Maybe already deleted by someone else
if (this.containsFunctionMetaData(deregisterRequestFs)) {
if (this.containsFunctionMetaData(tenant, namespace, functionName)) {
// check if request is outdated
if (!isRequestOutdated(deregisterRequestFs)) {
if (!isRequestOutdated(tenant, namespace, functionName, version)) {
this.functionMetaDataMap.get(tenant).get(namespace).remove(functionName);
needsScheduling = true;
} else {
if (log.isDebugEnabled()) {
log.debug("{}/{}/{} Ignoring outdated request version: {}", tenant, namespace, functionName,
deregisterRequestFs.getVersion());
version);
}
throw new IllegalArgumentException("Delete request ignored because it is out of date. Please try again.");
}
Expand Down Expand Up @@ -393,9 +439,14 @@ synchronized boolean processUpdate(FunctionMetaData updateRequestFs) throws Ille

private boolean isRequestOutdated(FunctionMetaData requestFunctionMetaData) {
Function.FunctionDetails functionDetails = requestFunctionMetaData.getFunctionDetails();
FunctionMetaData currentFunctionMetaData = this.functionMetaDataMap.get(functionDetails.getTenant())
.get(functionDetails.getNamespace()).get(functionDetails.getName());
return currentFunctionMetaData.getVersion() >= requestFunctionMetaData.getVersion();
return isRequestOutdated(functionDetails.getTenant(), functionDetails.getNamespace(),
functionDetails.getName(), requestFunctionMetaData.getVersion());
}

private boolean isRequestOutdated(String tenant, String namespace, String functionName, long version) {
FunctionMetaData currentFunctionMetaData = this.functionMetaDataMap.get(tenant)
.get(namespace).get(functionName);
return currentFunctionMetaData.getVersion() >= version;
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.proto.Request.ServiceRequest;

@Slf4j
public class FunctionMetaDataTopicTailer
Expand Down Expand Up @@ -131,11 +130,14 @@ public void close() {

public static Reader createReader(WorkerConfig workerConfig, ReaderBuilder readerBuilder,
MessageId startMessageId) throws PulsarClientException {
return readerBuilder
ReaderBuilder builder = readerBuilder
.topic(workerConfig.getFunctionMetadataTopic())
.startMessageId(startMessageId)
.readerName(workerConfig.getWorkerId() + "-function-metadata-tailer")
.subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-metadata-tailer")
.create();
.subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-metadata-tailer");
if (workerConfig.getUseCompactedMetadataTopic()) {
builder = builder.readCompacted(true);
}
return builder.create();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ public class SchedulerManager implements AutoCloseable {
@Getter
private MessageId lastMessageProduced = null;

private MessageId metadataTopicLastMessage = MessageId.earliest;

public SchedulerManager(WorkerConfig workerConfig,
PulsarClient pulsarClient,
PulsarAdmin admin,
Expand Down Expand Up @@ -224,6 +226,13 @@ private void scheduleCompaction(ScheduledExecutorService executor, long schedule
isCompactionNeeded.set(false);
}
}, scheduleFrequencySec, scheduleFrequencySec, TimeUnit.SECONDS);

executor.scheduleWithFixedDelay(() -> {
if (leaderService.isLeader() && metadataTopicLastMessage.compareTo(functionMetaDataManager.getLastMessageSeen()) != 0) {
metadataTopicLastMessage = functionMetaDataManager.getLastMessageSeen();
compactFunctionMetadataTopic();
}
}, scheduleFrequencySec, scheduleFrequencySec, TimeUnit.SECONDS);
}
}

Expand Down Expand Up @@ -337,6 +346,18 @@ private void compactAssignmentTopic() {
}
}

private void compactFunctionMetadataTopic() {
if (this.admin != null) {
try {
this.admin.topics().triggerCompaction(workerConfig.getFunctionMetadataTopic());
} catch (PulsarAdminException e) {
log.error("Failed to trigger compaction", e);
scheduledExecutorService.schedule(() -> compactFunctionMetadataTopic(), DEFAULT_ADMIN_API_BACKOFF_SEC,
TimeUnit.SECONDS);
}
}
}

private MessageId publishNewAssignment(Assignment assignment, boolean deleted) {
try {
String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.mockito.Mockito.*;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -34,12 +35,25 @@

public class FunctionMetaDataManagerTest {

static byte[] producerByteArray;

private static PulsarClient mockPulsarClient() throws PulsarClientException {
ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
when(builder.topic(anyString())).thenReturn(builder);
when(builder.producerName(anyString())).thenReturn(builder);

when(builder.create()).thenReturn(mock(Producer.class));
Producer producer = mock(Producer.class);
TypedMessageBuilder messageBuilder = mock(TypedMessageBuilder.class);
when(messageBuilder.key(anyString())).thenReturn(messageBuilder);
doAnswer(invocation -> {
Object arg0 = invocation.getArgument(0);
FunctionMetaDataManagerTest.producerByteArray = (byte[])arg0;
return messageBuilder;
}).when(messageBuilder).value(any());
when(messageBuilder.property(anyString(), anyString())).thenReturn(messageBuilder);
when(producer.newMessage()).thenReturn(messageBuilder);

when(builder.create()).thenReturn(producer);

PulsarClient client = mock(PulsarClient.class);
when(client.newProducer()).thenReturn(builder);
Expand Down Expand Up @@ -86,10 +100,20 @@ public void testListFunctions() throws PulsarClientException {
}

@Test
public void testUpdateIfLeaderFunction() throws PulsarClientException {
public void testUpdateIfLeaderFunctionWithoutCompaction() throws PulsarClientException {
testUpdateIfLeaderFunction(false);
}

@Test
public void testUpdateIfLeaderFunctionWithCompaction() throws PulsarClientException {
testUpdateIfLeaderFunction(true);
}

private void testUpdateIfLeaderFunction(boolean compact) throws PulsarClientException {

WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setUseCompactedMetadataTopic(compact);
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
mock(SchedulerManager.class),
Expand All @@ -110,6 +134,11 @@ public void testUpdateIfLeaderFunction() throws PulsarClientException {
functionMetaDataManager.acquireLeadership();
// Now w should be able to really update
functionMetaDataManager.updateFunctionOnLeader(m1, false);
if (compact) {
Assert.assertTrue(Arrays.equals(m1.toByteArray(), producerByteArray));
} else {
Assert.assertFalse(Arrays.equals(m1.toByteArray(), producerByteArray));
}

// outdated request
try {
Expand All @@ -119,15 +148,30 @@ public void testUpdateIfLeaderFunction() throws PulsarClientException {
Assert.assertEquals(e.getMessage(), "Update request ignored because it is out of date. Please try again.");
}
// udpate with new version
m1 = m1.toBuilder().setVersion(2).build();
functionMetaDataManager.updateFunctionOnLeader(m1, false);
Function.FunctionMetaData m2 = m1.toBuilder().setVersion(2).build();
functionMetaDataManager.updateFunctionOnLeader(m2, false);
if (compact) {
Assert.assertTrue(Arrays.equals(m2.toByteArray(), producerByteArray));
} else {
Assert.assertFalse(Arrays.equals(m2.toByteArray(), producerByteArray));
}
}

@Test
public void deregisterFunctionWithoutCompaction() throws PulsarClientException {
deregisterFunction(false);
}

@Test
public void deregisterFunction() throws PulsarClientException {
public void deregisterFunctionWithCompaction() throws PulsarClientException {
deregisterFunction(true);
}

private void deregisterFunction(boolean compact) throws PulsarClientException {
SchedulerManager mockedScheduler = mock(SchedulerManager.class);
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setUseCompactedMetadataTopic(compact);
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
mockedScheduler,
Expand Down Expand Up @@ -170,6 +214,11 @@ public void deregisterFunction() throws PulsarClientException {
m1 = m1.toBuilder().setVersion(2).build();
functionMetaDataManager.updateFunctionOnLeader(m1, true);
verify(mockedScheduler, times(2)).schedule();
if (compact) {
Assert.assertTrue(Arrays.equals("".getBytes(), producerByteArray));
} else {
Assert.assertFalse(Arrays.equals(m1.toByteArray(), producerByteArray));
}
}

@Test
Expand Down

0 comments on commit 3d94553

Please sign in to comment.