Skip to content

Commit

Permalink
Do not send empty produce requests. Fixes aio-libs#323
Browse files Browse the repository at this point in the history
  • Loading branch information
tvoinarovskyi committed Feb 20, 2018
1 parent 667683b commit 54fa682
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 1 deletion.
12 changes: 11 additions & 1 deletion aiokafka/producer/message_accumulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ def get_data_buffer(self):
self._buffer.seek(0)
return self._buffer

def is_empty(self):
return self._builder.record_count() == 0


class MessageAccumulator:
"""Accumulator of messages batched by topic-partition
Expand Down Expand Up @@ -296,7 +299,14 @@ def drain_by_nodes(self, ignore_nodes):
continue

batch = self._pop_batch(tp)
nodes[leader][tp] = batch
# We can get an empty batch here if all `append()` calls failed
# with validation...
if not batch.is_empty():
nodes[leader][tp] = batch
else:
# XXX: use something more graceful. We just want to trigger
# delivery future here, no message futures.
batch.done_noack()

# all batches are drained from accumulator
# so create "wait data" future again for waiting new data in send
Expand Down
19 changes: 19 additions & 0 deletions tests/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,3 +393,22 @@ def mocked_send(*args, **kw):
"XXXX", b'text1', partition=0)
self.assertEqual(res.timestamp_type, LOG_APPEND_TIME)
self.assertEqual(res.timestamp, expected_timestamp)

@run_until_complete
def test_producer_send_empty_batch(self):
# We trigger a unique case here, we don't send any messages, but the
# ProduceBatch will be created. It should be discarded as it contains
# 0 messages by sender routine.
producer = AIOKafkaProducer(
loop=self.loop, bootstrap_servers=self.hosts)
yield from producer.start()

with self.assertRaises(TypeError):
yield from producer.send(self.topic, 'text1')

send_mock = mock.Mock()
send_mock.side_effect = producer._send_produce_req
producer._send_produce_req = send_mock

yield from producer.flush()
self.assertEqual(send_mock.call_count, 0)

0 comments on commit 54fa682

Please sign in to comment.