Skip to content

Commit

Permalink
[go schema] support go schema for pulsar-client-go (apache#3904)
Browse files Browse the repository at this point in the history
Signed-off-by: xiaolong.ran [email protected]

Master Issue: apache#3855

Motivation
support go schema for pulsar-client-go
  • Loading branch information
wolfstudy authored and jiazhai committed Apr 23, 2019
1 parent 8973406 commit d5036ea
Show file tree
Hide file tree
Showing 29 changed files with 1,935 additions and 60 deletions.
52 changes: 26 additions & 26 deletions pulsar-client-cpp/include/pulsar/Schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,69 +40,69 @@ enum SchemaType
STRING = 1,

/**
* A 8-byte integer.
* JSON object encoding and validation
*/
INT8 = 2,
JSON = 2,

/**
* A 16-byte integer.
* Protobuf message encoding and decoding
*/
INT16 = 3,
PROTOBUF = 3,

/**
* A 32-byte integer.
* Serialize and deserialize via Avro
*/
INT32 = 4,
AVRO = 4,

/**
* A 64-byte integer.
* A 8-byte integer.
*/
INT64 = 5,
INT8 = 6,

/**
* A float number.
* A 16-byte integer.
*/
FLOAT = 6,
INT16 = 7,

/**
* A double number
* A 32-byte integer.
*/
DOUBLE = 7,
INT32 = 8,

/**
* A bytes array.
* A 64-byte integer.
*/
BYTES = 8,
INT64 = 9,

/**
* JSON object encoding and validation
* A float number.
*/
JSON = 9,
FLOAT = 10,

/**
* Protobuf message encoding and decoding
* A double number
*/
PROTOBUF = 10,
DOUBLE = 11,

/**
* Serialize and deserialize via Avro
* A Schema that contains Key Schema and Value Schema.
*/
AVRO = 11,
KEY_VALUE = 15,

/**
* Auto Consume Type.
* A bytes array.
*/
AUTO_CONSUME = 13,
BYTES = -1,

/**
* Auto Publish Type.
* Auto Consume Type.
*/
AUTO_PUBLISH = 14,
AUTO_CONSUME = -3,

/**
* A Schema that contains Key Schema and Value Schema.
* Auto Publish Type.
*/
KEY_VALUE = 15,
AUTO_PUBLISH = -4,
};

// Return string representation of result code
Expand Down
5 changes: 5 additions & 0 deletions pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#pragma once

#include "consumer.h"
#include "producer_configuration.h"

#ifdef __cplusplus
extern "C" {
Expand Down Expand Up @@ -82,6 +83,10 @@ void pulsar_consumer_configuration_set_consumer_type(pulsar_consumer_configurati
pulsar_consumer_type pulsar_consumer_configuration_get_consumer_type(
pulsar_consumer_configuration_t *consumer_configuration);

void pulsar_consumer_configuration_set_schema_info(pulsar_consumer_configuration_t *consumer_configuration,
pulsar_schema_type schemaType, const char *name,
const char *schema, pulsar_string_map_t *properties);

/**
* A message listener enables your application to configure how to process
* and acknowledge messages delivered. A listener will be called in order
Expand Down
23 changes: 23 additions & 0 deletions pulsar-client-cpp/include/pulsar/c/producer_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,25 @@ typedef enum {
pulsar_CompressionZLib = 2
} pulsar_compression_type;

typedef enum {
pulsar_None = 0,
pulsar_String = 1,
pulsar_Json = 2,
pulsar_Protobuf = 3,
pulsar_Avro = 4,
pulsar_Boolean = 5,
pulsar_Int8 = 6,
pulsar_Int16 = 7,
pulsar_Int32 = 8,
pulsar_Int64 = 9,
pulsar_Float32 = 10,
pulsar_Float64 = 11,
pulsar_KeyValue = 15,
pulsar_Bytes = -1,
pulsar_AutoConsume = -3,
pulsar_AutoPublish = -4,
} pulsar_schema_type;

typedef struct _pulsar_producer_configuration pulsar_producer_configuration_t;

pulsar_producer_configuration_t *pulsar_producer_configuration_create();
Expand All @@ -69,6 +88,10 @@ void pulsar_producer_configuration_set_compression_type(pulsar_producer_configur
pulsar_compression_type pulsar_producer_configuration_get_compression_type(
pulsar_producer_configuration_t *conf);

void pulsar_producer_configuration_set_schema_info(pulsar_producer_configuration_t *conf,
pulsar_schema_type schemaType, const char *name,
const char *schema, pulsar_string_map_t *properties);

void pulsar_producer_configuration_set_max_pending_messages(pulsar_producer_configuration_t *conf,
int maxPendingMessages);

Expand Down
8 changes: 8 additions & 0 deletions pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ pulsar_consumer_type pulsar_consumer_configuration_get_consumer_type(
return (pulsar_consumer_type)consumer_configuration->consumerConfiguration.getConsumerType();
}

void pulsar_consumer_configuration_set_schema_info(pulsar_consumer_configuration_t *consumer_configuration,
pulsar_schema_type schemaType, const char *name,
const char *schema, pulsar_string_map_t *properties) {
auto schemaInfo = pulsar::SchemaInfo((pulsar::SchemaType)schemaType, name, schema, properties->map);
consumer_configuration->consumerConfiguration.setSchema(schemaInfo);
}

static void message_listener_callback(pulsar::Consumer consumer, const pulsar::Message &msg,
pulsar_message_listener listener, void *ctx) {
pulsar_consumer_t c_consumer;
Expand Down Expand Up @@ -105,6 +112,7 @@ void pulsar_configure_set_negative_ack_redelivery_delay_ms(
pulsar_consumer_configuration_t *consumer_configuration, long redeliveryDelayMillis) {
consumer_configuration->consumerConfiguration.setNegativeAckRedeliveryDelayMs(redeliveryDelayMillis);
}

long pulsar_configure_get_negative_ack_redelivery_delay_ms(
pulsar_consumer_configuration_t *consumer_configuration) {
return consumer_configuration->consumerConfiguration.getNegativeAckRedeliveryDelayMs();
Expand Down
7 changes: 7 additions & 0 deletions pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ pulsar_compression_type pulsar_producer_configuration_get_compression_type(
return (pulsar_compression_type)conf->conf.getCompressionType();
}

void pulsar_producer_configuration_set_schema_info(pulsar_producer_configuration_t *conf,
pulsar_schema_type schemaType, const char *name,
const char *schema, pulsar_string_map_t *properties) {
auto schemaInfo = pulsar::SchemaInfo((pulsar::SchemaType)schemaType, name, schema, properties->map);
conf->conf.setSchema(schemaInfo);
}

void pulsar_producer_configuration_set_max_pending_messages(pulsar_producer_configuration_t *conf,
int maxPendingMessages) {
conf->conf.setMaxPendingMessages(maxPendingMessages);
Expand Down
7 changes: 7 additions & 0 deletions pulsar-client-go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,15 @@ module github.com/apache/pulsar/pulsar-client-go

require (
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/alecthomas/jsonschema v0.0.0-20190122210438-a6952de1bbe6
github.com/davecgh/go-spew v1.1.1
github.com/gogo/protobuf v1.2.1
github.com/golang/protobuf v1.3.1
github.com/golang/snappy v0.0.1 // indirect
github.com/linkedin/goavro v2.1.0+incompatible
github.com/sirupsen/logrus v1.3.0
github.com/stretchr/testify v1.3.0
gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/yaml.v2 v2.2.2 // indirect
)
15 changes: 15 additions & 0 deletions pulsar-client-go/go.sum
Original file line number Diff line number Diff line change
@@ -1,10 +1,22 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/alecthomas/jsonschema v0.0.0-20190122210438-a6952de1bbe6 h1:xadBCbc8D9mmkaNfCsEBHbIoCjbayJXJNsY1JjPjNio=
github.com/alecthomas/jsonschema v0.0.0-20190122210438-a6952de1bbe6/go.mod h1:qpebaTNSsyUn5rPSJMsfqEtDw71TTggXM6stUDI16HA=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/linkedin/goavro v2.1.0+incompatible h1:DV2aUlj2xZiuxQyvag8Dy7zjY69ENjS66bWkSfdpddY=
github.com/linkedin/goavro v2.1.0+incompatible/go.mod h1:bBCwI2eGYpUI/4820s67MElg9tdeLbINjLjiM2xZFYM=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
Expand All @@ -18,8 +30,11 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/linkedin/goavro.v1 v1.0.5 h1:BJa69CDh0awSsLUmZ9+BowBdokpduDZSM9Zk8oKHfN4=
gopkg.in/linkedin/goavro.v1 v1.0.5/go.mod h1:Aw5GdAbizjOEl0kAMHV9iHmA8reZzW/OKuJAl4Hb9F0=
gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
Expand Down
74 changes: 65 additions & 9 deletions pulsar-client-go/pulsar/c_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (client *client) CreateProducer(options ProducerOptions) (Producer, error)
error
})

client.CreateProducerAsync(options, func(producer Producer, err error) {
client.CreateProducerAsync(options, nil, func(producer Producer, err error) {
c <- struct {
Producer
error
Expand All @@ -208,8 +208,28 @@ func (client *client) CreateProducer(options ProducerOptions) (Producer, error)
return res.Producer, res.error
}

func (client *client) CreateProducerAsync(options ProducerOptions, callback func(producer Producer, err error)) {
createProducerAsync(client, options, callback)
func (client *client) CreateProducerWithSchema(options ProducerOptions, schema Schema) (Producer, error) {
// Create is implemented on async create with a channel to wait for
// completion without blocking the real thread
c := make(chan struct {
Producer
error
})

client.CreateProducerAsync(options, schema, func(producer Producer, err error) {
c <- struct {
Producer
error
}{producer, err}
close(c)
})

res := <-c
return res.Producer, res.error
}

func (client *client) CreateProducerAsync(options ProducerOptions, schema Schema, callback func(producer Producer, err error)) {
createProducerAsync(client, schema, options, callback)
}

func (client *client) Subscribe(options ConsumerOptions) (Consumer, error) {
Expand All @@ -218,7 +238,25 @@ func (client *client) Subscribe(options ConsumerOptions) (Consumer, error) {
error
})

client.SubscribeAsync(options, func(consumer Consumer, err error) {
client.SubscribeAsync(options, nil, func(consumer Consumer, err error) {
c <- struct {
Consumer
error
}{consumer, err}
close(c)
})

res := <-c
return res.Consumer, res.error
}

func (client *client) SubscribeWithSchema(options ConsumerOptions, schema Schema) (Consumer, error) {
c := make(chan struct {
Consumer
error
})

client.SubscribeAsync(options, schema, func(consumer Consumer, err error) {
c <- struct {
Consumer
error
Expand All @@ -230,8 +268,8 @@ func (client *client) Subscribe(options ConsumerOptions) (Consumer, error) {
return res.Consumer, res.error
}

func (client *client) SubscribeAsync(options ConsumerOptions, callback func(Consumer, error)) {
subscribeAsync(client, options, callback)
func (client *client) SubscribeAsync(options ConsumerOptions, schema Schema, callback func(Consumer, error)) {
subscribeAsync(client, options, schema, callback)
}

func (client *client) CreateReader(options ReaderOptions) (Reader, error) {
Expand All @@ -240,7 +278,25 @@ func (client *client) CreateReader(options ReaderOptions) (Reader, error) {
error
})

client.CreateReaderAsync(options, func(reader Reader, err error) {
client.CreateReaderAsync(options, nil, func(reader Reader, err error) {
c <- struct {
Reader
error
}{reader, err}
close(c)
})

res := <-c
return res.Reader, res.error
}

func (client *client) CreateReaderWithSchema(options ReaderOptions, schema Schema) (Reader, error) {
c := make(chan struct {
Reader
error
})

client.CreateReaderAsync(options, schema, func(reader Reader, err error) {
c <- struct {
Reader
error
Expand Down Expand Up @@ -271,8 +327,8 @@ func pulsarGetTopicPartitionsCallbackProxy(res C.pulsar_result, cPartitions *C.p
}
}

func (client *client) CreateReaderAsync(options ReaderOptions, callback func(Reader, error)) {
createReaderAsync(client, options, callback)
func (client *client) CreateReaderAsync(options ReaderOptions, schema Schema, callback func(Reader, error)) {
createReaderAsync(client, schema, options, callback)
}

func (client *client) TopicPartitions(topic string) ([]string, error) {
Expand Down
Loading

0 comments on commit d5036ea

Please sign in to comment.