Skip to content

Commit

Permalink
Python API: Add test case for create_reader(is_read_compacted) (apach…
Browse files Browse the repository at this point in the history
…e#5488)

* Add test case for PR apache#5483 / issue apache#5365

* Python: apply timeout_millis to receive() and read_next() in tests
  • Loading branch information
candlerb authored and merlimat committed Oct 31, 2019
1 parent 0076a1b commit d9ebec5
Showing 1 changed file with 42 additions and 27 deletions.
69 changes: 42 additions & 27 deletions pulsar-client-cpp/python/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
# Fall back to Python 2's urllib2
from urllib2 import urlopen, Request

TM = 10000 # Do not wait forever in tests

def doHttpPost(url, data):
req = Request(url, data.encode())
Expand Down Expand Up @@ -128,7 +129,7 @@ def test_producer_consumer(self):
producer = client.create_producer('my-python-topic-producer-consumer')
producer.send(b'hello')

msg = consumer.receive(1000)
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello')

Expand Down Expand Up @@ -160,7 +161,7 @@ def test_consumer_initial_position(self):
producer.send(b'hello-%d' % i)

for i in range(10):
msg = consumer.receive(1000)
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello-%d' % i)

Expand All @@ -187,7 +188,7 @@ def test_message_properties(self):
'b': '2'
})

msg = consumer.receive()
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.value(), 'hello')
self.assertEqual(msg.properties(), {
Expand All @@ -213,7 +214,7 @@ def test_tls_auth(self):
producer = client.create_producer('my-python-topic-producer-consumer')
producer.send(b'hello')

msg = consumer.receive(1000)
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello')

Expand Down Expand Up @@ -243,7 +244,7 @@ def test_tls_auth2(self):
producer = client.create_producer('my-python-topic-producer-consumer')
producer.send(b'hello')

msg = consumer.receive(1000)
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello')

Expand Down Expand Up @@ -273,7 +274,7 @@ def test_tls_auth3(self):
producer = client.create_producer('my-python-topic-producer-consumer')
producer.send(b'hello')

msg = consumer.receive(1000)
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello')

Expand Down Expand Up @@ -336,7 +337,7 @@ def test_reader_simple(self):
producer = client.create_producer('my-python-topic-reader-simple')
producer.send(b'hello')

msg = reader.read_next()
msg = reader.read_next(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello')

Expand All @@ -363,7 +364,7 @@ def test_reader_on_last_message(self):
producer.send(b'hello-%d' % i)

for i in range(10, 20):
msg = reader.read_next()
msg = reader.read_next(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello-%d' % i)

Expand All @@ -384,7 +385,7 @@ def test_reader_on_specific_message(self):
MessageId.earliest)

for i in range(num_of_msgs/2):
msg = reader1.read_next()
msg = reader1.read_next(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello-%d' % i)
last_msg_id = msg.message_id()
Expand All @@ -398,7 +399,7 @@ def test_reader_on_specific_message(self):
# When available, we should test this behaviour with `startMessageIdInclusive` opt.
from_msg_idx = last_msg_idx
for i in range(from_msg_idx, num_of_msgs):
msg = reader2.read_next()
msg = reader2.read_next(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello-%d' % i)

Expand All @@ -424,15 +425,15 @@ def test_reader_on_specific_message_with_batches(self):
MessageId.earliest)

for i in range(5):
msg = reader1.read_next()
msg = reader1.read_next(TM)
last_msg_id = msg.message_id()

reader2 = client.create_reader(
'my-python-topic-reader-on-specific-message-with-batches',
last_msg_id)

for i in range(5, 11):
msg = reader2.read_next()
msg = reader2.read_next(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello-%d' % i)

Expand Down Expand Up @@ -492,7 +493,7 @@ def test_producer_deduplication(self):
self.assertEqual(producer.last_sequence_id(), 2)

for i in range(3):
msg = consumer.receive()
msg = consumer.receive(TM)
self.assertEqual(msg.data(), b'hello-%d' % i)
consumer.acknowledge(msg)

Expand Down Expand Up @@ -646,19 +647,33 @@ def test_publish_compact_and_consume(self):

# after compact, consumer with `is_read_compacted=True`, expected read only the second message for same key.
consumer1 = client.subscribe(topic, 'my-sub1', is_read_compacted=True)
msg0 = consumer1.receive()
msg0 = consumer1.receive(TM)
self.assertEqual(msg0.data(), b'hello-1')
consumer1.acknowledge(msg0)
consumer1.close()

# ditto for reader
reader1 = client.create_reader(topic, MessageId.earliest, is_read_compacted=True)
msg0 = reader1.read_next(TM)
self.assertEqual(msg0.data(), b'hello-1')
reader1.close()

# after compact, consumer with `is_read_compacted=False`, expected read 2 messages for same key.
msg0 = consumer2.receive()
msg0 = consumer2.receive(TM)
self.assertEqual(msg0.data(), b'hello-0')
consumer2.acknowledge(msg0)
msg1 = consumer2.receive()
msg1 = consumer2.receive(TM)
self.assertEqual(msg1.data(), b'hello-1')
consumer2.acknowledge(msg1)
consumer2.close()

# ditto for reader
reader2 = client.create_reader(topic, MessageId.earliest, is_read_compacted=False)
msg0 = reader2.read_next(TM)
self.assertEqual(msg0.data(), b'hello-0')
msg1 = reader2.read_next(TM)
self.assertEqual(msg1.data(), b'hello-1')
reader2.close()
client.close()

def test_reader_has_message_available(self):
Expand All @@ -678,7 +693,7 @@ def test_reader_has_message_available(self):
self.assertTrue(reader.has_message_available());

for i in range(10):
msg = reader.read_next()
msg = reader.read_next(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello-%d' % i)

Expand All @@ -705,14 +720,14 @@ def test_seek(self):
producer.send(b'hello-%d' % i)

for i in range(100):
msg = consumer.receive()
msg = consumer.receive(TM)
self.assertEqual(msg.data(), b'hello-%d' % i)
consumer.acknowledge(msg)

# seek, and after reconnect, expected receive first message.
consumer.seek(MessageId.earliest)
time.sleep(0.5)
msg = consumer.receive()
msg = consumer.receive(TM)
self.assertEqual(msg.data(), b'hello-0')
client.close()

Expand All @@ -730,7 +745,7 @@ def _v2_topics(self, url):
producer = client.create_producer('my-v2-topic-producer-consumer')
producer.send(b'hello')

msg = consumer.receive(1000)
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello')
consumer.acknowledge(msg)
Expand Down Expand Up @@ -779,7 +794,7 @@ def test_topics_consumer(self):


for i in range(300):
msg = consumer.receive()
msg = consumer.receive(TM)
consumer.acknowledge(msg)

try:
Expand Down Expand Up @@ -834,7 +849,7 @@ def test_topics_pattern_consumer(self):


for i in range(300):
msg = consumer.receive()
msg = consumer.receive(TM)
consumer.acknowledge(msg)

try:
Expand Down Expand Up @@ -884,7 +899,7 @@ def test_token_auth(self):
producer = client.create_producer('persistent://private/auth/my-python-topic-token-auth')
producer.send(b'hello')

msg = consumer.receive(1000)
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello')
client.close()
Expand All @@ -902,7 +917,7 @@ def read_token():
producer = client.create_producer('persistent://private/auth/my-python-topic-token-auth')
producer.send(b'hello')

msg = consumer.receive(1000)
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello')
client.close()
Expand All @@ -916,7 +931,7 @@ def test_producer_consumer_zstd(self):
compression_type=CompressionType.ZSTD)
producer.send(b'hello')

msg = consumer.receive(1000)
msg = consumer.receive(TM)
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello')

Expand All @@ -939,7 +954,7 @@ def test_get_topic_name(self):
producer = client.create_producer('persistent://public/default/topic_name_test')
producer.send(b'hello')

msg = consumer.receive(1000)
msg = consumer.receive(TM)
self.assertEqual(msg.topic_name(), 'persistent://public/default/topic_name_test')
client.close()

Expand All @@ -960,7 +975,7 @@ def test_get_partitioned_topic_name(self):
producer = client.create_producer('persistent://public/default/partitioned_topic_name_test')
producer.send(b'hello')

msg = consumer.receive(1000)
msg = consumer.receive(TM)
self.assertTrue(msg.topic_name() in partitions)
client.close()

Expand Down

0 comments on commit d9ebec5

Please sign in to comment.