Skip to content

Commit

Permalink
implement topic routing on a per record basis (apache#2605)
Browse files Browse the repository at this point in the history
### Motivation

There are use cases that the destination topic for a message cannot be determined at source submission time.  This requires the ability for sources to to set which topic a record should be written to.

### Modifications

1. add an interface Record that allows users to set the destination topic for a record
2. Refactored the Pulsar sink code to support this
  • Loading branch information
jerrypeng authored and sijie committed Sep 26, 2018
1 parent fa44e19 commit e15a606
Show file tree
Hide file tree
Showing 9 changed files with 391 additions and 588 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,13 @@ default void ack() {
*/
default void fail() {
}

/**
* To support message routing on a per message basis
*
* @return The topic this message should be written to
*/
default Optional<String> getDestinationTopic() {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,13 @@

package org.apache.pulsar.functions.instance;

import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

import io.netty.buffer.ByteBuf;

import java.io.FileNotFoundException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import net.jodah.typetools.TypeResolver;
import org.apache.bookkeeper.api.StorageClient;
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.clients.StorageClientBuilder;
Expand Down Expand Up @@ -76,7 +64,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import net.jodah.typetools.TypeResolver;
import java.io.FileNotFoundException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;

/**
* A function container implemented using java thread.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,8 @@ public void fail() {
sourceRecord.fail();
}

@Override
public Optional<String> getDestinationTopic() {
return sourceRecord.getDestinationTopic();
}
}

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit e15a606

Please sign in to comment.