Skip to content

Commit

Permalink
Support Python 3.8 (#669)
Browse files Browse the repository at this point in the history
* Remove loop from most asyncio calls making it implicit

* Fix coroutine mocking for Python 3.8+

* Enable tests with Python 3.8

* Reformat code with black

* Refactor compatibility utils

* Update wheel for python_snappy

* Log time during CI

* Fix force_metadata_update mocks

* Drop explicit loop usage in test bodies

* Return dataclass from `kafka_server` fixture

* Stick to Python 3.8.5 due to bug in 3.8.6

* Update change log

* Run CI on push to master only

Co-authored-by: Taras Voinarovskyi <[email protected]>
  • Loading branch information
ods and tvoinarovskyi authored Oct 21, 2020
1 parent 9bc4354 commit 48f5df5
Show file tree
Hide file tree
Showing 35 changed files with 880 additions and 849 deletions.
35 changes: 24 additions & 11 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
name: Tests

on:
push: {}
push:
branches: [ master ]
tags:
- "v0.[0-9]+.[0-9]+"
- "v0.[0-9]+.[0-9]+.dev*"
pull_request:
branches: [ master ]

Expand Down Expand Up @@ -63,12 +67,14 @@ jobs:

strategy:
matrix:
python: [3.6, 3.7]
python: [3.6, 3.7, 3.8.5]
include:
- python: 3.6
snappy_whl: tools/python_snappy-0.5.4-cp36-cp36m-win_amd64.whl
- python: 3.7
snappy_whl: tools/python_snappy-0.5.4-cp37-cp37m-win_amd64.whl
- python: 3.8.5
snappy_whl: tools/python_snappy-0.5.4-cp38-cp38-win_amd64.whl

steps:
- uses: actions/checkout@v2
Expand Down Expand Up @@ -137,7 +143,7 @@ jobs:

strategy:
matrix:
python: [3.6, 3.7]
python: [3.6, 3.7, 3.8.5]

steps:
- uses: actions/checkout@v2
Expand Down Expand Up @@ -210,7 +216,8 @@ jobs:
strategy:
matrix:
include:
- python: 3.7
# FIXME Stick to 3.8.5 as 3.8.6 has broken `wait_for`
- python: 3.8.5
kafka: "2.4.0"
scala: "2.12"

Expand All @@ -219,26 +226,32 @@ jobs:
kafka: "2.4.0"
scala: "2.12"

# Older brokers against latest python version
# Older python versions against latest broker
- python: 3.7
kafka: "2.4.0"
scala: "2.12"


# Older brokers against latest python version
- python: 3.8.5
kafka: "0.9.0.1"
scala: "2.11"
- python: 3.7
- python: 3.8.5
kafka: "0.10.2.1"
scala: "2.11"
- python: 3.7
- python: 3.8.5
kafka: "0.11.0.3"
scala: "2.12"
- python: 3.7
- python: 3.8.5
kafka: "1.1.1"
scala: "2.12"
- python: 3.7
- python: 3.8.5
kafka: "2.1.1"
scala: "2.12"
- python: 3.7
- python: 3.8.5
kafka: "2.2.2"
scala: "2.12"
- python: 3.7
- python: 3.8.5
kafka: "2.3.1"
scala: "2.12"
fail-fast: false
Expand Down
3 changes: 3 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ added `OAUTHBEARER` as a new `sasl_mechanism`.
=======
667.bugfix
Drop support for Python 3.5
=======
569.bugfix
Support Python 3.8

0.6.0 (2020-05-15)
==================
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ cov cover coverage: flake
@echo "open file://`pwd`/htmlcov/index.html"

ci-test-unit:
py.test -s --log-level DEBUG --cov aiokafka --cov-report xml --color=yes $(FLAGS) tests
py.test -s --log-format="%(asctime)s %(levelname)s %(message)s" --log-level DEBUG --cov aiokafka --cov-report xml --color=yes $(FLAGS) tests

ci-test-all:
py.test -s -v --log-level DEBUG --cov aiokafka --cov-report xml --color=yes --docker-image $(DOCKER_IMAGE) $(FLAGS) tests
py.test -s -v --log-format="%(asctime)s %(levelname)s %(message)s" --log-level DEBUG --cov aiokafka --cov-report xml --color=yes --docker-image $(DOCKER_IMAGE) $(FLAGS) tests

coverage.xml: .coverage
coverage xml
Expand Down
3 changes: 1 addition & 2 deletions aiokafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from .structs import (
TopicPartition, ConsumerRecord, OffsetAndTimestamp, OffsetAndMetadata
)
from .util import ensure_future


__all__ = [
Expand All @@ -24,4 +23,4 @@
"OffsetAndMetadata"
]

(ensure_future, AIOKafkaClient)
AIOKafkaClient
47 changes: 24 additions & 23 deletions aiokafka/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import logging
import random
import time

from kafka.conn import collect_hosts
from kafka.protocol.metadata import MetadataRequest
Expand All @@ -22,7 +23,7 @@
UnrecognizedBrokerVersion,
StaleMetadata)
from aiokafka.util import (
ensure_future, create_future, get_running_loop, parse_kafka_version
create_task, create_future, parse_kafka_version, get_running_loop
)


Expand Down Expand Up @@ -147,8 +148,8 @@ def __init__(self, *, loop=None, bootstrap_servers='localhost',
self._sync_task = None

self._md_update_fut = None
self._md_update_waiter = create_future(loop=self._loop)
self._get_conn_lock = asyncio.Lock(loop=loop)
self._md_update_waiter = create_future()
self._get_conn_lock = asyncio.Lock()

def __repr__(self):
return '<AIOKafkaClient client_id=%s>' % self._client_id
Expand Down Expand Up @@ -178,10 +179,13 @@ async def close(self):
for conn in self._conns.values():
futs.append(conn.close(reason=CloseReason.SHUTDOWN))
if futs:
await asyncio.gather(*futs, loop=self._loop)
await asyncio.gather(*futs)

async def bootstrap(self):
"""Try to to bootstrap initial cluster metadata"""
assert self._loop is asyncio.get_event_loop(), (
"Please create objects with the same loop as running with"
)
# using request v0 for bootstrap if not sure v1 is available
if self._api_version == "auto" or self._api_version < (0, 10):
metadata_request = MetadataRequest[0]([])
Expand All @@ -197,7 +201,7 @@ async def bootstrap(self):

try:
bootstrap_conn = await create_conn(
host, port, loop=self._loop, client_id=self._client_id,
host, port, client_id=self._client_id,
request_timeout_ms=self._request_timeout_ms,
ssl_context=self._ssl_context,
security_protocol=self._security_protocol,
Expand Down Expand Up @@ -244,21 +248,19 @@ async def bootstrap(self):

if self._sync_task is None:
# starting metadata synchronizer task
self._sync_task = ensure_future(
self._md_synchronizer(), loop=self._loop)
self._sync_task = create_task(self._md_synchronizer())

async def _md_synchronizer(self):
"""routine (async task) for synchronize cluster metadata every
`metadata_max_age_ms` milliseconds"""
while True:
await asyncio.wait(
[self._md_update_waiter],
timeout=self._metadata_max_age_ms / 1000,
loop=self._loop)
timeout=self._metadata_max_age_ms / 1000)

topics = self._topics
if self._md_update_fut is None:
self._md_update_fut = create_future(loop=self._loop)
self._md_update_fut = create_future()
ret = await self._metadata_update(self.cluster, topics)
# If list of topics changed during metadata update we must update
# it again right away.
Expand All @@ -267,7 +269,7 @@ async def _md_synchronizer(self):
# Earlier this waiter was set before sending metadata_request,
# but that was to avoid topic list changes being unnoticed, which
# is handled explicitly now.
self._md_update_waiter = create_future(loop=self._loop)
self._md_update_waiter = create_future()

self._md_update_fut.set_result(ret)
self._md_update_fut = None
Expand Down Expand Up @@ -342,9 +344,9 @@ def force_metadata_update(self):
# Wake up the `_md_synchronizer` task
if not self._md_update_waiter.done():
self._md_update_waiter.set_result(None)
self._md_update_fut = create_future(loop=self._loop)
self._md_update_fut = create_future()
# Metadata will be updated in the background by syncronizer
return asyncio.shield(self._md_update_fut, loop=self._loop)
return asyncio.shield(self._md_update_fut)

async def fetch_all_metadata(self):
cluster_md = ClusterMetadata(
Expand All @@ -362,7 +364,7 @@ def add_topic(self, topic):
topic (str): topic to track
"""
if topic in self._topics:
res = create_future(loop=self._loop)
res = create_future()
res.set_result(True)
else:
res = self.force_metadata_update()
Expand All @@ -379,7 +381,7 @@ def set_topics(self, topics):
if not topics or set(topics).difference(self._topics):
res = self.force_metadata_update()
else:
res = create_future(loop=self._loop)
res = create_future()
res.set_result(True)
self._topics = set(topics)
return res
Expand Down Expand Up @@ -432,7 +434,7 @@ async def _get_conn(
version_hint = None

self._conns[conn_id] = await create_conn(
broker.host, broker.port, loop=self._loop,
broker.host, broker.port,
client_id=self._client_id,
request_timeout_ms=self._request_timeout_ms,
ssl_context=self._ssl_context,
Expand Down Expand Up @@ -544,8 +546,8 @@ async def check_version(self, node_id=None):
assert conn, 'no connection to node with id {}'.format(node_id)
# request can be ignored by Kafka broker,
# so we send metadata request and wait response
task = self._loop.create_task(conn.send(request))
await asyncio.wait([task], timeout=0.1, loop=self._loop)
task = create_task(conn.send(request))
await asyncio.wait([task], timeout=0.1)
try:
await conn.send(MetadataRequest_v0([]))
except KafkaError:
Expand Down Expand Up @@ -617,23 +619,22 @@ async def _wait_on_metadata(self, topic):
# add topic to metadata topic list if it is not there already.
self.add_topic(topic)

t0 = self._loop.time()
t0 = time.monotonic()
while True:
await self.force_metadata_update()
if topic in self.cluster.topics():
break
if (self._loop.time() - t0) > (self._request_timeout_ms / 1000):
if (time.monotonic() - t0) > (self._request_timeout_ms / 1000):
raise UnknownTopicOrPartitionError()
if topic in self.cluster.unauthorized_topics:
raise Errors.TopicAuthorizationFailedError(topic)
await asyncio.sleep(self._retry_backoff, loop=self._loop)
await asyncio.sleep(self._retry_backoff)

return self.cluster.partitions_for_topic(topic)

async def _maybe_wait_metadata(self):
if self._md_update_fut is not None:
await asyncio.shield(
self._md_update_fut, loop=self._loop)
await asyncio.shield(self._md_update_fut)

async def coordinator_lookup(self, coordinator_type, coordinator_key):
""" Lookup which node in the cluster is the coordinator for a certain
Expand Down
Loading

0 comments on commit 48f5df5

Please sign in to comment.