-
Notifications
You must be signed in to change notification settings - Fork 75
/
Copy pathconftest.py
90 lines (72 loc) · 2.86 KB
/
conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import logging.config
from collections import namedtuple
from typing import Callable, Generator

import pytest
from testcontainers.core.network import Network

from .containerhelper import ContainerHelper
from .logging import LOGGING_CONFIG, patch_logger_class
# Modules that pytest loads as plugins so the fixtures they define are
# discovered automatically and can be requested by tests without importing
# them explicitly.
pytest_plugins = [
    "tests.test_quixstreams.test_dataframe.fixtures",
    "tests.test_quixstreams.fixtures",
    "tests.test_quixstreams.test_models.test_serializers.fixtures",
    "tests.test_quixstreams.test_platforms.test_quix.fixtures",
    "tests.test_quixstreams.test_state.fixtures",
    "tests.test_quixstreams.test_state.test_rocksdb.test_windowed.fixtures",
]
# Addresses of a started Kafka test container.
#   broker_address          - the external broker address reported when the
#                             container was created.
#   internal_broker_address - the internal broker address; this is the value
#                             passed on when the Schema Registry container is
#                             created.
KafkaContainer = namedtuple(
    "KafkaContainer", ("broker_address", "internal_broker_address")
)
# Address of a started Schema Registry test container.
#   schema_registry_address - the address returned when the container was
#                             created.
SchemaRegistryContainer = namedtuple(
    "SchemaRegistryContainer", ("schema_registry_address",)
)
# Logger used by the fixtures in this file to report test progress and
# container start-up at DEBUG level.
test_logger = logging.getLogger("quixstreams.tests")
@pytest.fixture(scope="session", autouse=True)
def configure_logging():
    """Set up logging once for the whole test session.

    Applies ``LOGGING_CONFIG`` through ``logging.config.dictConfig`` and then
    calls ``patch_logger_class``. As an autouse session fixture it runs
    without any test having to request it.
    """
    # Keep this order: the dict config is applied before the logger class
    # is patched, exactly as the suite has always done.
    logging.config.dictConfig(LOGGING_CONFIG)
    patch_logger_class()
@pytest.fixture(autouse=True)
def log_test_progress(request: pytest.FixtureRequest):
    """Log, at DEBUG level, the node id of the test requesting this fixture.

    The fixture is autouse, so every test gets a "Starting test ..." line
    without asking for it.
    """
    node_id = request.node.nodeid
    test_logger.debug("Starting test %s", node_id)
@pytest.fixture(scope="session")
def network():
    """Provide a single ``Network`` shared by the whole test session.

    The network is entered as a context manager, so it is exited when the
    fixture is torn down at the end of the session.
    """
    with Network() as shared_network:
        yield shared_network
@pytest.fixture(scope="session")
def schema_registry_container(
    network: Network, kafka_container: KafkaContainer
) -> Generator[SchemaRegistryContainer, None, None]:
    """Start a Schema Registry container for the session and yield its address.

    The container is created via ``ContainerHelper`` from ``network`` and the
    Kafka container's internal broker address, started, and exposed to tests
    as a ``SchemaRegistryContainer``. It is stopped when the fixture is torn
    down.
    """
    created = ContainerHelper.create_schema_registry_container(
        network, kafka_container.internal_broker_address
    )
    registry, schema_registry_address = created

    test_logger.debug(
        f"Starting Schema Registry container on {schema_registry_address}"
    )
    ContainerHelper.start_schema_registry_container(registry)
    test_logger.debug(f"Started Schema Registry container on {schema_registry_address}")

    handle = SchemaRegistryContainer(schema_registry_address=schema_registry_address)
    yield handle

    # Teardown: runs when pytest resumes the fixture after the session.
    registry.stop()
@pytest.fixture(scope="session")
def kafka_container_factory(
    network: Network,
) -> Callable[[], Generator[KafkaContainer, None, None]]:
    """Return a factory that starts Kafka containers using ``network``.

    The fixture itself starts nothing. Each call to the returned factory
    produces a generator which, on its first step, creates and starts a Kafka
    container through ``ContainerHelper`` and yields a ``KafkaContainer``
    holding its external and internal broker addresses. Resuming the
    generator past that yield stops the container, so consuming fixtures
    should delegate to it with ``yield from``.

    :param network: network passed to ``ContainerHelper.create_kafka_container``.
    :return: a zero-argument callable producing the generator described above.
    """

    def factory() -> Generator[KafkaContainer, None, None]:
        (
            kafka_container,
            internal_broker_address,
            external_broker_address,
        ) = ContainerHelper.create_kafka_container(network)
        test_logger.debug(f"Starting Kafka container on {external_broker_address}")
        ContainerHelper.start_kafka_container(kafka_container)
        test_logger.debug(f"Started Kafka container on {external_broker_address}")
        yield KafkaContainer(
            broker_address=external_broker_address,
            internal_broker_address=internal_broker_address,
        )
        # Reached only when the generator is resumed after the yield,
        # i.e. during teardown of the fixture that consumed it.
        kafka_container.stop()

    return factory
@pytest.fixture(scope="session")
def kafka_container(
    kafka_container_factory,
) -> Generator[KafkaContainer, None, None]:
    """Provide one Kafka container shared by the whole test session.

    Delegates to the generator returned by ``kafka_container_factory()``:
    tests receive the yielded ``KafkaContainer``, and the generator's
    remaining steps run when this fixture is torn down.
    """
    yield from kafka_container_factory()