From 03df4404f2794f57b362fb076c9c102f98b25fa3 Mon Sep 17 00:00:00 2001 From: Boyang Jerry Peng Date: Fri, 2 Mar 2018 00:17:47 -0800 Subject: [PATCH] adding SchedulerManager tests (#276) --- .../functions/worker/SchedulerManager.java | 8 +- .../worker/SchedulerManagerTest.java | 703 ++++++++++++++++++ 2 files changed, 708 insertions(+), 3 deletions(-) create mode 100644 pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java index 640901dcd3736..de0228dc544a3 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java @@ -41,6 +41,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -87,11 +88,11 @@ public SchedulerManager(WorkerConfig workerConfig, PulsarClient pulsarClient) { this.executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue()); + new LinkedBlockingQueue<>()); } - public void schedule() { - executorService.submit(() -> { + public Future schedule() { + return executorService.submit(() -> { synchronized (SchedulerManager.this) { boolean isLeader = membershipManager.isLeader(); if (isLeader) { @@ -164,6 +165,7 @@ private void invokeScheduler() { // wait for assignment update to go throw the pipeline while (this.functionRuntimeManager.getCurrentAssignmentVersion() < assignmentVersion) { + log.info("Waiting for assignments to propagate..."); try { Thread.sleep(500); } catch (InterruptedException e) { diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java new file mode 100644 index 0000000000000..f9a03dbcd8475 --- /dev/null +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java @@ -0,0 +1,703 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker; + +import com.google.protobuf.InvalidProtocolBufferException; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.functions.proto.Function; +import org.apache.pulsar.functions.proto.Request; +import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler; +import org.mockito.Mockito; +import org.mockito.invocation.Invocation; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@Slf4j +public class SchedulerManagerTest { + + private SchedulerManager schedulerManager; + private FunctionMetaDataManager functionMetaDataManager; + private FunctionRuntimeManager functionRuntimeManager; + private MembershipManager membershipManager; + private CompletableFuture completableFuture; + private Producer producer; + + @BeforeMethod + public void setup() throws PulsarClientException { + WorkerConfig workerConfig = new WorkerConfig(); + workerConfig.setWorkerId("worker-1"); + workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test")); + workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); + workerConfig.setStateStorageServiceUrl("foo"); + workerConfig.setFunctionAssignmentTopicName("assignments"); + workerConfig.setMetricsConfig(new WorkerConfig.MetricsConfig() + .setMetricsSinkClassName(FunctionRuntimeManagerTest.TestSink.class.getName())); + workerConfig.setSchedulerClassName(RoundRobinScheduler.class.getName()); + + producer = mock(Producer.class); + completableFuture = spy(new CompletableFuture<>()); + completableFuture.complete(MessageId.earliest); + byte[] bytes = any(); + when(producer.sendAsync(bytes)).thenReturn(completableFuture); + + PulsarClient pulsarClient = mock(PulsarClient.class); + doReturn(producer).when(pulsarClient).createProducer(any(), any()); + + schedulerManager = spy(new SchedulerManager(workerConfig, pulsarClient)); + functionRuntimeManager = mock(FunctionRuntimeManager.class); + functionMetaDataManager = mock(FunctionMetaDataManager.class); + membershipManager = mock(MembershipManager.class); + schedulerManager.setFunctionMetaDataManager(functionMetaDataManager); + schedulerManager.setFunctionRuntimeManager(functionRuntimeManager); + schedulerManager.setMembershipManager(membershipManager); + } + + @Test + public void testSchedule() throws PulsarClientException, NoSuchMethodException, InterruptedException { + + List functionMetaDataList = new LinkedList<>(); + long version = 5; + Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder() + .setFunctionConfig(Function.FunctionConfig.newBuilder().setName("func-1") + .setNamespace("namespace-1").setTenant("tenant-1").setParallelism(1)).setVersion(version) + .build(); + functionMetaDataList.add(function1); + doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); + + // set assignments + Function.Assignment assignment1 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function1).setInstanceId(0).build()) + .build(); + + Map> currentAssignments = new HashMap<>(); + Map assignmentEntry1 = new HashMap<>(); + assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); + currentAssignments.put("worker-1", assignmentEntry1); + doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); + + //set version + doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); + + // single node + List workerInfoList = new LinkedList<>(); + workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "workerHostname-1", 5000)); + doReturn(workerInfoList).when(membershipManager).getCurrentMembership(); + + // i am not leader + doReturn(false).when(membershipManager).isLeader(); + callSchedule(); + verify(producer, times(0)).sendAsync(any()); + + // i am leader + doReturn(true).when(membershipManager).isLeader(); + callSchedule(); + verify(producer, times(1)).sendAsync(any(byte[].class)); + } + + @Test + public void testNothingNewToSchedule() throws InterruptedException, ExecutionException, NoSuchMethodException, + InvalidProtocolBufferException { + + List functionMetaDataList = new LinkedList<>(); + long version = 5; + Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder() + .setFunctionConfig(Function.FunctionConfig.newBuilder().setName("func-1") + .setNamespace("namespace-1").setTenant("tenant-1").setParallelism(1)).setVersion(version) + .build(); + functionMetaDataList.add(function1); + doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); + + // set assignments + Function.Assignment assignment1 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function1).setInstanceId(0).build()) + .build(); + + Map> currentAssignments = new HashMap<>(); + Map assignmentEntry1 = new HashMap<>(); + assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); + currentAssignments.put("worker-1", assignmentEntry1); + doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); + + //set version + doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); + + // single node + List workerInfoList = new LinkedList<>(); + workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "workerHostname-1", 5000)); + doReturn(workerInfoList).when(membershipManager).getCurrentMembership(); + + // i am leader + doReturn(true).when(membershipManager).isLeader(); + + callSchedule(); + + List invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", + Object.class)); + Assert.assertEquals(invocations.size(), 1); + + byte[] send = (byte[]) invocations.get(0).getRawArguments()[0]; + Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); + log.info("assignmentsUpdate: {}", assignmentsUpdate); + Assert.assertEquals( + Request.AssignmentsUpdate.newBuilder().setVersion(version + 1) + .addAssignments(assignment1).build(), + assignmentsUpdate); + } + + @Test + public void testAddingFunctions() throws NoSuchMethodException, InterruptedException, + InvalidProtocolBufferException { + List functionMetaDataList = new LinkedList<>(); + long version = 5; + Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder() + .setFunctionConfig(Function.FunctionConfig.newBuilder().setName("func-1") + .setNamespace("namespace-1").setTenant("tenant-1").setParallelism(1)).setVersion(version) + .build(); + + Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder() + .setFunctionConfig(Function.FunctionConfig.newBuilder().setName("func-2") + .setNamespace("namespace-1").setTenant("tenant-1").setParallelism(1)).setVersion(version) + .build(); + functionMetaDataList.add(function1); + functionMetaDataList.add(function2); + doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); + + // set assignments + Function.Assignment assignment1 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function1).setInstanceId(0).build()) + .build(); + + Map> currentAssignments = new HashMap<>(); + Map assignmentEntry1 = new HashMap<>(); + assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); + currentAssignments.put("worker-1", assignmentEntry1); + doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); + + //set version + doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); + + // single node + List workerInfoList = new LinkedList<>(); + workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "workerHostname-1", 5000)); + doReturn(workerInfoList).when(membershipManager).getCurrentMembership(); + + // i am leader + doReturn(true).when(membershipManager).isLeader(); + + callSchedule(); + + List invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", + Object.class)); + Assert.assertEquals(invocations.size(), 1); + + byte[] send = (byte[]) invocations.get(0).getRawArguments()[0]; + Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); + + log.info("assignmentsUpdate: {}", assignmentsUpdate); + Function.Assignment assignment2 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function2).setInstanceId(0).build()) + .build(); + Assert.assertEquals( + Request.AssignmentsUpdate.newBuilder().setVersion(version + 1) + .addAssignments(assignment1).addAssignments(assignment2).build(), + assignmentsUpdate); + + } + + @Test + public void testDeletingFunctions() throws NoSuchMethodException, InterruptedException, + InvalidProtocolBufferException { + List functionMetaDataList = new LinkedList<>(); + long version = 5; + Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder() + .setFunctionConfig(Function.FunctionConfig.newBuilder().setName("func-1") + .setNamespace("namespace-1").setTenant("tenant-1").setParallelism(1)).setVersion(version) + .build(); + + // simulate function2 got removed + Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder() + .setFunctionConfig(Function.FunctionConfig.newBuilder().setName("func-2") + .setNamespace("namespace-1").setTenant("tenant-1").setParallelism(1)).setVersion(version) + .build(); + functionMetaDataList.add(function1); + doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); + + // set assignments + Function.Assignment assignment1 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function1).setInstanceId(0).build()) + .build(); + + Function.Assignment assignment2 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function2).setInstanceId(0).build()) + .build(); + + Map> currentAssignments = new HashMap<>(); + Map assignmentEntry1 = new HashMap<>(); + assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); + assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment2.getInstance()), assignment2); + + currentAssignments.put("worker-1", assignmentEntry1); + doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); + + //set version + doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); + + // single node + List workerInfoList = new LinkedList<>(); + workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "workerHostname-1", 5000)); + doReturn(workerInfoList).when(membershipManager).getCurrentMembership(); + + // i am leader + doReturn(true).when(membershipManager).isLeader(); + + callSchedule(); + + List invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", + Object.class)); + Assert.assertEquals(invocations.size(), 1); + + byte[] send = (byte[]) invocations.get(0).getRawArguments()[0]; + Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); + + log.info("assignmentsUpdate: {}", assignmentsUpdate); + + Assert.assertEquals( + Request.AssignmentsUpdate.newBuilder().setVersion(version + 1) + .addAssignments(assignment1).build(), + assignmentsUpdate); + } + + @Test + public void testScalingUp() throws NoSuchMethodException, InterruptedException, InvalidProtocolBufferException, PulsarClientException { + List functionMetaDataList = new LinkedList<>(); + long version = 5; + Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder() + .setFunctionConfig(Function.FunctionConfig.newBuilder().setName("func-1") + .setNamespace("namespace-1").setTenant("tenant-1").setParallelism(1)).setVersion(version) + .build(); + + Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder() + .setFunctionConfig(Function.FunctionConfig.newBuilder().setName("func-2") + .setNamespace("namespace-1").setTenant("tenant-1").setParallelism(1)).setVersion(version) + .build(); + functionMetaDataList.add(function1); + functionMetaDataList.add(function2); + doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); + + // set assignments + Function.Assignment assignment1 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function1).setInstanceId(0).build()) + .build(); + + Map> currentAssignments = new HashMap<>(); + Map assignmentEntry1 = new HashMap<>(); + assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); + + currentAssignments.put("worker-1", assignmentEntry1); + doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); + + //set version + doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); + + // single node + List workerInfoList = new LinkedList<>(); + workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "workerHostname-1", 5000)); + doReturn(workerInfoList).when(membershipManager).getCurrentMembership(); + + // i am leader + doReturn(true).when(membershipManager).isLeader(); + + callSchedule(); + + List invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", + Object.class)); + Assert.assertEquals(invocations.size(), 1); + + byte[] send = (byte[]) invocations.get(0).getRawArguments()[0]; + Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); + + log.info("assignmentsUpdate: {}", assignmentsUpdate); + + Function.Assignment assignment2 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function2).setInstanceId(0).build()) + .build(); + Assert.assertEquals( + assignmentsUpdate, + Request.AssignmentsUpdate.newBuilder().setVersion(version + 1) + .addAssignments(assignment1).addAssignments(assignment2).build() + ); + + // scale up + + PulsarClient pulsarClient = mock(PulsarClient.class); + doReturn(producer).when(pulsarClient).createProducer(any(), any()); + + Function.FunctionMetaData function2Scaled = Function.FunctionMetaData.newBuilder() + .setFunctionConfig(Function.FunctionConfig.newBuilder().setName("func-2") + .setNamespace("namespace-1").setTenant("tenant-1").setParallelism(3)).setVersion(version) + .build(); + functionMetaDataList = new LinkedList<>(); + functionMetaDataList.add(function1); + functionMetaDataList.add(function2Scaled); + doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); + + Function.Assignment assignment2Scaled1 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function2Scaled).setInstanceId(0).build()) + .build(); + Function.Assignment assignment2Scaled2 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function2Scaled).setInstanceId(1).build()) + .build(); + Function.Assignment assignment2Scaled3 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function2Scaled).setInstanceId(2).build()) + .build(); + + callSchedule(); + + invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", + Object.class)); + Assert.assertEquals(invocations.size(), 2); + + send = (byte[]) invocations.get(1).getRawArguments()[0]; + assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); + Assert.assertEquals(assignmentsUpdate, + Request.AssignmentsUpdate.newBuilder().setVersion(version + 1 + 1) + .addAssignments(assignment1).addAssignments(assignment2Scaled1) + .addAssignments(assignment2Scaled2).addAssignments(assignment2Scaled3).build() + ); + } + + @Test + public void testScalingDown() throws PulsarClientException, NoSuchMethodException, InterruptedException, + InvalidProtocolBufferException { + List functionMetaDataList = new LinkedList<>(); + long version = 5; + Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder() + .setFunctionConfig(Function.FunctionConfig.newBuilder().setName("func-1") + .setNamespace("namespace-1").setTenant("tenant-1").setParallelism(1)).setVersion(version) + .build(); + + Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder() + .setFunctionConfig(Function.FunctionConfig.newBuilder().setName("func-2") + .setNamespace("namespace-1").setTenant("tenant-1").setParallelism(3)).setVersion(version) + .build(); + functionMetaDataList.add(function1); + functionMetaDataList.add(function2); + doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); + + // set assignments + Function.Assignment assignment1 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function1).setInstanceId(0).build()) + .build(); + + Map> currentAssignments = new HashMap<>(); + Map assignmentEntry1 = new HashMap<>(); + assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); + + currentAssignments.put("worker-1", assignmentEntry1); + doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); + + //set version + doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); + + // single node + List workerInfoList = new LinkedList<>(); + workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "workerHostname-1", 5000)); + doReturn(workerInfoList).when(membershipManager).getCurrentMembership(); + + // i am leader + doReturn(true).when(membershipManager).isLeader(); + + callSchedule(); + + List invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", + Object.class)); + Assert.assertEquals(invocations.size(), 1); + + byte[] send = (byte[]) invocations.get(0).getRawArguments()[0]; + Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); + + log.info("assignmentsUpdate: {}", assignmentsUpdate); + + Function.Assignment assignment2_1 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function2).setInstanceId(0).build()) + .build(); + Function.Assignment assignment2_2 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function2).setInstanceId(1).build()) + .build(); + Function.Assignment assignment2_3 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function2).setInstanceId(2).build()) + .build(); + Assert.assertEquals( + assignmentsUpdate, + Request.AssignmentsUpdate.newBuilder().setVersion(version + 1) + .addAssignments(assignment1).addAssignments(assignment2_1) + .addAssignments(assignment2_2).addAssignments(assignment2_3).build() + ); + + // scale down + + PulsarClient pulsarClient = mock(PulsarClient.class); + doReturn(producer).when(pulsarClient).createProducer(any(), any()); + + Function.FunctionMetaData function2Scaled = Function.FunctionMetaData.newBuilder() + .setFunctionConfig(Function.FunctionConfig.newBuilder().setName("func-2") + .setNamespace("namespace-1").setTenant("tenant-1").setParallelism(1)).setVersion(version) + .build(); + functionMetaDataList = new LinkedList<>(); + functionMetaDataList.add(function1); + functionMetaDataList.add(function2Scaled); + doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); + + Function.Assignment assignment2Scaled = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function2Scaled).setInstanceId(0).build()) + .build(); + + callSchedule(); + + invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", + Object.class)); + Assert.assertEquals(invocations.size(), 2); + + send = (byte[]) invocations.get(1).getRawArguments()[0]; + assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); + Assert.assertEquals(assignmentsUpdate, + Request.AssignmentsUpdate.newBuilder().setVersion(version + 1 + 1) + .addAssignments(assignment1).addAssignments(assignment2Scaled) + .build() + ); + } + + @Test + public void testUpdate() throws PulsarClientException, NoSuchMethodException, InterruptedException, + InvalidProtocolBufferException { + List functionMetaDataList = new LinkedList<>(); + long version = 5; + Function.FunctionMetaData function1 = Function.FunctionMetaData.newBuilder() + .setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath("/foo/bar1")) + .setFunctionConfig(Function.FunctionConfig.newBuilder().setName("func-1") + .setNamespace("namespace-1").setTenant("tenant-1").setParallelism(1)).setVersion(version) + .build(); + + Function.FunctionMetaData function2 = Function.FunctionMetaData.newBuilder() + .setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath("/foo/bar1")) + .setFunctionConfig(Function.FunctionConfig.newBuilder().setName("func-2") + .setNamespace("namespace-1").setTenant("tenant-1").setParallelism(3)).setVersion(version) + .build(); + functionMetaDataList.add(function1); + functionMetaDataList.add(function2); + doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); + + // set assignments + Function.Assignment assignment1 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function1).setInstanceId(0).build()) + .build(); + + Map> currentAssignments = new HashMap<>(); + Map assignmentEntry1 = new HashMap<>(); + assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); + + currentAssignments.put("worker-1", assignmentEntry1); + doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); + + //set version + doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); + + // single node + List workerInfoList = new LinkedList<>(); + workerInfoList.add(MembershipManager.WorkerInfo.of("worker-1", "workerHostname-1", 5000)); + doReturn(workerInfoList).when(membershipManager).getCurrentMembership(); + + // i am leader + doReturn(true).when(membershipManager).isLeader(); + + callSchedule(); + + List invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", + Object.class)); + Assert.assertEquals(invocations.size(), 1); + + byte[] send = (byte[]) invocations.get(0).getRawArguments()[0]; + Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); + + log.info("assignmentsUpdate: {}", assignmentsUpdate); + + Function.Assignment assignment2_1 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function2).setInstanceId(0).build()) + .build(); + Function.Assignment assignment2_2 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function2).setInstanceId(1).build()) + .build(); + Function.Assignment assignment2_3 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function2).setInstanceId(2).build()) + .build(); + Assert.assertEquals( + assignmentsUpdate, + Request.AssignmentsUpdate.newBuilder().setVersion(version + 1) + .addAssignments(assignment1).addAssignments(assignment2_1) + .addAssignments(assignment2_2).addAssignments(assignment2_3).build() + ); + + // scale down + + PulsarClient pulsarClient = mock(PulsarClient.class); + doReturn(producer).when(pulsarClient).createProducer(any(), any()); + + Function.FunctionMetaData function2Updated = Function.FunctionMetaData.newBuilder() + .setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath("/foo/bar2")) + .setFunctionConfig(Function.FunctionConfig.newBuilder().setName("func-2") + .setNamespace("namespace-1").setTenant("tenant-1").setParallelism(3)).setVersion(version) + .build(); + functionMetaDataList = new LinkedList<>(); + functionMetaDataList.add(function1); + functionMetaDataList.add(function2Updated); + doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); + + Function.Assignment assignment2Updated1 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function2Updated).setInstanceId(0).build()) + .build(); + Function.Assignment assignment2Updated2 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function2Updated).setInstanceId(1).build()) + .build(); + Function.Assignment assignment2Updated3 = Function.Assignment.newBuilder() + .setWorkerId("worker-1") + .setInstance(Function.Instance.newBuilder() + .setFunctionMetaData(function2Updated).setInstanceId(2).build()) + .build(); + + callSchedule(); + + invocations = getMethodInvocationDetails(producer, Producer.class.getMethod("sendAsync", + Object.class)); + Assert.assertEquals(invocations.size(), 2); + + send = (byte[]) invocations.get(1).getRawArguments()[0]; + assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(send); + Assert.assertEquals(assignmentsUpdate, + Request.AssignmentsUpdate.newBuilder().setVersion(version + 1 + 1) + .addAssignments(assignment1).addAssignments(assignment2Updated1) + .addAssignments(assignment2Updated2) + .addAssignments(assignment2Updated3) + .build() + ); + } + + private void callSchedule() throws NoSuchMethodException, InterruptedException { + long intialVersion = functionRuntimeManager.getCurrentAssignmentVersion(); + int initalCount = getMethodInvocationDetails(completableFuture, + CompletableFuture.class.getMethod("get")).size(); + log.info("initalCount: {}", initalCount); + Future complete = schedulerManager.schedule(); + int count = 0; + while (!complete.isDone()) { + + int invocationCount = getMethodInvocationDetails(completableFuture, + CompletableFuture.class.getMethod("get")).size(); + log.info("invocationCount: {}", invocationCount); + + if (invocationCount >= initalCount + 1) { + doReturn(intialVersion + 1).when(functionRuntimeManager).getCurrentAssignmentVersion(); + } + + if (count > 100) { + Assert.fail("Scheduler failed to terminate!"); + } + Thread.sleep(100); + count++; + } + } + + private List getMethodInvocationDetails(Object o, Method method) throws NoSuchMethodException { + List ret = new LinkedList<>(); + for (Invocation entry : Mockito.mockingDetails(o).getInvocations()) { + if (entry.getMethod().getName().equals(method.getName())) { + ret.add(entry); + } + } + return ret; + } +}