Skip to content

Commit

Permalink
Have metadata tailer use its own thread for processing (apache#7211)
Browse files Browse the repository at this point in the history
* Have metadata tailer use its own thread for processing

* Merged with master

* Address comments

* Address comments

Co-authored-by: Sanjeev Kulkarni <[email protected]>
  • Loading branch information
srkukarni and Sanjeev Kulkarni authored Jun 9, 2020
1 parent 5d78a2d commit e64d951
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class FunctionMetaDataManager implements AutoCloseable {
private final SchedulerManager schedulerManager;
private final WorkerConfig workerConfig;
private final PulsarClient pulsarClient;
private final ErrorNotifier errorNotifier;

private FunctionMetaDataTopicTailer functionMetaDataTopicTailer;

Expand All @@ -69,12 +70,14 @@ public class FunctionMetaDataManager implements AutoCloseable {

public FunctionMetaDataManager(WorkerConfig workerConfig,
SchedulerManager schedulerManager,
PulsarClient pulsarClient) throws PulsarClientException {
PulsarClient pulsarClient,
ErrorNotifier errorNotifier) throws PulsarClientException {
this.workerConfig = workerConfig;
this.pulsarClient = pulsarClient;
this.serviceRequestManager = getServiceRequestManager(
this.pulsarClient, this.workerConfig.getFunctionMetadataTopic());
this.schedulerManager = schedulerManager;
this.errorNotifier = errorNotifier;
}

/**
Expand All @@ -88,17 +91,12 @@ public FunctionMetaDataManager(WorkerConfig workerConfig,
public void initialize() {
log.info("/** Initializing Function Metadata Manager **/");
try {
Reader<byte[]> reader = pulsarClient.newReader()
.topic(this.workerConfig.getFunctionMetadataTopic())
.startMessageId(MessageId.earliest)
.readerName(workerConfig.getWorkerId() + "-function-metadata-manager")
.create();

this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this, reader);
this.functionMetaDataTopicTailer = new FunctionMetaDataTopicTailer(this,
pulsarClient.newReader(), this.workerConfig, this.errorNotifier);
// read all existing messages
this.setInitializePhase(true);
while (reader.hasMessageAvailable()) {
this.functionMetaDataTopicTailer.processRequest(reader.readNext());
while (this.functionMetaDataTopicTailer.getReader().hasMessageAvailable()) {
this.functionMetaDataTopicTailer.processRequest(this.functionMetaDataTopicTailer.getReader().readNext());
}
this.setInitializePhase(false);
// schedule functions if necessary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,78 +19,92 @@
package org.apache.pulsar.functions.worker;

import java.io.IOException;
import java.util.function.Function;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.proto.Request.ServiceRequest;

@Slf4j
public class FunctionMetaDataTopicTailer
implements java.util.function.Consumer<Message<byte[]>>, Function<Throwable, Void>, AutoCloseable {
implements Runnable, AutoCloseable {

private final FunctionMetaDataManager functionMetaDataManager;
@Getter
private final Reader<byte[]> reader;
private final Thread readerThread;
private volatile boolean running;
private ErrorNotifier errorNotifier;

public FunctionMetaDataTopicTailer(FunctionMetaDataManager functionMetaDataManager,
Reader<byte[]> reader)
ReaderBuilder readerBuilder, WorkerConfig workerConfig,
ErrorNotifier errorNotifier)
throws PulsarClientException {
this.functionMetaDataManager = functionMetaDataManager;
this.reader = reader;
this.reader = readerBuilder
.topic(workerConfig.getFunctionMetadataTopic())
.startMessageId(MessageId.earliest)
.readerName(workerConfig.getWorkerId() + "-function-metadata-manager")
.subscriptionRolePrefix(workerConfig.getWorkerId() + "-function-metadata-manager")
.create();
readerThread = new Thread(this);
readerThread.setName("function-metadata-tailer-thread");
this.errorNotifier = errorNotifier;
}

public void start() {
receiveOne();
running = true;
readerThread.start();
}

private void receiveOne() {
reader.readNextAsync()
.thenAccept(this)
.exceptionally(this);
@Override
public void run() {
while(running) {
try {
Message<byte[]> msg = reader.readNext();
processRequest(msg);
} catch (Throwable th) {
if (running) {
log.error("Encountered error in metadata tailer", th);
// trigger fatal error
running = false;
errorNotifier.triggerError(th);
} else {
if (!(th instanceof InterruptedException || th.getCause() instanceof InterruptedException)) {
log.warn("Encountered error when metadata tailer is not running", th);
}
return;
}
}
}
}

@Override
public void close() {
log.info("Stopping function metadata tailer");
try {
reader.close();
running = false;
if (readerThread != null && readerThread.isAlive()) {
readerThread.interrupt();
}
if (reader != null) {
reader.close();
}
} catch (IOException e) {
log.error("Failed to stop function metadata tailer", e);
}
log.info("Stopped function function metadata tailer");
log.info("Stopped function metadata tailer");
}

public void processRequest(Message<byte[]> msg) {
ServiceRequest serviceRequest;

try {
serviceRequest = ServiceRequest.parseFrom(msg.getData());
} catch (IOException e) {
log.error("Received bad service request at message {}", msg.getMessageId(), e);
// TODO: find a better way to handle bad request
throw new RuntimeException(e);
}
public void processRequest(Message<byte[]> msg) throws IOException {
ServiceRequest serviceRequest = ServiceRequest.parseFrom(msg.getData());
if (log.isDebugEnabled()) {
log.debug("Received Service Request: {}", serviceRequest);
}

this.functionMetaDataManager.processRequest(msg.getMessageId(), serviceRequest);
}

@Override
public void accept(Message<byte[]> msg) {

processRequest(msg);
// receive next request
receiveOne();
}

@Override
public Void apply(Throwable cause) {
log.error("Failed to retrieve messages from function state topic", cause);
// TODO: find a better way to handle consumer functions
throw new RuntimeException(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void start(URI dlogUri,

//create function meta data manager
this.functionMetaDataManager = new FunctionMetaDataManager(
this.workerConfig, this.schedulerManager, this.client);
this.workerConfig, this.schedulerManager, this.client, errorNotifier);

this.connectorsManager = new ConnectorsManager(workerConfig);
this.functionsManager = new FunctionsManager(workerConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void testListFunctions() throws PulsarClientException {
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(new WorkerConfig(),
mock(SchedulerManager.class),
mockPulsarClient()));
mockPulsarClient(), ErrorNotifier.getDefaultImpl()));

Map<String, Function.FunctionMetaData> functionMetaDataMap1 = new HashMap<>();
Function.FunctionMetaData f1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Expand Down Expand Up @@ -104,7 +104,7 @@ public void updateFunction() throws PulsarClientException {
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
mock(SchedulerManager.class),
mockPulsarClient()));
mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")).build();

Expand Down Expand Up @@ -135,7 +135,7 @@ public boolean matches(Request.ServiceRequest serviceRequest) {
functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
mock(SchedulerManager.class),
mockPulsarClient()));
mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
Map<String, Function.FunctionMetaData> functionMetaDataMap = new HashMap<>();
Function.FunctionMetaData m2 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
Expand Down Expand Up @@ -177,7 +177,7 @@ public void testStopFunction() throws PulsarClientException {
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
mock(SchedulerManager.class),
mockPulsarClient()));
mockPulsarClient(), ErrorNotifier.getDefaultImpl()));

Map<String, Function.FunctionMetaData> functionMetaDataMap1 = new HashMap<>();
Function.FunctionMetaData f1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Expand Down Expand Up @@ -232,7 +232,7 @@ public void deregisterFunction() throws PulsarClientException {
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
mock(SchedulerManager.class),
mockPulsarClient()));
mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
.setNamespace("namespace-1").setTenant("tenant-1")).setVersion(version).build();
Expand Down Expand Up @@ -271,7 +271,7 @@ public void testProcessRequest() throws PulsarClientException {
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
mock(SchedulerManager.class),
mockPulsarClient()));
mockPulsarClient(), ErrorNotifier.getDefaultImpl()));

Mockito.doNothing().when(functionMetaDataManager).processUpdate(any(Request.ServiceRequest.class));
Mockito.doNothing().when(functionMetaDataManager).proccessDeregister(any(Request.ServiceRequest.class));
Expand Down Expand Up @@ -309,7 +309,7 @@ public void processUpdateTest() throws PulsarClientException {
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
schedulerManager,
mockPulsarClient()));
mockPulsarClient(), ErrorNotifier.getDefaultImpl()));

// worker has no record of function
Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder()
Expand Down Expand Up @@ -337,7 +337,7 @@ public void processUpdateTest() throws PulsarClientException {
functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
schedulerManager,
mockPulsarClient()));
mockPulsarClient(), ErrorNotifier.getDefaultImpl()));

Function.FunctionMetaData m3 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
Expand Down Expand Up @@ -386,7 +386,7 @@ public void processUpdateTest() throws PulsarClientException {
functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
schedulerManager,
mockPulsarClient()));
mockPulsarClient(), ErrorNotifier.getDefaultImpl()));

Function.FunctionMetaData m4 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
Expand Down Expand Up @@ -423,7 +423,7 @@ public void processDeregister() throws PulsarClientException {
FunctionMetaDataManager functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
schedulerManager,
mockPulsarClient()));
mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
// worker has no record of function
Function.FunctionMetaData test = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-2")
Expand All @@ -450,7 +450,7 @@ public void processDeregister() throws PulsarClientException {
functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
schedulerManager,
mockPulsarClient()));
mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
functionMetaDataManager.setFunctionMetaData(test);
Function.FunctionMetaData m2 = Function.FunctionMetaData.newBuilder()
.setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")
Expand All @@ -477,7 +477,7 @@ public void processDeregister() throws PulsarClientException {
functionMetaDataManager = spy(
new FunctionMetaDataManager(workerConfig,
schedulerManager,
mockPulsarClient()));
mockPulsarClient(), ErrorNotifier.getDefaultImpl()));
functionMetaDataManager.setFunctionMetaData(test);

Function.FunctionMetaData m3 = Function.FunctionMetaData.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,26 @@
*/
package org.apache.pulsar.functions.worker;

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.functions.proto.Request.ServiceRequest;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.worker.request.ServiceRequestUtils;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

Expand All @@ -45,13 +50,20 @@ public class FunctionMetaDataTopicTailerTest {
private static final String TEST_NAME = "test-fmt";

private final Reader reader;
private final ReaderBuilder readerBuilder;
private final FunctionMetaDataManager fsm;
private final FunctionMetaDataTopicTailer fsc;

public FunctionMetaDataTopicTailerTest() throws Exception {
this.reader = mock(Reader.class);
this.readerBuilder = mock(ReaderBuilder.class);
when(readerBuilder.topic(anyString())).thenReturn(readerBuilder);
when(readerBuilder.startMessageId(any(MessageId.class))).thenReturn(readerBuilder);
when(readerBuilder.readerName(anyString())).thenReturn(readerBuilder);
when(readerBuilder.subscriptionRolePrefix(anyString())).thenReturn(readerBuilder);
when(readerBuilder.create()).thenReturn(reader);
this.fsm = mock(FunctionMetaDataManager.class);
this.fsc = new FunctionMetaDataTopicTailer(fsm, reader);
this.fsc = new FunctionMetaDataTopicTailer(fsm, readerBuilder, new WorkerConfig(), ErrorNotifier.getDefaultImpl() );
}

@AfterMethod
Expand All @@ -67,18 +79,25 @@ public void testUpdate() throws Exception {

Message msg = mock(Message.class);
when(msg.getData()).thenReturn(request.toByteArray());

CompletableFuture<Message> receiveFuture = CompletableFuture.completedFuture(msg);
when(reader.readNextAsync())
.thenReturn(receiveFuture)
.thenReturn(new CompletableFuture<>());
CountDownLatch readLatch = new CountDownLatch(1);
CountDownLatch processLatch = new CountDownLatch(1);
when(reader.readNext()).thenReturn(msg).then(new Answer<Message>() {
public Message answer(InvocationOnMock invocation) {
try {
readLatch.countDown();
processLatch.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
return null;
}
});

fsc.start();

// wait for receive future to complete
receiveFuture.thenApply(Function.identity()).get();
readLatch.await();

verify(reader, times(2)).readNextAsync();
verify(reader, times(2)).readNext();
verify(fsm, times(1)).processRequest(any(), any(ServiceRequest.class));
}
}

0 comments on commit e64d951

Please sign in to comment.