diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 045d43ae04ac9..222ebb7996273 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -421,15 +421,7 @@ public synchronized void restartFunctionInstances(String tenant, String namespac .entity(new ErrorData(fullFunctionName + " has not been assigned yet")).build()); } - ComponentType componentType = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getComponentType(); - - if (ComponentType.SOURCE == componentType) { - this.functionAdmin.sources().restartSource(tenant, namespace, functionName); - } else if (ComponentType.SINK == componentType) { - this.functionAdmin.sinks().restartSink(tenant, namespace, functionName); - } else { - this.functionAdmin.functions().restartFunction(tenant, namespace, functionName); - } + restartFunctionUsingPulsarAdmin(assignment, tenant, namespace, functionName, true); } } else { for (Assignment assignment : assignments) { @@ -452,25 +444,43 @@ public synchronized void restartFunctionInstances(String tenant, String namespac } continue; } - - ComponentType componentType = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getComponentType(); - - if (ComponentType.SOURCE == componentType) { - this.functionAdmin.sources().restartSource(tenant, namespace, functionName, - assignment.getInstance().getInstanceId()); - } else if (ComponentType.SINK == componentType) { - this.functionAdmin.sinks().restartSink(tenant, namespace, functionName, - assignment.getInstance().getInstanceId()); - } else { - this.functionAdmin.functions().restartFunction(tenant, namespace, functionName, - assignment.getInstance().getInstanceId()); - } + restartFunctionUsingPulsarAdmin(assignment, tenant, namespace, functionName, false); } } } return; } + /** + * Restart the entire function or restart a single instance of the function + */ + private void restartFunctionUsingPulsarAdmin(Assignment assignment, String tenant, String namespace, + String functionName, boolean restartEntireFunction) throws PulsarAdminException { + ComponentType componentType = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getComponentType(); + if (restartEntireFunction) { + if (ComponentType.SOURCE == componentType) { + this.functionAdmin.sources().restartSource(tenant, namespace, functionName); + } else if (ComponentType.SINK == componentType) { + this.functionAdmin.sinks().restartSink(tenant, namespace, functionName); + } else { + this.functionAdmin.functions().restartFunction(tenant, namespace, functionName); + } + } else { + // only restart single instance + if (ComponentType.SOURCE == componentType) { + this.functionAdmin.sources().restartSource(tenant, namespace, functionName, + assignment.getInstance().getInstanceId()); + } else if (ComponentType.SINK == componentType) { + this.functionAdmin.sinks().restartSink(tenant, namespace, functionName, + assignment.getInstance().getInstanceId()); + } else { + this.functionAdmin.functions().restartFunction(tenant, namespace, functionName, + assignment.getInstance().getInstanceId()); + } + } + + } + /** * It stops all functions instances owned by current worker * @throws Exception diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index 4214364ce88d9..1739c91d739ee 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -38,10 +38,14 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.google.common.collect.ImmutableList; import io.netty.buffer.Unpooled; import lombok.extern.slf4j.Slf4j; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.Sinks; +import org.apache.pulsar.client.admin.Sources; +import org.apache.pulsar.client.admin.Functions; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; @@ -51,6 +55,7 @@ import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.functions.AuthenticationConfig; import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.functions.WorkerInfo; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.runtime.RuntimeFactory; @@ -62,6 +67,7 @@ import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig; import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; import org.apache.pulsar.functions.utils.FunctionCommon; +import org.mockito.ArgumentMatchers; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.powermock.api.mockito.PowerMockito; @@ -1023,4 +1029,174 @@ public void testFunctionRuntimeFactoryConfigsBackwardsCompatibility() throws Exc ThreadRuntimeFactory threadRuntimeFactory = (ThreadRuntimeFactory) functionRuntimeManager.getRuntimeFactory(); assertEquals(threadRuntimeFactory.getThreadGroup().getName(), "threadGroupName"); } + + @Test + public void testThreadFunctionInstancesRestart() throws Exception { + + WorkerConfig workerConfig = new WorkerConfig(); + workerConfig.setWorkerId("worker-1"); + workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName()); + workerConfig.setFunctionRuntimeFactoryConfigs( + ObjectMapperFactory.getThreadLocal().convertValue( + new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class)); + workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); + workerConfig.setStateStorageServiceUrl("foo"); + workerConfig.setFunctionAssignmentTopicName("assignments"); + + PulsarWorkerService workerService = mock(PulsarWorkerService.class); + // mock pulsarAdmin sources sinks functions + PulsarAdmin pulsarAdmin = mock(PulsarAdmin.class); + Sources sources = mock(Sources.class); + doNothing().when(sources).restartSource(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any()); + doReturn(sources).when(pulsarAdmin).sources(); + Sinks sinks = mock(Sinks.class); + doReturn(sinks).when(pulsarAdmin).sinks(); + Functions functions = mock(Functions.class); + doNothing().when(functions).restartFunction(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any()); + doReturn(functions).when(pulsarAdmin).functions(); + + doReturn(pulsarAdmin).when(workerService).getFunctionAdmin(); + mockStatic(RuntimeFactory.class); + List workerInfos = new LinkedList<>(); + workerInfos.add(WorkerInfo.of("worker-1", "localhost", 0)); + workerInfos.add(WorkerInfo.of("worker-2", "localhost", 0)); + PowerMockito.when(RuntimeFactory.getFuntionRuntimeFactory(eq(ThreadRuntimeFactory.class.getName()))) + .thenReturn(new ThreadRuntimeFactory()); + MembershipManager membershipManager = mock(MembershipManager.class); + doReturn(workerInfos).when(membershipManager).getCurrentMembership(); + + // build three types of FunctionMetaData + Function.FunctionMetaData function = Function.FunctionMetaData.newBuilder().setFunctionDetails( + Function.FunctionDetails.newBuilder() + .setTenant("test-tenant").setNamespace("test-namespace").setName("function") + .setComponentType(Function.FunctionDetails.ComponentType.FUNCTION)).build(); + Function.FunctionMetaData source = Function.FunctionMetaData.newBuilder().setFunctionDetails( + Function.FunctionDetails.newBuilder() + .setTenant("test-tenant").setNamespace("test-namespace").setName("source") + .setComponentType(Function.FunctionDetails.ComponentType.SOURCE)).build(); + Function.FunctionMetaData sink = Function.FunctionMetaData.newBuilder().setFunctionDetails( + Function.FunctionDetails.newBuilder() + .setTenant("test-tenant").setNamespace("test-namespace").setName("sink") + .setComponentType(Function.FunctionDetails.ComponentType.SINK)).build(); + + FunctionRuntimeManager functionRuntimeManager = PowerMockito.spy(new FunctionRuntimeManager( + workerConfig, + workerService, + mock(Namespace.class), + membershipManager, + mock(ConnectorsManager.class), + mock(FunctionsManager.class), + mock(FunctionMetaDataManager.class), + mock(WorkerStatsManager.class), + mock(ErrorNotifier.class))); + + // verify restart function/source/sink using different assignment + verifyRestart(functionRuntimeManager, function, "worker-1", false, false); + verifyRestart(functionRuntimeManager, function, "worker-2", false, true); + verifyRestart(functionRuntimeManager, source, "worker-1", false, false); + verifyRestart(functionRuntimeManager, source, "worker-2", false, true); + verifyRestart(functionRuntimeManager, sink, "worker-1", false, false); + verifyRestart(functionRuntimeManager, sink, "worker-2", false, true); + } + + @Test + public void testKubernetesFunctionInstancesRestart() throws Exception { + + WorkerConfig workerConfig = new WorkerConfig(); + workerConfig.setWorkerId("worker-1"); + workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); + workerConfig.setStateStorageServiceUrl("foo"); + workerConfig.setFunctionAssignmentTopicName("assignments"); + WorkerConfig.KubernetesContainerFactory kubernetesContainerFactory + = new WorkerConfig.KubernetesContainerFactory(); + workerConfig.setKubernetesContainerFactory(kubernetesContainerFactory); + KubernetesRuntimeFactory mockedKubernetesRuntimeFactory = spy(new KubernetesRuntimeFactory()); + doNothing().when(mockedKubernetesRuntimeFactory).initialize( + any(WorkerConfig.class), + any(AuthenticationConfig.class), + any(SecretsProviderConfigurator.class), + any(), + any(), + any() + ); + doNothing().when(mockedKubernetesRuntimeFactory).setupClient(); + doReturn(true).when(mockedKubernetesRuntimeFactory).externallyManaged(); + PowerMockito.whenNew(KubernetesRuntimeFactory.class) + .withNoArguments().thenReturn(mockedKubernetesRuntimeFactory); + + PulsarWorkerService workerService = mock(PulsarWorkerService.class); + // mock pulsarAdmin sources sinks functions + PulsarAdmin pulsarAdmin = mock(PulsarAdmin.class); + Sources sources = mock(Sources.class); + doNothing().when(sources).restartSource(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any()); + doReturn(sources).when(pulsarAdmin).sources(); + Sinks sinks = mock(Sinks.class); + doReturn(sinks).when(pulsarAdmin).sinks(); + Functions functions = mock(Functions.class); + doNothing().when(functions).restartFunction(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any()); + doReturn(functions).when(pulsarAdmin).functions(); + + doReturn(pulsarAdmin).when(workerService).getFunctionAdmin(); + mockStatic(RuntimeFactory.class); + List workerInfos = new LinkedList<>(); + workerInfos.add(WorkerInfo.of("worker-1", "localhost", 0)); + workerInfos.add(WorkerInfo.of("worker-2", "localhost", 0)); + PowerMockito.when(RuntimeFactory.getFuntionRuntimeFactory(eq(ThreadRuntimeFactory.class.getName()))) + .thenReturn(new ThreadRuntimeFactory()); + MembershipManager membershipManager = mock(MembershipManager.class); + doReturn(workerInfos).when(membershipManager).getCurrentMembership(); + + // build three types of FunctionMetaData + Function.FunctionMetaData function = Function.FunctionMetaData.newBuilder().setFunctionDetails( + Function.FunctionDetails.newBuilder() + .setTenant("test-tenant").setNamespace("test-namespace").setName("function") + .setComponentType(Function.FunctionDetails.ComponentType.FUNCTION)).build(); + Function.FunctionMetaData source = Function.FunctionMetaData.newBuilder().setFunctionDetails( + Function.FunctionDetails.newBuilder() + .setTenant("test-tenant").setNamespace("test-namespace").setName("source") + .setComponentType(Function.FunctionDetails.ComponentType.SOURCE)).build(); + Function.FunctionMetaData sink = Function.FunctionMetaData.newBuilder().setFunctionDetails( + Function.FunctionDetails.newBuilder() + .setTenant("test-tenant").setNamespace("test-namespace").setName("sink") + .setComponentType(Function.FunctionDetails.ComponentType.SINK)).build(); + + FunctionRuntimeManager functionRuntimeManager = PowerMockito.spy(new FunctionRuntimeManager( + workerConfig, + workerService, + mock(Namespace.class), + membershipManager, + mock(ConnectorsManager.class), + mock(FunctionsManager.class), + mock(FunctionMetaDataManager.class), + mock(WorkerStatsManager.class), + mock(ErrorNotifier.class))); + + // verify restart function/source/sink using different assignment + verifyRestart(functionRuntimeManager, function, "worker-1",true, false); + verifyRestart(functionRuntimeManager, function, "worker-2", true, true); + verifyRestart(functionRuntimeManager, source, "worker-1", true, false); + verifyRestart(functionRuntimeManager, source, "worker-2", true, true); + verifyRestart(functionRuntimeManager, sink, "worker-1", true, false); + verifyRestart(functionRuntimeManager, sink, "worker-2", true, true); + } + + private static void verifyRestart(FunctionRuntimeManager functionRuntimeManager, Function.FunctionMetaData function, + String workerId, boolean externallyManaged, boolean expectRestartByPulsarAdmin) throws Exception { + Function.Assignment assignment = Function.Assignment.newBuilder() + .setWorkerId(workerId) + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function).setInstanceId(0).build()) + .build(); + doReturn(ImmutableList.of(assignment)).when(functionRuntimeManager) + .findFunctionAssignments("test-tenant", "test-namespace", "function"); + functionRuntimeManager.restartFunctionInstances("test-tenant", "test-namespace", "function"); + if (expectRestartByPulsarAdmin) { + PowerMockito.verifyPrivate(functionRuntimeManager) + .invoke("restartFunctionUsingPulsarAdmin", assignment, "test-tenant", "test-namespace", "function", externallyManaged); + } else { + PowerMockito.verifyPrivate(functionRuntimeManager) + .invoke("stopFunction", FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance()), true); + } + } + }