Skip to content

Commit

Permalink
Fix pulsar sink and source state (apache#5046)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrypeng authored and merlimat committed Aug 27, 2019
1 parent 4ba8b6d commit 40d6248
Show file tree
Hide file tree
Showing 7 changed files with 387 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
*/
package org.apache.pulsar.functions.instance;

import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.netty.buffer.ByteBuf;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Summary;
import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.api.kv.Table;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
Expand All @@ -47,7 +50,6 @@
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkState;
Expand All @@ -72,9 +74,8 @@ class ContextImpl implements Context, SinkContext, SourceContext {
private final SecretsProvider secretsProvider;
private final Map<String, Object> secretsMap;

@Getter
@Setter
private StateContextImpl stateContext;
@VisibleForTesting
StateContextImpl stateContext;
private Map<String, Object> userConfigs;

private ComponentStatsManager statsManager;
Expand All @@ -95,7 +96,8 @@ class ContextImpl implements Context, SinkContext, SourceContext {

public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels,
Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager) {
Function.FunctionDetails.ComponentType componentType, ComponentStatsManager statsManager,
Table<ByteBuf, ByteBuf> stateTable) {
this.config = config;
this.logger = logger;
this.publishProducers = new HashMap<>();
Expand Down Expand Up @@ -146,6 +148,10 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
.quantile(0.999, 0.01)
.register(collectorRegistry);
this.componentType = componentType;

if (null != stateTable) {
this.stateContext = new StateContextImpl(stateTable);
}
}

public void setCurrentMessageContext(Record<?> record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ ContextImpl setupContext() {
Logger instanceLog = LoggerFactory.getLogger(
"function-" + instanceConfig.getFunctionDetails().getName());
return new ContextImpl(instanceConfig, instanceLog, client, secretsProvider,
collectorRegistry, metricsLabels, this.componentType, this.stats);
collectorRegistry, metricsLabels, this.componentType, this.stats, stateTable);
}

/**
Expand All @@ -232,10 +232,6 @@ public void run() {
this.componentType);

javaInstance = setupJavaInstance();
if (null != stateTable) {
StateContextImpl stateContext = new StateContextImpl(stateTable);
javaInstance.getContext().setStateContext(stateContext);
}
while (true) {
currentRecord = readInput();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.bookkeeper.api.kv.Table;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
Expand Down Expand Up @@ -81,13 +82,12 @@ public void setup() {
TypedMessageBuilder messageBuilder = spy(new TypedMessageBuilderImpl(mock(ProducerBase.class), Schema.STRING));
doReturn(new CompletableFuture<>()).when(messageBuilder).sendAsync();
when(producer.newMessage()).thenReturn(messageBuilder);

context = new ContextImpl(
config,
logger,
client,
new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
FunctionDetails.ComponentType.FUNCTION, null);
FunctionDetails.ComponentType.FUNCTION, null, null);
context.setCurrentMessageContext((Record<String>) () -> null);
}

Expand All @@ -98,6 +98,7 @@ public void testIncrCounterStateDisabled() {

@Test(expectedExceptions = IllegalStateException.class)
public void testGetCounterStateDisabled() {

context.getCounter("test-key");
}

Expand All @@ -113,35 +114,31 @@ public void testGetStateStateDisabled() {

@Test
public void testIncrCounterStateEnabled() throws Exception {
StateContextImpl stateContext = mock(StateContextImpl.class);
context.setStateContext(stateContext);
context.stateContext = mock(StateContextImpl.class);
context.incrCounterAsync("test-key", 10L);
verify(stateContext, times(1)).incrCounter(eq("test-key"), eq(10L));
verify(context.stateContext, times(1)).incrCounter(eq("test-key"), eq(10L));
}

@Test
public void testGetCounterStateEnabled() throws Exception {
StateContextImpl stateContext = mock(StateContextImpl.class);
context.setStateContext(stateContext);
context.stateContext = mock(StateContextImpl.class);
context.getCounterAsync("test-key");
verify(stateContext, times(1)).getCounter(eq("test-key"));
verify(context.stateContext, times(1)).getCounter(eq("test-key"));
}

@Test
public void testPutStateStateEnabled() throws Exception {
StateContextImpl stateContext = mock(StateContextImpl.class);
context.setStateContext(stateContext);
context.stateContext = mock(StateContextImpl.class);
ByteBuffer buffer = ByteBuffer.wrap("test-value".getBytes(UTF_8));
context.putStateAsync("test-key", buffer);
verify(stateContext, times(1)).put(eq("test-key"), same(buffer));
verify(context.stateContext, times(1)).put(eq("test-key"), same(buffer));
}

@Test
public void testGetStateStateEnabled() throws Exception {
StateContextImpl stateContext = mock(StateContextImpl.class);
context.setStateContext(stateContext);
context.stateContext = mock(StateContextImpl.class);
context.getStateAsync("test-key");
verify(stateContext, times(1)).get(eq("test-key"));
verify(context.stateContext, times(1)).get(eq("test-key"));
}

@Test
Expand Down
7 changes: 7 additions & 0 deletions tests/docker-images/java-test-functions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@
<groupId>org.apache.pulsar.tests</groupId>
<artifactId>java-test-functions</artifactId>
<name>Apache Pulsar :: Tests :: Docker Images :: Java Test Functions</name>
<dependencies>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-io-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<packaging>jar</packaging>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.io;

import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;

import java.nio.ByteBuffer;
import java.util.Map;

public class TestStateSink implements Sink<String> {

private SinkContext sinkContext;
private int count;

@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
sinkContext.putState("initial", ByteBuffer.wrap("val1".getBytes()));
this.sinkContext = sinkContext;
}

@Override
public void write(Record<String> record) throws Exception {
String initial = new String(sinkContext.getState("initial").array());
String val = String.format("%s-%d", initial, count);
sinkContext.putState("now", ByteBuffer.wrap(val.getBytes()));
count++;
}

@Override
public void close() throws Exception {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.io;

import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;

import java.nio.ByteBuffer;
import java.util.Map;

public class TestStateSource implements Source<String> {


private SourceContext sourceContext;
private int count;

@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
sourceContext.putState("initial", ByteBuffer.wrap("val1".getBytes()));
this.sourceContext = sourceContext;
}

@Override
public Record<String> read() throws Exception {
Thread.sleep(50);
String initial = new String(sourceContext.getState("initial").array());
String val = String.format("%s-%d", initial, count);
sourceContext.putState("now", ByteBuffer.wrap(val.getBytes()));
count++;
return () -> val;
}

@Override
public void close() throws Exception {

}
}
Loading

0 comments on commit 40d6248

Please sign in to comment.