Skip to content

Commit

Permalink
Separate out FunctionMetadata related helper functions (apache#7146)
Browse files Browse the repository at this point in the history
* Seperate out FunctionMetaData related functions into a utility class

* Fixed bug

Co-authored-by: Sanjeev Kulkarni <[email protected]>
  • Loading branch information
srkukarni and Sanjeev Kulkarni authored Jun 3, 2020
1 parent 75fe26c commit 8638022
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 66 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.functions.utils;

import org.apache.pulsar.functions.proto.Function;

public class FunctionMetaDataUtils {

public static boolean canChangeState(Function.FunctionMetaData functionMetaData, int instanceId, Function.FunctionState newState) {
if (instanceId >= functionMetaData.getFunctionDetails().getParallelism()) {
return false;
}
if (functionMetaData.getInstanceStatesMap() == null || functionMetaData.getInstanceStatesMap().isEmpty()) {
// This means that all instances of the functions are running
return newState == Function.FunctionState.STOPPED;
}
if (instanceId >= 0) {
if (functionMetaData.getInstanceStatesMap().containsKey(instanceId)) {
return functionMetaData.getInstanceStatesMap().get(instanceId) != newState;
} else {
return false;
}
} else {
// want to change state for all instances
for (Function.FunctionState state : functionMetaData.getInstanceStatesMap().values()) {
if (state != newState) return true;
}
return false;
}
}

public static Function.FunctionMetaData changeFunctionInstanceStatus(Function.FunctionMetaData functionMetaData,
Integer instanceId, boolean start) {
Function.FunctionMetaData.Builder builder = functionMetaData.toBuilder()
.setVersion(functionMetaData.getVersion() + 1);
if (builder.getInstanceStatesMap() == null || builder.getInstanceStatesMap().isEmpty()) {
for (int i = 0; i < functionMetaData.getFunctionDetails().getParallelism(); ++i) {
builder.putInstanceStates(i, Function.FunctionState.RUNNING);
}
}
Function.FunctionState state = start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED;
if (instanceId < 0) {
for (int i = 0; i < functionMetaData.getFunctionDetails().getParallelism(); ++i) {
builder.putInstanceStates(i, state);
}
} else if (instanceId < builder.getFunctionDetails().getParallelism()){
builder.putInstanceStates(instanceId, state);
}
return builder.build();
}

public static Function.FunctionMetaData generateUpdatedMetadata(Function.FunctionMetaData existingMetaData,
Function.FunctionMetaData updatedMetaData) {
long version = 0;
if (existingMetaData != null) {
version = existingMetaData.getVersion() + 1;
}
return updatedMetaData.toBuilder()
.setVersion(version)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.functions.utils;

import org.apache.pulsar.functions.proto.Function;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
* Unit test of {@link FunctionMetaDataUtils}.
*/
public class FunctionMetaDataUtilsTest {

@Test
public void testCanChangeState() {

long version = 5;
Function.FunctionMetaData metaData = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder().setName("func-1").setParallelism(2)).setVersion(version).build();

Assert.assertTrue(FunctionMetaDataUtils.canChangeState(metaData, 0, Function.FunctionState.STOPPED));
Assert.assertFalse(FunctionMetaDataUtils.canChangeState(metaData, 0, Function.FunctionState.RUNNING));
Assert.assertFalse(FunctionMetaDataUtils.canChangeState(metaData, 2, Function.FunctionState.STOPPED));
Assert.assertFalse(FunctionMetaDataUtils.canChangeState(metaData, 2, Function.FunctionState.RUNNING));
}

@Test
public void testChangeState() {
long version = 5;
Function.FunctionMetaData metaData = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder().setName("func-1").setParallelism(2)).setVersion(version).build();
Function.FunctionMetaData newMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(metaData, 0, false);
Assert.assertTrue(newMetaData.getInstanceStatesMap() != null);
Assert.assertEquals(newMetaData.getInstanceStatesMap().size(), 2);
Assert.assertEquals(newMetaData.getInstanceStatesMap().get(0), Function.FunctionState.STOPPED);
Assert.assertEquals(newMetaData.getInstanceStatesMap().get(1), Function.FunctionState.RUNNING);
Assert.assertEquals(newMetaData.getVersion(), version + 1);

// Nothing should happen
newMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(newMetaData, 3, false);
Assert.assertTrue(newMetaData.getInstanceStatesMap() != null);
Assert.assertEquals(newMetaData.getInstanceStatesMap().size(), 2);
Assert.assertEquals(newMetaData.getInstanceStatesMap().get(0), Function.FunctionState.STOPPED);
Assert.assertEquals(newMetaData.getInstanceStatesMap().get(1), Function.FunctionState.RUNNING);
Assert.assertEquals(newMetaData.getVersion(), version + 2);

// Change one more
newMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(newMetaData, 1, false);
Assert.assertTrue(newMetaData.getInstanceStatesMap() != null);
Assert.assertEquals(newMetaData.getInstanceStatesMap().size(), 2);
Assert.assertEquals(newMetaData.getInstanceStatesMap().get(0), Function.FunctionState.STOPPED);
Assert.assertEquals(newMetaData.getInstanceStatesMap().get(1), Function.FunctionState.STOPPED);
Assert.assertEquals(newMetaData.getVersion(), version + 3);

// Change all more
newMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(newMetaData, -1, true);
Assert.assertTrue(newMetaData.getInstanceStatesMap() != null);
Assert.assertEquals(newMetaData.getInstanceStatesMap().size(), 2);
Assert.assertEquals(newMetaData.getInstanceStatesMap().get(0), Function.FunctionState.RUNNING);
Assert.assertEquals(newMetaData.getInstanceStatesMap().get(1), Function.FunctionState.RUNNING);
Assert.assertEquals(newMetaData.getVersion(), version + 4);
}

@Test
public void testUpdate() {
long version = 5;
Function.FunctionMetaData existingMetaData = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder().setName("func-1").setParallelism(2)).setVersion(version).build();
Function.FunctionMetaData updatedMetaData = Function.FunctionMetaData.newBuilder().setFunctionDetails(
Function.FunctionDetails.newBuilder().setName("func-1").setParallelism(3)).setVersion(version).build();
Function.FunctionMetaData newMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingMetaData, updatedMetaData);
Assert.assertEquals(newMetaData.getVersion(), version + 1);
Assert.assertEquals(newMetaData.getFunctionDetails().getParallelism(), 3);

newMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(null, newMetaData);
Assert.assertEquals(newMetaData.getVersion(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.proto.Request;
import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.functions.worker.request.ServiceRequestInfo;
import org.apache.pulsar.functions.worker.request.ServiceRequestManager;
Expand Down Expand Up @@ -175,26 +176,16 @@ public synchronized boolean containsFunction(String tenant, String namespace, St
*/
public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMetaData functionMetaData) {

long version = 0;

String tenant = functionMetaData.getFunctionDetails().getTenant();
if (!this.functionMetaDataMap.containsKey(tenant)) {
this.functionMetaDataMap.put(tenant, new ConcurrentHashMap<>());
}

Map<String, Map<String, FunctionMetaData>> namespaces = this.functionMetaDataMap.get(tenant);
String namespace = functionMetaData.getFunctionDetails().getNamespace();
if (!namespaces.containsKey(namespace)) {
namespaces.put(namespace, new ConcurrentHashMap<>());
}

Map<String, FunctionMetaData> functionMetaDatas = namespaces.get(namespace);
String functionName = functionMetaData.getFunctionDetails().getName();
if (functionMetaDatas.containsKey(functionName)) {
version = functionMetaDatas.get(functionName).getVersion() + 1;
FunctionMetaData existingFunctionMetadata = null;
if (containsFunction(functionMetaData.getFunctionDetails().getTenant(),
functionMetaData.getFunctionDetails().getNamespace(),
functionMetaData.getFunctionDetails().getName())) {
existingFunctionMetadata = getFunctionMetaData(functionMetaData.getFunctionDetails().getTenant(),
functionMetaData.getFunctionDetails().getNamespace(),
functionMetaData.getFunctionDetails().getName());
}

FunctionMetaData newFunctionMetaData = functionMetaData.toBuilder().setVersion(version).build();
FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(existingFunctionMetadata, functionMetaData);

Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
this.workerConfig.getWorkerId(), newFunctionMetaData);
Expand All @@ -213,9 +204,7 @@ public synchronized CompletableFuture<RequestResult> updateFunction(FunctionMeta
public synchronized CompletableFuture<RequestResult> deregisterFunction(String tenant, String namespace, String functionName) {
FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);

FunctionMetaData newFunctionMetaData = functionMetaData.toBuilder()
.setVersion(functionMetaData.getVersion() + 1)
.build();
FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.generateUpdatedMetadata(functionMetaData, functionMetaData);

Request.ServiceRequest deregisterRequest = ServiceRequestUtils.getDeregisterRequest(
this.workerConfig.getWorkerId(), newFunctionMetaData);
Expand All @@ -236,22 +225,7 @@ public synchronized CompletableFuture<RequestResult> changeFunctionInstanceStatu
Integer instanceId, boolean start) {
FunctionMetaData functionMetaData = this.functionMetaDataMap.get(tenant).get(namespace).get(functionName);

FunctionMetaData.Builder builder = functionMetaData.toBuilder()
.setVersion(functionMetaData.getVersion() + 1);
if (builder.getInstanceStatesMap() == null || builder.getInstanceStatesMap().isEmpty()) {
for (int i = 0; i < functionMetaData.getFunctionDetails().getParallelism(); ++i) {
builder.putInstanceStates(i, Function.FunctionState.RUNNING);
}
}
Function.FunctionState state = start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED;
if (instanceId < 0) {
for (int i = 0; i < functionMetaData.getFunctionDetails().getParallelism(); ++i) {
builder.putInstanceStates(i, state);
}
} else {
builder.putInstanceStates(instanceId, state);
}
FunctionMetaData newFunctionMetaData = builder.build();
FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils.changeFunctionInstanceStatus(functionMetaData, instanceId, start);

Request.ServiceRequest updateRequest = ServiceRequestUtils.getUpdateRequest(
this.workerConfig.getWorkerId(), newFunctionMetaData);
Expand Down Expand Up @@ -457,29 +431,6 @@ public void close() throws Exception {
}
}

public boolean canChangeState(FunctionMetaData functionMetaData, int instanceId, Function.FunctionState newState) {
if (instanceId >= functionMetaData.getFunctionDetails().getParallelism()) {
return false;
}
if (functionMetaData.getInstanceStatesMap() == null || functionMetaData.getInstanceStatesMap().isEmpty()) {
// This means that all instances of the functions are running
return newState == Function.FunctionState.STOPPED;
}
if (instanceId >= 0) {
if (functionMetaData.getInstanceStatesMap().containsKey(instanceId)) {
return functionMetaData.getInstanceStatesMap().get(instanceId) != newState;
} else {
return false;
}
} else {
// want to change state for all instances
for (Function.FunctionState state : functionMetaData.getInstanceStatesMap().values()) {
if (state != newState) return true;
}
return false;
}
}

private ServiceRequestManager getServiceRequestManager(PulsarClient pulsarClient, String functionMetadataTopic) throws PulsarClientException {
return new ServiceRequestManager(pulsarClient.newProducer().topic(functionMetadataTopic).create());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.apache.pulsar.functions.utils.ComponentTypeUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
Expand Down Expand Up @@ -528,7 +529,7 @@ public void changeFunctionInstanceStatus(final String tenant,
throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
}

if (!functionMetaDataManager.canChangeState(functionMetaData, Integer.parseInt(instanceId), start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED)) {
if (!FunctionMetaDataUtils.canChangeState(functionMetaData, Integer.parseInt(instanceId), start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED)) {
log.error("Operation not permitted on {}/{}/{}", tenant, namespace, componentName);
throw new RestException(Status.BAD_REQUEST, String.format("Operation not permitted"));
}
Expand Down Expand Up @@ -656,7 +657,7 @@ public void changeFunctionStatusAllInstances(final String tenant,
throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
}

if (!functionMetaDataManager.canChangeState(functionMetaData, -1, start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED)) {
if (!FunctionMetaDataUtils.canChangeState(functionMetaData, -1, start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED)) {
log.error("Operation not permitted on {}/{}/{}", tenant, namespace, componentName);
throw new RestException(Status.BAD_REQUEST, String.format("Operation not permitted"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Request;
import org.apache.pulsar.functions.utils.FunctionMetaDataUtils;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.testng.Assert;
Expand Down Expand Up @@ -182,10 +183,10 @@ public void testStopFunction() throws PulsarClientException {
Function.FunctionDetails.newBuilder().setName("func-1").setParallelism(2)).setVersion(version).build();
functionMetaDataMap1.put("func-1", f1);

Assert.assertTrue(functionMetaDataManager.canChangeState(f1, 0, Function.FunctionState.STOPPED));
Assert.assertFalse(functionMetaDataManager.canChangeState(f1, 0, Function.FunctionState.RUNNING));
Assert.assertFalse(functionMetaDataManager.canChangeState(f1, 2, Function.FunctionState.STOPPED));
Assert.assertFalse(functionMetaDataManager.canChangeState(f1, 2, Function.FunctionState.RUNNING));
Assert.assertTrue(FunctionMetaDataUtils.canChangeState(f1, 0, Function.FunctionState.STOPPED));
Assert.assertFalse(FunctionMetaDataUtils.canChangeState(f1, 0, Function.FunctionState.RUNNING));
Assert.assertFalse(FunctionMetaDataUtils.canChangeState(f1, 2, Function.FunctionState.STOPPED));
Assert.assertFalse(FunctionMetaDataUtils.canChangeState(f1, 2, Function.FunctionState.RUNNING));

functionMetaDataManager.functionMetaDataMap.put("tenant-1", new HashMap<>());
functionMetaDataManager.functionMetaDataMap.get("tenant-1").put("namespace-1", functionMetaDataMap1);
Expand Down

0 comments on commit 8638022

Please sign in to comment.