Skip to content

Commit

Permalink
Allow null consume in BatchPushSource (apache#7573)
Browse files Browse the repository at this point in the history
* Added upgrade notes

* Allow null message to be passed

* More private impl

* Fix unittest

* Address comments

Co-authored-by: Sanjeev Kulkarni <[email protected]>
  • Loading branch information
srkukarni and Sanjeev Kulkarni authored Jul 17, 2020
1 parent c404b1e commit 4e1a677
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.pulsar.common.io.BatchSourceConfig;
import org.apache.pulsar.functions.api.Record;

import org.apache.pulsar.io.core.BatchPushSource;
import org.apache.pulsar.io.core.BatchSource;
import org.apache.pulsar.io.core.BatchSourceTriggerer;
import org.apache.pulsar.io.core.SourceContext;
Expand Down Expand Up @@ -90,6 +91,46 @@ public void close() throws Exception {
}
}

public static class TestBatchPushSource extends BatchPushSource<String> {
@Getter
private static int prepareCount;
@Getter
private static int discoverCount;
@Getter
private static int recordCount;
private Record record = Mockito.mock(Record.class);
public TestBatchPushSource() { }

@Override
public void open(Map<String, Object> config, SourceContext context) throws Exception {
if (!config.containsKey("foo")) {
throw new IllegalArgumentException("Bad config passed to TestBatchPushSource");
}
}

@Override
public void discover(Consumer<byte[]> taskEater) throws Exception {
byte[] retval = new byte[10];
discoverCount++;
taskEater.accept(retval);
}

@Override
public void prepare(byte[] task) throws Exception {
prepareCount++;
for (int i = 0; i < 5; ++i) {
consume(record);
++recordCount;
}
consume(null);
}

@Override
public void close() throws Exception {

}
}

public static class TestDiscoveryTriggerer implements BatchSourceTriggerer {
private Consumer<String> trigger;
private Thread thread;
Expand Down Expand Up @@ -121,17 +162,21 @@ public void start(Consumer<String> trigger) {

@Override
public void stop() {
thread.interrupt();
try {
thread.join();
} catch (Exception e) {
if (thread != null) {
thread.interrupt();
try {
thread.join();
} catch (Exception e) {
}
}
}
}

private TestBatchSource testBatchSource;
private TestBatchPushSource testBatchPushSource;
private BatchSourceConfig testBatchConfig;
private Map<String, Object> config;
private Map<String, Object> pushConfig;
private BatchSourceExecutor<String> batchSourceExecutor;
private SourceContext context;
private ConsumerBuilder consumerBuilder;
Expand All @@ -140,20 +185,32 @@ public void stop() {
private CyclicBarrier discoveryBarrier;
private Message<byte[]> discoveredTask;

@BeforeMethod
public void setUp() throws Exception {
testBatchSource = new TestBatchSource();
batchSourceExecutor = new BatchSourceExecutor<>();
context = Mockito.mock(SourceContext.class);
config = new HashMap<>();
private static Map<String, Object> createConfig(String className, BatchSourceConfig batchConfig) {
Map<String, Object> config = new HashMap<>();
config.put("foo", "bar");
testBatchConfig = new BatchSourceConfig();
config.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(batchConfig));
config.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, className);
return config;
}

private static BatchSourceConfig createBatchSourceConfig() {
BatchSourceConfig testBatchConfig = new BatchSourceConfig();
testBatchConfig.setDiscoveryTriggererClassName(TestDiscoveryTriggerer.class.getName());
Map<String, Object> triggererConfig = new HashMap<>();
triggererConfig.put("DELAY_MS", 500);
testBatchConfig.setDiscoveryTriggererConfig(triggererConfig);
config.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(testBatchConfig));
config.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, TestBatchSource.class.getName());
return testBatchConfig;
}

@BeforeMethod
public void setUp() throws Exception {
testBatchSource = new TestBatchSource();
testBatchPushSource = new TestBatchPushSource();
batchSourceExecutor = new BatchSourceExecutor<>();
testBatchConfig = createBatchSourceConfig();
config = createConfig(TestBatchSource.class.getName(), testBatchConfig);
pushConfig = createConfig(TestBatchPushSource.class.getName(), testBatchConfig);
context = Mockito.mock(SourceContext.class);
Mockito.doReturn("test-function").when(context).getSourceName();
Mockito.doReturn("test-namespace").when(context).getNamespace();
Mockito.doReturn("test-tenant").when(context).getTenant();
Expand Down Expand Up @@ -187,21 +244,36 @@ public void setUp() throws Exception {
}

@AfterMethod
public void cleanUp() { }
public void cleanUp() throws Exception {
batchSourceExecutor.close();
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Batch Configs cannot be found")
public void testWithoutRightConfig() throws Exception {
config.clear();
batchSourceExecutor.open(config, context);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Batch Configs cannot be found")
public void testPushWithoutRightConfig() throws Exception {
pushConfig.clear();
batchSourceExecutor.open(pushConfig, context);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSourceTriggerer does not implement the correct interface")
public void testWithoutRightTriggerer() throws Exception {
testBatchConfig.setDiscoveryTriggererClassName(TestBatchSource.class.getName());
config.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(testBatchConfig));
batchSourceExecutor.open(config, context);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSourceTriggerer does not implement the correct interface")
public void testPushWithoutRightTriggerer() throws Exception {
testBatchConfig.setDiscoveryTriggererClassName(TestBatchSource.class.getName());
pushConfig.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(testBatchConfig));
batchSourceExecutor.open(pushConfig, context);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestTriggerer")
public void testWithoutRightTriggererConfig() throws Exception {
Map<String, Object> badConfig = new HashMap<>();
Expand All @@ -211,24 +283,51 @@ public void testWithoutRightTriggererConfig() throws Exception {
batchSourceExecutor.open(config, context);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestTriggerer")
public void testPushWithoutRightTriggererConfig() throws Exception {
Map<String, Object> badConfig = new HashMap<>();
badConfig.put("something", "else");
testBatchConfig.setDiscoveryTriggererConfig(badConfig);
pushConfig.put(BatchSourceConfig.BATCHSOURCE_CONFIG_KEY, new Gson().toJson(testBatchConfig));
batchSourceExecutor.open(pushConfig, context);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSource does not implement the correct interface")
public void testWithoutRightSource() throws Exception {
config.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, TestDiscoveryTriggerer.class.getName());
batchSourceExecutor.open(config, context);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "BatchSource does not implement the correct interface")
public void testPushWithoutRightSource() throws Exception {
pushConfig.put(BatchSourceConfig.BATCHSOURCE_CLASSNAME_KEY, TestDiscoveryTriggerer.class.getName());
batchSourceExecutor.open(pushConfig, context);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestBatchSource")
public void testWithoutRightSourceConfig() throws Exception {
config.remove("foo");
config.put("something", "else");
batchSourceExecutor.open(config, context);
}

@Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Bad config passed to TestBatchPushSource")
public void testPushWithoutRightSourceConfig() throws Exception {
pushConfig.remove("foo");
pushConfig.put("something", "else");
batchSourceExecutor.open(pushConfig, context);
}

@Test
public void testOpenWithRightSource() throws Exception {
batchSourceExecutor.open(config, context);
}

@Test
public void testPushOpenWithRightSource() throws Exception {
batchSourceExecutor.open(pushConfig, context);
}

@Test
public void testLifeCycle() throws Exception {
batchSourceExecutor.open(config, context);
Expand All @@ -246,4 +345,22 @@ public void testLifeCycle() throws Exception {
Assert.assertTrue(testBatchSource.getDiscoverCount() >= 2);
Assert.assertTrue(testBatchSource.getDiscoverCount() <= 3);
}

@Test
public void testPushLifeCycle() throws Exception {
batchSourceExecutor.open(pushConfig, context);
Assert.assertTrue(testBatchPushSource.getDiscoverCount() < 1);
discoveryBarrier.await();
Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 1);
Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 2);
for (int i = 0; i < 5; ++i) {
batchSourceExecutor.read();
}
Assert.assertEquals(testBatchPushSource.getRecordCount(), 5);
Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 1);
Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 2);
discoveryBarrier.await();
Assert.assertTrue(testBatchPushSource.getDiscoverCount() >= 2);
Assert.assertTrue(testBatchPushSource.getDiscoverCount() <= 3);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@

import org.apache.pulsar.functions.api.Record;

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/**
* Pulsar's Batch Push Source interface. Batch Push Sources have the same lifecycle
Expand All @@ -32,26 +30,43 @@
*/
public abstract class BatchPushSource<T> implements BatchSource<T> {

private static class NullRecord implements Record {
@Override
public Object getValue() {
return null;
}
}

private LinkedBlockingQueue<Record<T>> queue;
private static final int DEFAULT_QUEUE_LENGTH = 1000;
private final NullRecord nullRecord = new NullRecord();

public BatchPushSource() {
this.queue = new LinkedBlockingQueue<>(this.getQueueLength());
}

@Override
public Record<T> readNext() throws Exception {
return queue.take();
Record<T> record = queue.take();
if (record instanceof NullRecord) {
return null;
} else {
return record;
}
}

/**
* Send this message to be written to Pulsar.
*
* Pass null if you you are done with this task
* @param record next message from source which should be sent to a Pulsar topic
*/
public void consume(Record<T> record) {
try {
queue.put(record);
if (record != null) {
queue.put(record);
} else {
queue.put(nullRecord);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down

0 comments on commit 4e1a677

Please sign in to comment.