Skip to content

Commit

Permalink
Issue apache#2657: change function cli getstate to use REST endpoint (a…
Browse files Browse the repository at this point in the history
…pache#2943)

# Motivation
change StateGetter to use REST endpoint

# Modifications
change StateGetter to use REST endpoint
change related unit test

# Result
unit tests pass.
  • Loading branch information
jiazhai authored Nov 22, 2018
1 parent 26b91bb commit 4253de7
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 130 deletions.
7 changes: 2 additions & 5 deletions distribution/server/src/assemble/LICENSE.bin.txt
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ The Apache Software License, Version 2.0
- commons-configuration-commons-configuration-1.6.jar
- commons-digester-commons-digester-1.8.jar
- commons-io-commons-io-2.5.jar
- commons-lang-commons-lang-2.6.jar
- commons-lang-commons-lang-2.4.jar
- commons-logging-commons-logging-1.1.1.jar
- org.apache.commons-commons-collections4-4.1.jar
- org.apache.commons-commons-compress-1.15.jar
Expand Down Expand Up @@ -476,14 +476,11 @@ BSD 3-clause "New" or "Revised" License
- com.google.auth-google-auth-library-credentials-0.9.0.jar -- licenses/LICENSE-google-auth-library.txt
* JLine -- jline-jline-0.9.94.jar -- licenses/LICENSE.JLine.txt
* LevelDB -- (included in org.rocksdb.*.jar) -- licenses/LICENSE-LevelDB.txt
* JSR305 -- com.google.code.findbugs-jsr305-3.0.2.jar -- licenses/LICENSE-JSR305.txt
* JSR305 -- com.google.code.findbugs-jsr305-3.0.0.jar -- licenses/LICENSE-JSR305.txt

BSD 2-Clause License
* HdrHistogram -- org.hdrhistogram-HdrHistogram-2.1.9.jar -- licenses/LICENSE-HdrHistogram.txt

BSD License
* Hamcrest -- org.hamcrest-hamcrest-core-1.1.jar -- licenses/LICENSE-Hamcrest.txt

MIT License
* Java SemVer -- com.github.zafarkhaja-java-semver-0.9.0.jar -- licenses/LICENSE-SemVer.txt
* SLF4J -- licenses/LICENSE-SLF4J.txt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,63 +18,54 @@
*/
package org.apache.pulsar.admin.cli;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.api.StorageClient;
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.admin.cli.CmdFunctions.CreateFunction;
import org.apache.pulsar.admin.cli.CmdFunctions.DeleteFunction;
import org.apache.pulsar.admin.cli.CmdFunctions.GetFunction;
import org.apache.pulsar.admin.cli.CmdFunctions.GetFunctionStatus;
import org.apache.pulsar.admin.cli.CmdFunctions.ListFunctions;
import org.apache.pulsar.admin.cli.CmdFunctions.RestartFunction;
import org.apache.pulsar.admin.cli.CmdFunctions.StateGetter;
import org.apache.pulsar.admin.cli.CmdFunctions.StopFunction;
import org.apache.pulsar.admin.cli.CmdFunctions.UpdateFunction;
import org.apache.pulsar.client.admin.Functions;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.Utils;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.IObjectFactory;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertNull;

/**
* Unit test of {@link CmdFunctions}.
*/
Expand Down Expand Up @@ -507,42 +498,26 @@ public void testListFunctions() throws Exception {

@Test
public void testStateGetter() throws Exception {
String tenant = TEST_NAME + "_tenant";
String namespace = TEST_NAME + "_namespace";
String fnName = TEST_NAME + "_function";

mockStatic(StorageClientBuilder.class);

StorageClientBuilder builder = mock(StorageClientBuilder.class);
when(builder.withSettings(any(StorageClientSettings.class))).thenReturn(builder);
when(builder.withNamespace(eq(tenant + "_" + namespace))).thenReturn(builder);
StorageClient client = mock(StorageClient.class);
when(builder.build()).thenReturn(client);

PowerMockito.when(StorageClientBuilder.class, "newBuilder")
.thenReturn(builder);

Table<ByteBuf, ByteBuf> table = mock(Table.class);
when(client.openTable(eq(fnName))).thenReturn(FutureUtils.value(table));
AtomicReference<ByteBuf> keyHolder = new AtomicReference<>();
doAnswer(invocationOnMock -> {
ByteBuf buf = invocationOnMock.getArgumentAt(0, ByteBuf.class);
keyHolder.set(buf);
return FutureUtils.value(null);
}).when(table).getKv(any(ByteBuf.class));
String tenant = TEST_NAME + "-tenant";
String namespace = TEST_NAME + "-namespace";
String fnName = TEST_NAME + "-function";
String key = TEST_NAME + "-key";

cmd.run(new String[] {
"querystate",
"--tenant", tenant,
"--namespace", namespace,
"--name", fnName,
"--key", "test-key",
"--storage-service-url", "bk://127.0.0.1:4181"
"--key", key
});

assertEquals(
"test-key",
new String(ByteBufUtil.getBytes(keyHolder.get()), UTF_8));
StateGetter stateGetter = cmd.getStateGetter();

assertEquals(tenant, stateGetter.getTenant());
assertEquals(namespace, stateGetter.getNamespace());
assertEquals(fnName, stateGetter.getFunctionName());

verify(functions, times(1)).getFunctionState(eq(tenant), eq(namespace), eq(fnName), eq(key));
}

private static final String fnName = TEST_NAME + "-function";
Expand Down
6 changes: 0 additions & 6 deletions pulsar-client-tools/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,6 @@
</dependency>

<!-- functions related dependencies (begin) -->

<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>stream-storage-java-client</artifactId>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-functions-utils</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
*/
package org.apache.pulsar.admin.cli;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static org.apache.commons.lang.StringUtils.isBlank;
import static org.apache.commons.lang.StringUtils.isNotBlank;
import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
Expand All @@ -35,11 +32,7 @@
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;

import com.google.protobuf.util.JsonFormat;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;

import java.io.File;
import java.lang.reflect.Field;
Expand All @@ -52,20 +45,14 @@

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.apache.bookkeeper.api.StorageClient;
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.api.kv.result.KeyValue;
import org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.admin.cli.utils.CmdUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.WindowConfig;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.functions.WindowConfig;

@Slf4j
@Parameters(commandDescription = "Interface for managing Pulsar Functions (lightweight, Lambda-style compute processes that work with Pulsar)")
Expand Down Expand Up @@ -690,10 +677,10 @@ void runCmd() throws Exception {

@Parameters(commandDescription = "Temporary stops function instance. (If worker restarts then it reassigns and starts functiona again")
class StopFunction extends FunctionCommand {

@Parameter(names = "--instance-id", description = "The function instanceId (stop all instances if instance-id is not provided")
protected String instanceId;

@Override
void runCmd() throws Exception {
if (isNotBlank(instanceId)) {
Expand Down Expand Up @@ -763,55 +750,18 @@ class StateGetter extends FunctionCommand {
@Parameter(names = { "-k", "--key" }, description = "key")
private String key = null;

// TODO: this url should be fetched along with bookkeeper location from pulsar admin
@Parameter(names = { "-u", "--storage-service-url" }, description = "The URL for the storage service used by the function")
private String stateStorageServiceUrl = null;

@Parameter(names = { "-w", "--watch" }, description = "Watch for changes in the value associated with a key for a Pulsar Function")
private boolean watch = false;

@Override
void runCmd() throws Exception {
checkNotNull(stateStorageServiceUrl, "The state storage service URL is missing");

String tableNs = String.format(
"%s_%s",
tenant,
namespace).replace('-', '_');

String tableName = getFunctionName();

try (StorageClient client = StorageClientBuilder.newBuilder()
.withSettings(StorageClientSettings.newBuilder()
.serviceUri(stateStorageServiceUrl)
.clientName("functions-admin")
.build())
.withNamespace(tableNs)
.build()) {
try (Table<ByteBuf, ByteBuf> table = result(client.openTable(tableName))) {
long lastVersion = -1L;
do {
try (KeyValue<ByteBuf, ByteBuf> kv = result(table.getKv(Unpooled.wrappedBuffer(key.getBytes(UTF_8))))) {
if (null == kv) {
System.out.println("key '" + key + "' doesn't exist.");
} else {
if (kv.version() > lastVersion) {
if (kv.isNumber()) {
System.out.println("value = " + kv.numberValue());
} else {
System.out.println("value = " + new String(ByteBufUtil.getBytes(kv.value()), UTF_8));
}
lastVersion = kv.version();
}
}
}
if (watch) {
Thread.sleep(1000);
}
} while (watch);
do {
String valueAndVersion = admin.functions().getFunctionState(tenant, namespace, functionName, key);
System.out.println(valueAndVersion);
if (watch) {
Thread.sleep(1000);
}
}

} while (watch);
}
}

Expand Down

0 comments on commit 4253de7

Please sign in to comment.