Skip to content

Commit

Permalink
Create an ES client per simulated client instead of per worker. (elas…
Browse files Browse the repository at this point in the history
…tic#1516)

Previously, Rally would instantiate an ES client per worker and scale its
connection pool to match the number of simulated clients associated with that
worker. This worker-level ES client would be shared across all of the
simulated clients spawned by the worker.

This meant that any simulated client and its siblings would have to use exactly
the same ES transport options, as they all shared a single ES client object.
There are use cases, however, where sibling clients may each need to set
unique transport options (e.g. for authentication). This commit facilitates
support for these lower-level overrides by creating a distinct ES client per
simulated client, not per worker.
  • Loading branch information
michaelbaamonde authored Jun 21, 2022
1 parent 22eac6a commit 0ac68fc
Showing 1 changed file with 6 additions and 7 deletions.
13 changes: 6 additions & 7 deletions esrally/driver/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -1662,14 +1662,10 @@ def es_clients(all_hosts, all_client_options):
es[cluster_name] = client.EsClientFactory(cluster_hosts, all_client_options[cluster_name]).create_async()
return es

# Properly size the internal connection pool to match the number of expected clients but allow the user
# to override it if needed.
client_count = len(self.task_allocations)
es = es_clients(self.cfg.opts("client", "hosts").all_hosts, self.cfg.opts("client", "options").with_max_connections(client_count))

self.logger.info("Task assertions enabled: %s", str(self.assertions_enabled))
runner.enable_assertions(self.assertions_enabled)

clients = []
aws = []
# A parameter source should only be created once per task - it is partitioned later on per client.
params_per_task = {}
Expand All @@ -1679,6 +1675,8 @@ def es_clients(all_hosts, all_client_options):
param_source = track.operation_parameters(self.track, task)
params_per_task[task] = param_source
schedule = schedule_for(task_allocation, params_per_task[task])
es = es_clients(self.cfg.opts("client", "hosts").all_hosts, self.cfg.opts("client", "options"))
clients.append(es)
async_executor = AsyncExecutor(
client_id, task, schedule, es, self.sampler, self.cancel, self.complete, task.error_behavior(self.abort_on_error)
)
Expand All @@ -1693,8 +1691,9 @@ def es_clients(all_hosts, all_client_options):
await asyncio.get_event_loop().shutdown_asyncgens()
shutdown_asyncgens_end = time.perf_counter()
self.logger.info("Total time to shutdown asyncgens: %f seconds.", (shutdown_asyncgens_end - run_end))
for e in es.values():
await e.transport.close()
for c in clients:
for es in c.values():
await es.close()
transport_close_end = time.perf_counter()
self.logger.info("Total time to close transports: %f seconds.", (shutdown_asyncgens_end - transport_close_end))

Expand Down

0 comments on commit 0ac68fc

Please sign in to comment.