Skip to content

Commit

Permalink
[Functions] Abstract repetitive code and add unit tests. apache#9502 (a…
Browse files Browse the repository at this point in the history
…pache#9671)

### Motivation
This PR is related to apache#9502 and [PR apache#9519](apache#9519).
In [PR apache#9519 ](apache#9519), another contributer has fixed the problem described in apache#9502, but there are still some areas that can be optimized. 

### Modifications
- Abstract a method and then reuse some code.
- Adding a unit test for the `restartFunctionInstances` method
  • Loading branch information
golden-yang authored Mar 13, 2021
1 parent d2b45b6 commit c7a237a
Show file tree
Hide file tree
Showing 2 changed files with 208 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -421,15 +421,7 @@ public synchronized void restartFunctionInstances(String tenant, String namespac
.entity(new ErrorData(fullFunctionName + " has not been assigned yet")).build());
}

ComponentType componentType = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getComponentType();

if (ComponentType.SOURCE == componentType) {
this.functionAdmin.sources().restartSource(tenant, namespace, functionName);
} else if (ComponentType.SINK == componentType) {
this.functionAdmin.sinks().restartSink(tenant, namespace, functionName);
} else {
this.functionAdmin.functions().restartFunction(tenant, namespace, functionName);
}
restartFunctionUsingPulsarAdmin(assignment, tenant, namespace, functionName, true);
}
} else {
for (Assignment assignment : assignments) {
Expand All @@ -452,25 +444,43 @@ public synchronized void restartFunctionInstances(String tenant, String namespac
}
continue;
}

ComponentType componentType = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getComponentType();

if (ComponentType.SOURCE == componentType) {
this.functionAdmin.sources().restartSource(tenant, namespace, functionName,
assignment.getInstance().getInstanceId());
} else if (ComponentType.SINK == componentType) {
this.functionAdmin.sinks().restartSink(tenant, namespace, functionName,
assignment.getInstance().getInstanceId());
} else {
this.functionAdmin.functions().restartFunction(tenant, namespace, functionName,
assignment.getInstance().getInstanceId());
}
restartFunctionUsingPulsarAdmin(assignment, tenant, namespace, functionName, false);
}
}
}
return;
}

/**
* Restart the entire function or restart a single instance of the function
*/
private void restartFunctionUsingPulsarAdmin(Assignment assignment, String tenant, String namespace,
String functionName, boolean restartEntireFunction) throws PulsarAdminException {
ComponentType componentType = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getComponentType();
if (restartEntireFunction) {
if (ComponentType.SOURCE == componentType) {
this.functionAdmin.sources().restartSource(tenant, namespace, functionName);
} else if (ComponentType.SINK == componentType) {
this.functionAdmin.sinks().restartSink(tenant, namespace, functionName);
} else {
this.functionAdmin.functions().restartFunction(tenant, namespace, functionName);
}
} else {
// only restart single instance
if (ComponentType.SOURCE == componentType) {
this.functionAdmin.sources().restartSource(tenant, namespace, functionName,
assignment.getInstance().getInstanceId());
} else if (ComponentType.SINK == componentType) {
this.functionAdmin.sinks().restartSink(tenant, namespace, functionName,
assignment.getInstance().getInstanceId());
} else {
this.functionAdmin.functions().restartFunction(tenant, namespace, functionName,
assignment.getInstance().getInstanceId());
}
}

}

/**
* It stops all functions instances owned by current worker
* @throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,14 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.ImmutableList;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.Sinks;
import org.apache.pulsar.client.admin.Sources;
import org.apache.pulsar.client.admin.Functions;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
Expand All @@ -51,6 +55,7 @@
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
Expand All @@ -62,6 +67,7 @@
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.mockito.ArgumentMatchers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
Expand Down Expand Up @@ -1023,4 +1029,174 @@ public void testFunctionRuntimeFactoryConfigsBackwardsCompatibility() throws Exc
ThreadRuntimeFactory threadRuntimeFactory = (ThreadRuntimeFactory) functionRuntimeManager.getRuntimeFactory();
assertEquals(threadRuntimeFactory.getThreadGroup().getName(), "threadGroupName");
}

@Test
public void testThreadFunctionInstancesRestart() throws Exception {

WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getThreadLocal().convertValue(
new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");

PulsarWorkerService workerService = mock(PulsarWorkerService.class);
// mock pulsarAdmin sources sinks functions
PulsarAdmin pulsarAdmin = mock(PulsarAdmin.class);
Sources sources = mock(Sources.class);
doNothing().when(sources).restartSource(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
doReturn(sources).when(pulsarAdmin).sources();
Sinks sinks = mock(Sinks.class);
doReturn(sinks).when(pulsarAdmin).sinks();
Functions functions = mock(Functions.class);
doNothing().when(functions).restartFunction(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
doReturn(functions).when(pulsarAdmin).functions();

doReturn(pulsarAdmin).when(workerService).getFunctionAdmin();
mockStatic(RuntimeFactory.class);
List<WorkerInfo> workerInfos = new LinkedList<>();
workerInfos.add(WorkerInfo.of("worker-1", "localhost", 0));
workerInfos.add(WorkerInfo.of("worker-2", "localhost", 0));
PowerMockito.when(RuntimeFactory.getFuntionRuntimeFactory(eq(ThreadRuntimeFactory.class.getName())))
.thenReturn(new ThreadRuntimeFactory());
MembershipManager membershipManager = mock(MembershipManager.class);
doReturn(workerInfos).when(membershipManager).getCurrentMembership();

// build three types of FunctionMetaData
Function.FunctionMetaData function = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("function")
.setComponentType(Function.FunctionDetails.ComponentType.FUNCTION)).build();
Function.FunctionMetaData source = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("source")
.setComponentType(Function.FunctionDetails.ComponentType.SOURCE)).build();
Function.FunctionMetaData sink = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("sink")
.setComponentType(Function.FunctionDetails.ComponentType.SINK)).build();

FunctionRuntimeManager functionRuntimeManager = PowerMockito.spy(new FunctionRuntimeManager(
workerConfig,
workerService,
mock(Namespace.class),
membershipManager,
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class)));

// verify restart function/source/sink using different assignment
verifyRestart(functionRuntimeManager, function, "worker-1", false, false);
verifyRestart(functionRuntimeManager, function, "worker-2", false, true);
verifyRestart(functionRuntimeManager, source, "worker-1", false, false);
verifyRestart(functionRuntimeManager, source, "worker-2", false, true);
verifyRestart(functionRuntimeManager, sink, "worker-1", false, false);
verifyRestart(functionRuntimeManager, sink, "worker-2", false, true);
}

@Test
public void testKubernetesFunctionInstancesRestart() throws Exception {

WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
WorkerConfig.KubernetesContainerFactory kubernetesContainerFactory
= new WorkerConfig.KubernetesContainerFactory();
workerConfig.setKubernetesContainerFactory(kubernetesContainerFactory);
KubernetesRuntimeFactory mockedKubernetesRuntimeFactory = spy(new KubernetesRuntimeFactory());
doNothing().when(mockedKubernetesRuntimeFactory).initialize(
any(WorkerConfig.class),
any(AuthenticationConfig.class),
any(SecretsProviderConfigurator.class),
any(),
any(),
any()
);
doNothing().when(mockedKubernetesRuntimeFactory).setupClient();
doReturn(true).when(mockedKubernetesRuntimeFactory).externallyManaged();
PowerMockito.whenNew(KubernetesRuntimeFactory.class)
.withNoArguments().thenReturn(mockedKubernetesRuntimeFactory);

PulsarWorkerService workerService = mock(PulsarWorkerService.class);
// mock pulsarAdmin sources sinks functions
PulsarAdmin pulsarAdmin = mock(PulsarAdmin.class);
Sources sources = mock(Sources.class);
doNothing().when(sources).restartSource(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
doReturn(sources).when(pulsarAdmin).sources();
Sinks sinks = mock(Sinks.class);
doReturn(sinks).when(pulsarAdmin).sinks();
Functions functions = mock(Functions.class);
doNothing().when(functions).restartFunction(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any());
doReturn(functions).when(pulsarAdmin).functions();

doReturn(pulsarAdmin).when(workerService).getFunctionAdmin();
mockStatic(RuntimeFactory.class);
List<WorkerInfo> workerInfos = new LinkedList<>();
workerInfos.add(WorkerInfo.of("worker-1", "localhost", 0));
workerInfos.add(WorkerInfo.of("worker-2", "localhost", 0));
PowerMockito.when(RuntimeFactory.getFuntionRuntimeFactory(eq(ThreadRuntimeFactory.class.getName())))
.thenReturn(new ThreadRuntimeFactory());
MembershipManager membershipManager = mock(MembershipManager.class);
doReturn(workerInfos).when(membershipManager).getCurrentMembership();

// build three types of FunctionMetaData
Function.FunctionMetaData function = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("function")
.setComponentType(Function.FunctionDetails.ComponentType.FUNCTION)).build();
Function.FunctionMetaData source = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("source")
.setComponentType(Function.FunctionDetails.ComponentType.SOURCE)).build();
Function.FunctionMetaData sink = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder()
.setTenant("test-tenant").setNamespace("test-namespace").setName("sink")
.setComponentType(Function.FunctionDetails.ComponentType.SINK)).build();

FunctionRuntimeManager functionRuntimeManager = PowerMockito.spy(new FunctionRuntimeManager(
workerConfig,
workerService,
mock(Namespace.class),
membershipManager,
mock(ConnectorsManager.class),
mock(FunctionsManager.class),
mock(FunctionMetaDataManager.class),
mock(WorkerStatsManager.class),
mock(ErrorNotifier.class)));

// verify restart function/source/sink using different assignment
verifyRestart(functionRuntimeManager, function, "worker-1",true, false);
verifyRestart(functionRuntimeManager, function, "worker-2", true, true);
verifyRestart(functionRuntimeManager, source, "worker-1", true, false);
verifyRestart(functionRuntimeManager, source, "worker-2", true, true);
verifyRestart(functionRuntimeManager, sink, "worker-1", true, false);
verifyRestart(functionRuntimeManager, sink, "worker-2", true, true);
}

private static void verifyRestart(FunctionRuntimeManager functionRuntimeManager, Function.FunctionMetaData function,
String workerId, boolean externallyManaged, boolean expectRestartByPulsarAdmin) throws Exception {
Function.Assignment assignment = Function.Assignment.newBuilder()
.setWorkerId(workerId)
.setInstance(Function.Instance.newBuilder()
.setFunctionMetaData(function).setInstanceId(0).build())
.build();
doReturn(ImmutableList.of(assignment)).when(functionRuntimeManager)
.findFunctionAssignments("test-tenant", "test-namespace", "function");
functionRuntimeManager.restartFunctionInstances("test-tenant", "test-namespace", "function");
if (expectRestartByPulsarAdmin) {
PowerMockito.verifyPrivate(functionRuntimeManager)
.invoke("restartFunctionUsingPulsarAdmin", assignment, "test-tenant", "test-namespace", "function", externallyManaged);
} else {
PowerMockito.verifyPrivate(functionRuntimeManager)
.invoke("stopFunction", FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()), true);
}
}

}

0 comments on commit c7a237a

Please sign in to comment.