Skip to content

Commit

Permalink
Revert "Fix bulk index detailed stats and log failures (elastic#1400)" (
Browse files Browse the repository at this point in the history
elastic#1404)

This reverts commit febbfe8.
  • Loading branch information
Rick Boyd authored Dec 16, 2021
1 parent febbfe8 commit f24b3ab
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 67 deletions.
2 changes: 1 addition & 1 deletion docs/track.rst
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ Properties
* ``conflict-probability`` (optional, defaults to 25 percent): A number in the range [0, 100] that defines the percentage of documents that will get replaced. Combining ``conflicts=sequential`` and ``conflict-probability=0`` makes Rally generate index ids by itself, instead of relying on Elasticsearch's `automatic id generation <https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#_automatic_id_generation>`_.
* ``on-conflict`` (optional, defaults to ``index``): Determines whether Rally should use the action ``index`` or ``update`` on id conflicts.
* ``recency`` (optional, defaults to 0): A number in the range [0, 1] indicating whether to bias conflicting ids towards more recent ids (``recency`` towards 1) or whether to consider all ids for id conflicts (``recency`` towards 0). See the diagram below for details.
* ``detailed-results`` (optional, defaults to ``false``): Records more detailed meta-data for bulk requests. As it analyzes the corresponding bulk response in more detail, this might incur additional overhead which can skew measurement results. See the section below for the meta-data that are returned. This property must be set to ``true`` for individual bulk request failures to be logged by Rally.
* ``detailed-results`` (optional, defaults to ``false``): Records more detailed meta-data for bulk requests. As it analyzes the corresponding bulk response in more detail, this might incur additional overhead which can skew measurement results. See the section below for the meta-data that are returned.
* ``timeout`` (optional, defaults to ``1m``): Defines the `time period that Elasticsearch will wait per action <https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-query-params>`_ until it has finished processing the following operations: automatic index creation, dynamic mapping updates, waiting for active shards.


Expand Down
13 changes: 8 additions & 5 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,11 +520,15 @@ def detailed_stats(self, params, response):
total_document_size_bytes = 0
with_action_metadata = mandatory(params, "action-metadata-present", self)

if not isinstance(params["body"], bytes):
raise exceptions.DataError("bulk body is not of type bytes")
if isinstance(params["body"], str):
bulk_lines = params["body"].split("\n")
elif isinstance(params["body"], list):
bulk_lines = params["body"]
else:
raise exceptions.DataError("bulk body is neither string nor list")

for line_number, data in enumerate(params["body"].split(b"\n")):
line_size = len(data)
for line_number, data in enumerate(bulk_lines):
line_size = len(data.encode("utf-8"))
if with_action_metadata:
if line_number % 2 == 1:
total_document_size_bytes += line_size
Expand Down Expand Up @@ -566,7 +570,6 @@ def detailed_stats(self, params, response):
if bulk_error_count > 0:
stats["error-type"] = "bulk"
stats["error-description"] = self.error_description(error_details)
self.logger.warning("Bulk request failed: [%s]", stats["error-description"])
if "ingest_took" in response:
stats["ingest_took"] = response["ingest_took"]

Expand Down
64 changes: 3 additions & 61 deletions tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ def test_list_length(self):


def _build_bulk_body(*lines):
return "".join(line + "\n" for line in lines).encode("UTF-8")
return "".join(line + "\n" for line in lines)


class BulkIndexRunnerTests(TestCase):
Expand Down Expand Up @@ -495,7 +495,7 @@ async def test_simple_bulk_with_timeout_and_headers(self, es):
es.bulk.assert_awaited_with(
doc_type="_doc",
params={},
body=b"index_line\nindex_line\nindex_line\n",
body="index_line\nindex_line\nindex_line\n",
headers={"x-test-id": "1234"},
index="test1",
opaque_id="DESIRED-OPAQUE-ID",
Expand Down Expand Up @@ -1037,69 +1037,11 @@ async def test_simple_bulk_with_detailed_stats_body_as_unrecognized_type(self, e
"index": "test",
}

with self.assertRaisesRegex(exceptions.DataError, "bulk body is not of type bytes"):
with self.assertRaisesRegex(exceptions.DataError, "bulk body is neither string nor list"):
await bulk(es, bulk_params)

es.bulk.assert_awaited_with(body=bulk_params["body"], params={})

# Test: a bulk response that reports an item-level error, run with
# "detailed-results" enabled, is expected to produce a warning log entry and
# error details in the returned result.
@mock.patch("elasticsearch.Elasticsearch")
@run_async
async def test_bulk_index_error_logs_warning_with_detailed_stats_body(self, es):
# Mocked bulk response: "errors" is True and the single item carries
# HTTP status 429 with a cluster_block_exception.
es.bulk = mock.AsyncMock(
return_value={
"took": 5,
"errors": True,
"items": [
{
"create": {
"_index": "test",
"_type": "_doc",
"_id": "6UNLsn0BfMD3e6iftbdV",
"status": 429,
"error": {
"type": "cluster_block_exception",
"reason": "index [test] blocked by: [TOO_MANY_REQUESTS/12/disk usage exceeded "
"flood-stage watermark, index has read-only-allow-delete block];",
},
}
}
],
}
)

bulk = runner.BulkIndex()

# One action-metadata line followed by one document line, with
# "detailed-results" switched on.
bulk_params = {
"body": _build_bulk_body(
'{ "index" : { "_index" : "test", "_type" : "_doc" } }',
'{"message" : "in a bottle"}',
),
"action-metadata-present": True,
"bulk-size": 1,
"unit": "docs",
"detailed-results": True,
"index": "test",
}

# Replace the runner's logger.warning with a mock so the warning call can be
# asserted with its format string and the error description from the result.
with mock.patch.object(bulk.logger, "warning") as mocked_warning_logger:
result = await bulk(es, bulk_params)
mocked_warning_logger.assert_has_calls([mock.call("Bulk request failed: [%s]", result["error-description"])])

# The result is expected to report the failure: success is False, one error of
# type "bulk", and a description built from the HTTP status and error reason.
self.assertEqual("test", result["index"])
self.assertEqual(5, result["took"])
self.assertEqual(1, result["weight"])
self.assertEqual("docs", result["unit"])
self.assertEqual(False, result["success"])
self.assertEqual(1, result["error-count"])
self.assertEqual("bulk", result["error-type"])
self.assertEqual(
"HTTP status: 429, message: index [test] blocked by: [TOO_MANY_REQUESTS/12/disk usage "
"exceeded flood-stage watermark, index has read-only-allow-delete block];",
result["error-description"],
)

# The body must have been handed to the client's bulk call as given.
es.bulk.assert_awaited_with(body=bulk_params["body"], params={})


class ForceMergeRunnerTests(TestCase):
@mock.patch("elasticsearch.Elasticsearch")
Expand Down

0 comments on commit f24b3ab

Please sign in to comment.