Skip to content

Commit

Permalink
[functions][stats] don't generate function stats at worker service i…
Browse files Browse the repository at this point in the history
…f runtime is k8s (apache#2724)

*Motivation*

k8s runtime doesn't support generating function stats at worker service right now.
so skip it for now until that feature is added.

*Changes*

skip function stats for k8s runtime
  • Loading branch information
sijie authored Oct 5, 2018
1 parent af58956 commit ab02f4f
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.functions.worker;

import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.eclipse.jetty.util.ConcurrentHashSet;
Expand All @@ -40,6 +41,11 @@ public class FunctionsStatsGenerator {
public static void generate(WorkerService workerService, String cluster, SimpleTextOutputStream out) {
// only when worker service is initialized, we generate the stats. otherwise we will get bunch of NPE.
if (workerService != null && workerService.isInitialized()) {
// kubernetes runtime factory doesn't support stats collection through worker service
if (workerService.getFunctionRuntimeManager().getRuntimeFactory() instanceof KubernetesRuntimeFactory) {
return;
}

Map<String, FunctionRuntimeInfo> functionRuntimes
= workerService.getFunctionRuntimeManager().getFunctionRuntimeInfos();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.testng.Assert;
Expand Down Expand Up @@ -59,6 +60,20 @@ public void testGenerateFunctionStatsWhenWorkerServiceIsNotInitialized() {
verify(workerService, times(0)).getFunctionRuntimeManager();
}

@Test
public void testGenerateFunctionStatsOnK8SRuntimeFactory() {
WorkerService workerService = mock(WorkerService.class);
when(workerService.isInitialized()).thenReturn(true);
FunctionRuntimeManager frm = mock(FunctionRuntimeManager.class);
when(frm.getRuntimeFactory()).thenReturn(mock(KubernetesRuntimeFactory.class));
when(workerService.getFunctionRuntimeManager()).thenReturn(frm);
FunctionsStatsGenerator.generate(
workerService, "test-cluster", new SimpleTextOutputStream(Unpooled.buffer()));
verify(workerService, times(1)).isInitialized();
verify(workerService, times(1)).getFunctionRuntimeManager();
verify(frm, times(0)).getFunctionRuntimeInfos();
}

@Test
public void testFunctionsStatsGenerate() {
FunctionRuntimeManager functionRuntimeManager = mock(FunctionRuntimeManager.class);
Expand Down

0 comments on commit ab02f4f

Please sign in to comment.