Skip to content

Commit

Permalink
[pulsar-functions-go] support set subscription position (apache#11990)
Browse files Browse the repository at this point in the history
* stash

* set default value
  • Loading branch information
freeznet authored Sep 24, 2021
1 parent 8c4c630 commit 652d154
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 10 deletions.
9 changes: 5 additions & 4 deletions pulsar-function-go/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ type Conf struct {
AutoACK bool `json:"autoAck" yaml:"autoAck"`
Parallelism int32 `json:"parallelism" yaml:"parallelism"`
//source config
SubscriptionType int32 `json:"subscriptionType" yaml:"subscriptionType"`
TimeoutMs uint64 `json:"timeoutMs" yaml:"timeoutMs"`
SubscriptionName string `json:"subscriptionName" yaml:"subscriptionName"`
CleanupSubscription bool `json:"cleanupSubscription" yaml:"cleanupSubscription"`
SubscriptionType int32 `json:"subscriptionType" yaml:"subscriptionType"`
TimeoutMs uint64 `json:"timeoutMs" yaml:"timeoutMs"`
SubscriptionName string `json:"subscriptionName" yaml:"subscriptionName"`
CleanupSubscription bool `json:"cleanupSubscription" yaml:"cleanupSubscription"`
SubscriptionPosition int32 `json:"subscriptionPosition" yaml:"subscriptionPosition"`
//source input specs
SourceSpecTopic string `json:"sourceSpecsTopic" yaml:"sourceSpecsTopic"`
SourceSchemaType string `json:"sourceSchemaType" yaml:"sourceSchemaType"`
Expand Down
1 change: 1 addition & 0 deletions pulsar-function-go/conf/conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ subscriptionType: 0
timeoutMs: 0
subscriptionName: ""
cleanupSubscription: false
subscriptionPosition: 1
# source input specs
sourceSpecsTopic: persistent://public/default/topic-01
sourceSchemaType: ""
Expand Down
7 changes: 4 additions & 3 deletions pulsar-function-go/pf/instanceConf.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,10 @@ func newInstanceConf() *instanceConf {
},
},
},
TimeoutMs: cfg.TimeoutMs,
SubscriptionName: cfg.SubscriptionName,
CleanupSubscription: cfg.CleanupSubscription,
TimeoutMs: cfg.TimeoutMs,
SubscriptionName: cfg.SubscriptionName,
CleanupSubscription: cfg.CleanupSubscription,
SubscriptionPosition: pb.SubscriptionPosition(cfg.SubscriptionPosition),
},
Sink: &pb.SinkSpec{
Topic: cfg.SinkSpecTopic,
Expand Down
7 changes: 4 additions & 3 deletions pulsar-function-go/pf/instanceConf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ func Test_newInstanceConf(t *testing.T) {
},
},
},
TimeoutMs: 0,
SubscriptionName: "",
CleanupSubscription: false,
TimeoutMs: 0,
SubscriptionName: "",
CleanupSubscription: false,
SubscriptionPosition: pb.SubscriptionPosition_EARLIEST,
},
Sink: &pb.SinkSpec{
Topic: "persistent://public/default/topic-02",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import lombok.Getter;
import lombok.Setter;
import org.apache.pulsar.functions.proto.Function;

@Setter
@Getter
Expand Down Expand Up @@ -50,6 +51,7 @@ public class GoInstanceConfig {
private long timeoutMs;
private String subscriptionName = "";
private boolean cleanupSubscription;
private int subscriptionPosition = Function.SubscriptionPosition.LATEST.getNumber();

private String sourceSpecsTopic = "";
private String sourceSchemaType = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
if (instanceConfig.getFunctionDetails().getSource().getSubscriptionName() != null) {
goInstanceConfig.setSubscriptionName(instanceConfig.getFunctionDetails().getSource().getSubscriptionName());
}
goInstanceConfig.setSubscriptionPosition(
instanceConfig.getFunctionDetails().getSource().getSubscriptionPosition().getNumber());

if (instanceConfig.getFunctionDetails().getSource().getInputSpecsMap() != null) {
for (String inputTopic : instanceConfig.getFunctionDetails().getSource().getInputSpecsMap().keySet()) {
Expand Down

0 comments on commit 652d154

Please sign in to comment.