Skip to content

Commit

Permalink
Unflatten implicitly in the zeek-tsv parser (tenzir#3567)
Browse files Browse the repository at this point in the history
  • Loading branch information
dominiklohmann authored Oct 11, 2023
2 parents d9f0566 + 890c8c4 commit fc83f71
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 26 deletions.
27 changes: 16 additions & 11 deletions libtenzir/builtins/formats/zeek_tsv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -461,14 +461,24 @@ auto parser_impl(generator<std::optional<std::string_view>> lines,
auto document = zeek_document{};
auto last_finish = std::chrono::steady_clock::now();
auto line_nr = size_t{0};
// Helper for finishing and casting.
auto finish = [&] {
auto slice = unflatten(document.builder->finish(), ".");
if (document.target_schema
and can_cast(slice.schema(), document.target_schema)) {
return cast(std::move(slice), document.target_schema);
} else {
return slice;
}
};
for (auto&& line : lines) {
const auto now = std::chrono::steady_clock::now();
// Yield at chunk boundaries.
if (document.builder
and (document.builder->rows() >= defaults::import::table_slice_size
or last_finish + defaults::import::batch_timeout < now)) {
last_finish = now;
co_yield cast(document.builder->finish(), document.target_schema);
co_yield finish();
}
if (not line) {
if (last_finish != now) {
Expand Down Expand Up @@ -501,7 +511,7 @@ auto parser_impl(generator<std::optional<std::string_view>> lines,
if (close_parser(header, unused)) {
if (document.builder) {
last_finish = now;
co_yield cast(document.builder->finish(), document.target_schema);
co_yield finish();
document = {};
}
continue;
Expand All @@ -512,7 +522,7 @@ auto parser_impl(generator<std::optional<std::string_view>> lines,
// were missing a closing tag.
if (document.builder) {
last_finish = now;
co_yield document.builder->finish();
co_yield finish();
document = {};
}
// Now we can actually assemble the header.
Expand Down Expand Up @@ -638,7 +648,6 @@ auto parser_impl(generator<std::optional<std::string_view>> lines,
document.builder = table_slice_builder{std::move(schema)};
// If there is a schema with the exact matching name, then we set it as a
// target schema and use that for casting.
// TODO: This should just unflatten instead.
auto target_schema = std::find_if(
ctrl.schemas().begin(), ctrl.schemas().end(), [&](const auto& schema) {
for (const auto& name : schema.names()) {
Expand All @@ -648,12 +657,8 @@ auto parser_impl(generator<std::optional<std::string_view>> lines,
}
return false;
});
if (target_schema != ctrl.schemas().end()
and can_cast(document.builder->schema(), *target_schema)) {
document.target_schema = *target_schema;
} else {
document.target_schema = document.builder->schema();
}
document.target_schema
= target_schema == ctrl.schemas().end() ? type{} : *target_schema;
// We intentionally fall through here; we create the builder lazily
// when we encounter the first event, but that we still need to parse
// now.
Expand Down Expand Up @@ -699,7 +704,7 @@ auto parser_impl(generator<std::optional<std::string_view>> lines,
}
}
if (document.builder and document.builder->rows() > 0) {
co_yield cast(document.builder->finish(), document.target_schema);
co_yield finish();
}
}

Expand Down
26 changes: 22 additions & 4 deletions tenzir/integration/reference/zeek-tsv-pipeline-format/step_11.ref
Original file line number Diff line number Diff line change
@@ -1,4 +1,22 @@
error: failed to parse Zeek document: mismatching number #fields and #types
= note: found 20 #fields
= note: found 19 #types
= note: line 9
#separator \x09
#set_separator ,
#empty_field (empty)
#unset_field -
#path zeek.conn
#fields ts uid id.orig_h id.orig_p id.resp_h id.resp_p proto service duration orig_bytes resp_bytes conn_state local_orig local_resp missed_bytes history orig_pkts orig_ip_bytes resp_pkts resp_ip_bytes tunnel_parents community_id _write_ts
#types time string addr port addr port string string interval count count string bool bool count string count count count count vector[string] string time
2009-11-18T08:00:21.486539 Pii6cUUq1v4 192.168.1.102 68 192.168.1.1 67 udp - 163.82ms 301 300 SF - - 0 Dd 1 329 1 328 (empty) - -
2009-11-18T08:08:00.237253 nkCxlvNN8pi 192.168.1.103 137 192.168.1.255 137 udp dns 3.78s 350 0 S0 - - 0 D 7 546 0 0 (empty) - -
2009-11-18T08:08:13.816224 9VdICMMnxQ7 192.168.1.102 137 192.168.1.255 137 udp dns 3.75s 350 0 S0 - - 0 D 7 546 0 0 (empty) - -
2009-11-18T08:07:15.800932 bEgBnkI31Vf 192.168.1.103 138 192.168.1.255 138 udp - 46.73s 560 0 S0 - - 0 D 3 644 0 0 (empty) - -
#separator \x09
#set_separator ,
#empty_field (empty)
#unset_field -
#path zeek.conn
#fields ts uid id.orig_h id.orig_p id.resp_h id.resp_p proto service duration orig_bytes resp_bytes conn_state local_orig local_resp missed_bytes history orig_pkts orig_ip_bytes resp_pkts resp_ip_bytes tunnel_parents community_id _write_ts
#types time string addr port addr port string string interval count count string bool bool count string count count count count vector[string] string time
2009-11-18T08:00:21.486539 Pii6cUUq1v4 192.168.1.102 68 192.168.1.1 67 udp - 163.82ms 301 300 SF - - 0 Dd 1 329 1 328 (empty) - -
2009-11-18T08:08:00.237253 nkCxlvNN8pi 192.168.1.103 137 192.168.1.255 137 udp dns 3.78s 350 0 S0 - - 0 D 7 546 0 0 (empty) - -
2009-11-18T08:08:13.816224 9VdICMMnxQ7 192.168.1.102 137 192.168.1.255 137 udp dns 3.75s 350 0 S0 - - 0 D 7 546 0 0 (empty) - -
2009-11-18T08:07:15.800932 bEgBnkI31Vf 192.168.1.103 138 192.168.1.255 138 udp - 46.73s 560 0 S0 - - 0 D 3 644 0 0 (empty) - -
Original file line number Diff line number Diff line change
@@ -1,2 +0,0 @@
error: failed to parse Zeek document: missing #path
= note: line 12
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
error: failed to parse Zeek document: missing #path
= note: line 14
error: failed to parse Zeek document: mismatching number #fields and #types
= note: found 20 #fields
= note: found 19 #types
= note: line 9
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
error: !! logic_error: 'write "zeek-tsv", anonymous(null, null, null, true)' does not support heterogeneous outputs; cannot initialize for 'zeek-tsv' after 'zeek.conn'
= note: from `write "zeek-tsv", anonymous(null, null, null, true)`
error: failed to parse Zeek document: missing #path
= note: line 12
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
error: failed to parse Zeek document: missing #path
= note: line 14
9 changes: 4 additions & 5 deletions tenzir/integration/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,10 @@ tests:
input: data/zeek/broken_no_separator_value.log
- command: exec 'from stdin read zeek-tsv | write zeek-tsv --disable-timestamp-tags'
input: data/zeek/broken_no_empty_and_unset_fields.log
- command: exec 'from stdin read zeek-tsv | write zeek-tsv --disable-timestamp-tags'
input: data/zeek/broken_no_closing_tag.log
- command: exec 'from stdin read zeek-tsv | write zeek-tsv --disable-timestamp-tags'
input: data/zeek/broken_no_data_after_open.log
- command: exec 'from stdin read zeek-tsv | write zeek-tsv --disable-timestamp-tags'
input: data/zeek/broken_unequal_fields_types_length.log
expected_result: error
Expand All @@ -823,11 +827,6 @@ tests:
- command: exec 'from stdin read zeek-tsv | write zeek-tsv --disable-timestamp-tags'
input: data/zeek/broken_data_after_close_tag.log
expected_result: error
- command: exec 'from stdin read zeek-tsv | write zeek-tsv --disable-timestamp-tags'
input: data/zeek/broken_no_closing_tag.log
expected_result: error
- command: exec 'from stdin read zeek-tsv | write zeek-tsv --disable-timestamp-tags'
input: data/zeek/broken_no_data_after_open.log

Zeek TSV with Remote Import:
fixture: ServerTester
Expand Down

0 comments on commit fc83f71

Please sign in to comment.