Skip to content

Commit

Permalink
Add warning report to pytest and fix many warnings in code
Browse files Browse the repository at this point in the history
  • Loading branch information
tvoinarovskyi committed Mar 3, 2018
1 parent 9659b38 commit 69786bc
Show file tree
Hide file tree
Showing 9 changed files with 46 additions and 33 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ flake:
flake8 aiokafka tests $$extra

test: flake
py.test -s --no-print-logs --docker-image $(DOCKER_IMAGE) $(FLAGS) tests
py.test -s --docker-image $(DOCKER_IMAGE) $(FLAGS) -Wdefault tests

vtest: flake
py.test -s -v --no-print-logs --docker-image $(DOCKER_IMAGE) $(FLAGS) tests
py.test -s -v --no-print-logs --docker-image $(DOCKER_IMAGE) $(FLAGS) -Wdefault tests

cov cover coverage: flake
py.test -s --cov aiokafka --cov-report html --docker-image $(DOCKER_IMAGE) $(FLAGS) tests
Expand Down
1 change: 0 additions & 1 deletion aiokafka/consumer/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -998,7 +998,6 @@ def getmany(self, *partitions, timeout_ms=0, max_records=None):
return records

if PY_35:
@asyncio.coroutine
def __aiter__(self):
if self._closed:
raise ConsumerStoppedError()
Expand Down
9 changes: 5 additions & 4 deletions aiokafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,13 +541,14 @@ def _proc_fetch_request(self, assignment, node_id, request):
"Fetch offset %s is out of range for partition %s,"
" resetting offset", fetch_offset, tp)
elif error_type is Errors.TopicAuthorizationFailedError:
log.warn("Not authorized to read from topic %s.", tp.topic)
log.warning(
"Not authorized to read from topic %s.", tp.topic)
err = Errors.TopicAuthorizationFailedError(tp.topic)
self._set_error(tp, err)
needs_wakeup = True
else:
log.warn('Unexpected error while fetching data: %s',
error_type.__name__)
log.warning('Unexpected error while fetching data: %s',
error_type.__name__)
return needs_wakeup

def _set_error(self, tp, error):
Expand Down Expand Up @@ -784,7 +785,7 @@ def _proc_offset_request(self, node_id, topic_data):
partition)
raise error_type(partition)
elif error_type is Errors.UnknownTopicOrPartitionError:
log.warn(
log.warning(
"Received unknown topic or partition error in "
"ListOffset request for partition %s. The "
"topic/partition may not exist or the user may not "
Expand Down
20 changes: 10 additions & 10 deletions tests/_testutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,19 @@ def wait_kafka(cls):
cls.hosts = ['{}:{}'.format(cls.kafka_host, cls.kafka_port)]

# Reconnecting until Kafka in docker becomes available
client = AIOKafkaClient(loop=cls.loop, bootstrap_servers=cls.hosts)
for i in range(500):
client = AIOKafkaClient(loop=cls.loop, bootstrap_servers=cls.hosts)
try:
cls.loop.run_until_complete(client.bootstrap())
# Wait for broker to look for others.
if not client.cluster.brokers():
time.sleep(0.1)
continue
# Broker can still be loading cluster layout, so we can get 0
# brokers. That counts as still not available
if client.cluster.brokers():
return
except ConnectionError:
time.sleep(0.1)
else:
pass
finally:
cls.loop.run_until_complete(client.close())
return
time.sleep(0.1)
assert False, "Kafka server never started"

def setUp(self):
Expand Down Expand Up @@ -196,10 +196,10 @@ def send_messages(self, partition, messages, *, topic=None,

def assert_message_count(self, messages, num_messages):
# Make sure we got them all
self.assertEquals(len(messages), num_messages)
self.assertEqual(len(messages), num_messages)

# Make sure there are no duplicates
self.assertEquals(len(set(messages)), num_messages)
self.assertEqual(len(set(messages)), num_messages)

def create_ssl_context(self):
return create_ssl_context(
Expand Down
8 changes: 8 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,14 @@ def loop(request):
asyncio.set_event_loop(None)


@pytest.yield_fixture(autouse=True)
def collect_garbage():
# This is used to have a better report on ResourceWarnings. Without it
# all warnings will be filled in the end of last test-case.
yield
gc.collect()


@pytest.fixture(scope='class')
def setup_test_class_serverless(request, loop, ssl_folder):
request.cls.loop = loop
Expand Down
3 changes: 2 additions & 1 deletion tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from aiokafka.client import AIOKafkaClient, ConnectionGroup
from aiokafka.conn import AIOKafkaConnection, CloseReason
from aiokafka.util import ensure_future
from ._testutil import KafkaIntegrationTestCase, run_until_complete


Expand Down Expand Up @@ -87,7 +88,7 @@ def send(request_id):
mocked_conns[(0, 0)].send.side_effect = send
client = AIOKafkaClient(loop=self.loop,
bootstrap_servers=['broker_1:4567'])
task = asyncio.async(client._md_synchronizer(), loop=self.loop)
task = ensure_future(client._md_synchronizer(), loop=self.loop)
client._conns = mocked_conns
client.cluster.update_metadata(MetadataResponse(brokers[:1], []))

Expand Down
7 changes: 4 additions & 3 deletions tests/test_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from aiokafka.conn import AIOKafkaConnection, create_conn
from aiokafka.errors import ConnectionError, CorrelationIdError
from aiokafka.util import ensure_future
from ._testutil import KafkaIntegrationTestCase, run_until_complete


Expand Down Expand Up @@ -141,7 +142,7 @@ def test_invalid_correlation_id(self):
conn._reader = reader
conn._writer = writer
# invoke reader task
conn._read_task = asyncio.async(conn._read(), loop=self.loop)
conn._read_task = ensure_future(conn._read(), loop=self.loop)

with self.assertRaises(CorrelationIdError):
yield from conn.send(request)
Expand Down Expand Up @@ -171,7 +172,7 @@ def test_correlation_id_on_group_coordinator_req(self):
conn._reader = reader
conn._writer = writer
# invoke reader task
conn._read_task = asyncio.async(conn._read(), loop=self.loop)
conn._read_task = ensure_future(conn._read(), loop=self.loop)

response = yield from conn.send(request)
self.assertIsInstance(response, GroupCoordinatorResponse)
Expand Down Expand Up @@ -201,7 +202,7 @@ def invoke_osserror(*a, **kw):
conn._reader = reader
conn._writer = writer
# invoke reader task
conn._read_task = asyncio.async(conn._read(), loop=self.loop)
conn._read_task = ensure_future(conn._read(), loop=self.loop)

with self.assertRaises(ConnectionError):
yield from conn.send(request)
Expand Down
20 changes: 10 additions & 10 deletions tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ def task(tp, messages):
self.assertEqual(m.partition, tp.partition)
messages.append(m)

task1 = asyncio.async(task(p0, messages), loop=self.loop)
task2 = asyncio.async(task(p1, messages), loop=self.loop)
task1 = ensure_future(task(p0, messages), loop=self.loop)
task2 = ensure_future(task(p1, messages), loop=self.loop)
yield from asyncio.wait([task1, task2], loop=self.loop)
self.assert_message_count(messages, 200)

Expand Down Expand Up @@ -674,13 +674,13 @@ def test_ssl_consume(self):
self.assertEqual(msgs, [b"1", b"2", b"3"])

def test_consumer_arguments(self):
with self.assertRaisesRegexp(
with self.assertRaisesRegex(
ValueError, "`security_protocol` should be SSL or PLAINTEXT"):
AIOKafkaConsumer(
self.topic, loop=self.loop,
bootstrap_servers=self.hosts,
security_protocol="SOME")
with self.assertRaisesRegexp(
with self.assertRaisesRegex(
ValueError, "`ssl_context` is mandatory if "
"security_protocol=='SSL'"):
AIOKafkaConsumer(
Expand All @@ -703,21 +703,21 @@ def test_consumer_commit_validation(self):
yield from consumer.commit("something")
with self.assertRaises(ValueError):
yield from consumer.commit({tp: (offset, "metadata", 100)})
with self.assertRaisesRegexp(
with self.assertRaisesRegex(
ValueError, "Key should be TopicPartition instance"):
yield from consumer.commit({"my_topic": offset_and_metadata})
with self.assertRaisesRegexp(
with self.assertRaisesRegex(
ValueError, "Metadata should be a string"):
yield from consumer.commit({tp: (offset, 1000)})
with self.assertRaisesRegexp(
with self.assertRaisesRegex(
ValueError, "Metadata should be a string"):
yield from consumer.commit({tp: (offset, b"\x00\x02")})

with self.assertRaisesRegexp(
with self.assertRaisesRegex(
IllegalStateError, "Partition .* is not assigned"):
yield from consumer.commit({TopicPartition(self.topic, 10): 1000})
consumer.unsubscribe()
with self.assertRaisesRegexp(
with self.assertRaisesRegex(
IllegalStateError, "Not subscribed to any topics"):
yield from consumer.commit({tp: 1000})

Expand All @@ -729,7 +729,7 @@ def test_consumer_commit_validation(self):
self.add_cleanup(consumer.stop)

consumer.subscribe(topics=set([self.topic]))
with self.assertRaisesRegexp(
with self.assertRaisesRegex(
IllegalStateError, "No partitions assigned"):
yield from consumer.commit({tp: 1000})

Expand Down
7 changes: 5 additions & 2 deletions tests/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,13 +330,13 @@ def test_producer_ssl(self):
yield from consumer.stop()

def test_producer_arguments(self):
with self.assertRaisesRegexp(
with self.assertRaisesRegex(
ValueError, "`security_protocol` should be SSL or PLAINTEXT"):
AIOKafkaProducer(
loop=self.loop,
bootstrap_servers=self.hosts,
security_protocol="SOME")
with self.assertRaisesRegexp(
with self.assertRaisesRegex(
ValueError, "`ssl_context` is mandatory if "
"security_protocol=='SSL'"):
AIOKafkaProducer(
Expand All @@ -349,6 +349,7 @@ def test_producer_flush_test(self):
producer = AIOKafkaProducer(
loop=self.loop, bootstrap_servers=self.hosts)
yield from producer.start()
self.add_cleanup(producer.stop)

fut1 = yield from producer.send("producer_flush_test", b'text1')
fut2 = yield from producer.send("producer_flush_test", b'text2')
Expand All @@ -365,6 +366,7 @@ def test_producer_correct_time_returned(self):
producer = AIOKafkaProducer(
loop=self.loop, bootstrap_servers=self.hosts)
yield from producer.start()
self.add_cleanup(producer.stop)

send_time = (time.time() * 1000)
res = yield from producer.send_and_wait(
Expand Down Expand Up @@ -402,6 +404,7 @@ def test_producer_send_empty_batch(self):
producer = AIOKafkaProducer(
loop=self.loop, bootstrap_servers=self.hosts)
yield from producer.start()
self.add_cleanup(producer.stop)

with self.assertRaises(TypeError):
yield from producer.send(self.topic, 'text1')
Expand Down

0 comments on commit 69786bc

Please sign in to comment.