Skip to content

Commit

Permalink
[issues 4464] Fix batch logic and add test case (apache#4475)
Browse files Browse the repository at this point in the history
* [issues 4464] Fix batch logic and add test case

Signed-off-by: xiaolong.ran <[email protected]>

* resolve conflicts

Signed-off-by: xiaolong.ran <[email protected]>
  • Loading branch information
wolfstudy authored and jiazhai committed Jun 6, 2019
1 parent 2c3f783 commit d924420
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 3 deletions.
4 changes: 1 addition & 3 deletions pulsar-client-go/pulsar/c_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,7 @@ func createProducerAsync(client *client, schema Schema, options ProducerOptions,
C._pulsar_producer_configuration_set_message_router(conf, savePointer(&options.MessageRouter))
}

if options.Batching {
C.pulsar_producer_configuration_set_batching_enabled(conf, cBool(options.Batching))
}
C.pulsar_producer_configuration_set_batching_enabled(conf, cBool(options.Batching))

if options.BatchingMaxPublishDelay != 0 {
delayMillis := options.BatchingMaxPublishDelay.Nanoseconds() / int64(time.Millisecond)
Expand Down
52 changes: 52 additions & 0 deletions pulsar-client-go/pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,3 +336,55 @@ func TestProducer_MessageID(t *testing.T) {
assert.Equal(t, 6, index)
}
}

func TestProducer_Batch(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
})
assert.Nil(t, err)
defer client.Close()

topicName := "test-batch-in-producer-111"
subName := "subscription-name111"
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
Batching: true,
BatchingMaxMessages: 5,
})
assert.Nil(t, err)
defer producer.Close()

consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: subName,
})
assert.Nil(t, err)
defer consumer.Close()

ctx := context.Background()

for i := 0; i < 10; i++ {
// Create a different message to send asynchronously
asyncMsg := ProducerMessage{
Payload: []byte(fmt.Sprintf("async-message-%d", i)),
}
// Attempt to send the message asynchronously and handle the response
producer.SendAsync(ctx, asyncMsg, func(msg ProducerMessage, err error) {
if err != nil {
log.Fatal(err)
}
})
}

for i := 0; i < 10; i++ {
msg, err := consumer.Receive(ctx)
if err != nil {
log.Fatal(err)
}
err = consumer.Ack(msg)
assert.Nil(t, err)
msgID := fmt.Sprintf("message ID:%v", msg.ID())
num := strings.Count(msgID, "-1")
assert.Equal(t, 1, num)
}
}

0 comments on commit d924420

Please sign in to comment.