Skip to content

Commit

Permalink
Added support for Snappy compression for Python/Go (apache#4319)
Browse files Browse the repository at this point in the history
* Added support for Snappy compression for Python/Go.

* Fix python test

* Fix python test

* just rm python test
  • Loading branch information
murong00 authored and merlimat committed May 28, 2019
1 parent 28c5503 commit dc0463c
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 2 deletions.
4 changes: 3 additions & 1 deletion pulsar-client-cpp/python/pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,9 +416,11 @@ def create_producer(self, topic,
* `compression_type`:
Set the compression type for the producer. By default, message
payloads are not compressed. Supported compression types are
`CompressionType.LZ4`, `CompressionType.ZLib` and `CompressionType.ZSTD`.
`CompressionType.LZ4`, `CompressionType.ZLib`, `CompressionType.ZSTD` and `CompressionType.SNAPPY`.
ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
release in order to be able to receive messages compressed with ZSTD.
SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that
release in order to be able to receive messages compressed with SNAPPY.
* `max_pending_messages`:
Set the max size of the queue holding the messages pending to receive
an acknowledgment from the broker.
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/python/src/enums.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ void export_enums() {
.value("LZ4", CompressionLZ4)
.value("ZLib", CompressionZLib)
.value("ZSTD", CompressionZSTD)
.value("SNAPPY", CompressionSNAPPY)
;

enum_<ConsumerType>("ConsumerType")
Expand Down
5 changes: 5 additions & 0 deletions pulsar-client-go/pulsar/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const (
LZ4
ZLib
ZSTD
SNAPPY
)

type TopicMetadata interface {
Expand Down Expand Up @@ -120,9 +121,13 @@ type ProducerOptions struct {
// - LZ4
// - ZLIB
// - ZSTD
// - SNAPPY
//
// Note: ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
// release in order to be able to receive messages compressed with ZSTD.
//
// Note: SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that
// release in order to be able to receive messages compressed with SNAPPY.
CompressionType

// Set a custom message routing policy by passing an implementation of MessageRouter
Expand Down
29 changes: 29 additions & 0 deletions pulsar-client-go/pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,35 @@ func TestProducerZstd(t *testing.T) {
}
}

func TestProducerSnappy(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
})

assert.Nil(t, err)
defer client.Close()

producer, err := client.CreateProducer(ProducerOptions{
Topic: "my-topic",
CompressionType: SNAPPY,
})

assert.Nil(t, err)
defer producer.Close()

assert.Equal(t, producer.Topic(), "persistent://public/default/my-topic")

ctx := context.Background()

for i := 0; i < 10; i++ {
if err := producer.Send(ctx, ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
t.Fatal(err)
}
}
}

func TestProducer_Flush(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
Expand Down
2 changes: 1 addition & 1 deletion site2/docs/client-libraries-go.md
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ Parameter | Description | Default
`BlockIfQueueFull` | If set to `true`, the producer's `Send` and `SendAsync` methods will block when the outgoing message queue is full rather than failing and throwing an error (the size of that queue is dictated by the `MaxPendingMessages` parameter); if set to `false` (the default), `Send` and `SendAsync` operations will fail and throw a `ProducerQueueIsFullError` when the queue is full. | `false`
`MessageRoutingMode` | The message routing logic (for producers on [partitioned topics](concepts-architecture-overview.md#partitioned-topics)). This logic is applied only when no key is set on messages. The available options are: round robin (`pulsar.RoundRobinDistribution`, the default), publishing all messages to a single partition (`pulsar.UseSinglePartition`), or a custom partitioning scheme (`pulsar.CustomPartition`). | `pulsar.RoundRobinDistribution`
`HashingScheme` | The hashing function that determines the partition on which a particular message is published (partitioned topics only). The available options are: `pulsar.JavaStringHash` (the equivalent of `String.hashCode()` in Java), `pulsar.Murmur3_32Hash` (applies the [Murmur3](https://en.wikipedia.org/wiki/MurmurHash) hashing function), or `pulsar.BoostHash` (applies the hashing function from C++'s [Boost](https://www.boost.org/doc/libs/1_62_0/doc/html/hash.html) library) | `pulsar.JavaStringHash`
`CompressionType` | The message data compression type used by the producer. The available options are [`LZ4`](https://github.com/lz4/lz4), [`ZLIB`](https://zlib.net/) and [`ZSTD`](https://facebook.github.io/zstd/). | No compression
`CompressionType` | The message data compression type used by the producer. The available options are [`LZ4`](https://github.com/lz4/lz4), [`ZLIB`](https://zlib.net/), [`ZSTD`](https://facebook.github.io/zstd/) and [`SNAPPY`](https://google.github.io/snappy/). | No compression
`MessageRouter` | By default, Pulsar uses a round-robin routing scheme for [partitioned topics](cookbooks-partitioned.md). The `MessageRouter` parameter enables you to specify custom routing logic via a function that takes the Pulsar message and topic metadata as an argument and returns an integer (where the ), i.e. a function signature of `func(Message, TopicMetadata) int`. |

## Consumers
Expand Down

0 comments on commit dc0463c

Please sign in to comment.