Skip to content

Commit

Permalink
ARROW-18106: [C++] JSON reader ignores explicit schema with default unexpected_field_behavior="infer" (apache#14741)
Browse files Browse the repository at this point in the history

See: [ARROW-18106](https://issues.apache.org/jira/browse/ARROW-18106)

The current `ChunkedArrayBuilder` always uses the promotion graph for type conversions if it's provided (even for fields in the explicit schema). The bug in question occurs because conversion errors aren't propagated in `InferringChunkedArrayBuilder`. Instead, it will attempt to promote the type on failure, which is undesirable in cases like this.

This PR doesn't change any of that. It only restricts type inference to unexpected fields while fields present in the schema remain "strongly-typed" - so conversion errors for expected fields will still be reported if `UnexpectedFieldBehavior::InferType` is specified.

Authored-by: benibus <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
benibus authored Dec 14, 2022
1 parent 6a3c8b2 commit 0dfec8e
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 9 deletions.
40 changes: 36 additions & 4 deletions cpp/src/arrow/json/chunked_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ using internal::checked_cast;
using internal::TaskGroup;

namespace json {
namespace {

// Forward declaration of the internal overload. `allow_promotion` controls
// whether the resulting builder may use `promotion_graph` to infer/promote
// types: it is set to true only for unexpected (non-schema) fields, so that
// fields declared in the explicit schema stay strongly typed.
Status MakeChunkedArrayBuilder(const std::shared_ptr<TaskGroup>& task_group,
                               MemoryPool* pool, const PromotionGraph* promotion_graph,
                               const std::shared_ptr<DataType>& type,
                               bool allow_promotion,
                               std::shared_ptr<ChunkedArrayBuilder>* out);

class NonNestedChunkedArrayBuilder : public ChunkedArrayBuilder {
public:
Expand Down Expand Up @@ -404,7 +411,7 @@ class ChunkedStructArrayBuilder : public ChunkedArrayBuilder {

std::shared_ptr<ChunkedArrayBuilder> child_builder;
RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group_, pool_, promotion_graph_, type,
&child_builder));
/*allow_promotion=*/true, &child_builder));
child_builders_.emplace_back(std::move(child_builder));
}

Expand Down Expand Up @@ -432,14 +439,23 @@ class ChunkedStructArrayBuilder : public ChunkedArrayBuilder {
Status MakeChunkedArrayBuilder(const std::shared_ptr<TaskGroup>& task_group,
MemoryPool* pool, const PromotionGraph* promotion_graph,
const std::shared_ptr<DataType>& type,
bool allow_promotion,
std::shared_ptr<ChunkedArrayBuilder>* out) {
// If a promotion graph is provided, unexpected fields will be allowed - using the graph
// recursively for itself and any child fields (via the `allow_promotion` parameter).
// Fields provided in the schema will adhere to their corresponding type. However,
// structs defined in the schema may obtain unexpected child fields, which will use the
// promotion graph as well.
//
// If a promotion graph is not provided, unexpected fields are always ignored and
// type inference never occurs.
if (type->id() == Type::STRUCT) {
std::vector<std::pair<std::string, std::shared_ptr<ChunkedArrayBuilder>>>
child_builders;
for (const auto& f : type->fields()) {
std::shared_ptr<ChunkedArrayBuilder> child_builder;
RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group, pool, promotion_graph, f->type(),
&child_builder));
allow_promotion, &child_builder));
child_builders.emplace_back(f->name(), std::move(child_builder));
}
*out = std::make_shared<ChunkedStructArrayBuilder>(task_group, pool, promotion_graph,
Expand All @@ -450,14 +466,17 @@ Status MakeChunkedArrayBuilder(const std::shared_ptr<TaskGroup>& task_group,
const auto& list_type = checked_cast<const ListType&>(*type);
std::shared_ptr<ChunkedArrayBuilder> value_builder;
RETURN_NOT_OK(MakeChunkedArrayBuilder(task_group, pool, promotion_graph,
list_type.value_type(), &value_builder));
list_type.value_type(), allow_promotion,
&value_builder));
*out = std::make_shared<ChunkedListArrayBuilder>(
task_group, pool, std::move(value_builder), list_type.value_field());
return Status::OK();
}

// Construct the "leaf" builder
std::shared_ptr<Converter> converter;
RETURN_NOT_OK(MakeConverter(type, pool, &converter));
if (promotion_graph) {
if (allow_promotion && promotion_graph) {
*out = std::make_shared<InferringChunkedArrayBuilder>(task_group, promotion_graph,
std::move(converter));
} else {
Expand All @@ -466,5 +485,18 @@ Status MakeChunkedArrayBuilder(const std::shared_ptr<TaskGroup>& task_group,
return Status::OK();
}

} // namespace

// This overload is exposed to the user and will only be called once on instantiation to
// canonicalize any explicitly-defined fields. Such fields won't be subject to
// type inference/promotion.
Status MakeChunkedArrayBuilder(const std::shared_ptr<TaskGroup>& task_group,
                               MemoryPool* pool, const PromotionGraph* promotion_graph,
                               const std::shared_ptr<DataType>& type,
                               std::shared_ptr<ChunkedArrayBuilder>* out) {
  // Delegate to the internal overload with promotion disabled: `type` is the
  // explicit schema's tree, whose fields must keep their declared types.
  // Unexpected child fields encountered during parsing are built with
  // allow_promotion=true internally (when a promotion graph is supplied).
  return MakeChunkedArrayBuilder(task_group, pool, promotion_graph, type,
                                 /*allow_promotion=*/false, out);
}

} // namespace json
} // namespace arrow
3 changes: 1 addition & 2 deletions cpp/src/arrow/json/converter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ namespace json {

template <typename... Args>
Status GenericConversionError(const DataType& type, Args&&... args) {
return Status::Invalid("Failed of conversion of JSON to ", type,
std::forward<Args>(args)...);
return Status::Invalid("Failed to convert JSON to ", type, std::forward<Args>(args)...);
}

namespace {
Expand Down
5 changes: 2 additions & 3 deletions cpp/src/arrow/json/converter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,7 @@ TEST(ConverterTest, Decimal128And256ScaleError) {
std::shared_ptr<StructArray> parse_array;
ASSERT_OK(ParseFromString(options, json_source, &parse_array));

std::string error_msg = "Failed of conversion of JSON to " +
decimal_type->ToString() +
std::string error_msg = "Failed to convert JSON to " + decimal_type->ToString() +
": 30.0123456789001 requires scale 13";
EXPECT_RAISES_WITH_MESSAGE_THAT(
Invalid, ::testing::HasSubstr(error_msg),
Expand All @@ -256,7 +255,7 @@ TEST(ConverterTest, Decimal128And256PrecisionError) {
ASSERT_OK(ParseFromString(options, json_source, &parse_array));

std::string error_msg =
"Invalid: Failed of conversion of JSON to " + decimal_type->ToString() +
"Invalid: Failed to convert JSON to " + decimal_type->ToString() +
": 123456789012345678901234567890.0123456789 requires precision 40";
EXPECT_RAISES_WITH_MESSAGE_THAT(
Invalid, ::testing::HasSubstr(error_msg),
Expand Down
109 changes: 109 additions & 0 deletions cpp/src/arrow/json/reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ using std::string_view;

using internal::checked_cast;

// Convenience helper: parse `json` into a Table with the given read/parse
// options, propagating any reader construction or conversion error.
static Result<std::shared_ptr<Table>> ReadToTable(std::string json,
                                                  const ReadOptions& read_options,
                                                  const ParseOptions& parse_options) {
  std::shared_ptr<io::InputStream> stream;
  RETURN_NOT_OK(MakeStream(json, &stream));
  ARROW_ASSIGN_OR_RAISE(
      auto table_reader,
      TableReader::Make(default_memory_pool(), stream, read_options, parse_options));
  return table_reader->Read();
}

class ReaderTest : public ::testing::TestWithParam<bool> {
public:
void SetUpReader() {
Expand Down Expand Up @@ -325,6 +335,105 @@ TEST(ReaderTest, FailOnInvalidEOF) {
}
}

// ARROW-18106: a field declared in the explicit schema must stay strongly
// typed no matter how unexpected fields are handled. The input carries
// millisecond precision while the schema demands timestamp[s], so every
// UnexpectedFieldBehavior mode must surface a conversion error instead of
// silently inferring/promoting the type.
TEST(ReaderTest, FailOnTimeUnitMismatch) {
  std::string json = R"({"t":"2022-09-05T08:08:46.000"})";

  auto read_options = ReadOptions::Defaults();
  read_options.use_threads = false;
  auto parse_options = ParseOptions::Defaults();
  parse_options.explicit_schema = schema({field("t", timestamp(TimeUnit::SECOND))});

  // Fix: removed unused locals `input` and `reader` — ReadToTable builds its
  // own stream and reader, so these declarations were dead code.
  for (auto behavior : {UnexpectedFieldBehavior::Error, UnexpectedFieldBehavior::Ignore,
                        UnexpectedFieldBehavior::InferType}) {
    parse_options.unexpected_field_behavior = behavior;
    EXPECT_RAISES_WITH_MESSAGE_THAT(
        Invalid, ::testing::StartsWith("Invalid: Failed to convert JSON to timestamp[s]"),
        ReadToTable(json, read_options, parse_options));
  }
}

// With unexpected_field_behavior=InferType and an explicit schema, fields
// absent from the schema (here "a.c" and everything beneath it) may have their
// types inferred, while the schema-declared field "a.b" keeps its declared
// type — and still reports conversion errors on mismatched input.
TEST(ReaderTest, InferNestedFieldsWithSchema) {
  // Rows exercise progressive inference of the unexpected nested field "a.c":
  // null -> empty struct -> struct with list -> list of structs -> boolean leaf.
  std::string json = R"({}
{"a": {"c": null}}
{"a": {"c": {}}}
{"a": {"c": {"d": null}}}
{"a": {"c": {"d": []}}}
{"a": {"c": {"d": [null]}}}
{"a": {"c": {"d": [{}]}}}
{"a": {"c": {"d": [{"e": null}]}}}
{"a": {"c": {"d": [{"e": true}]}}}
)";

  auto read_options = ReadOptions::Defaults();
  read_options.use_threads = false;
  auto parse_options = ParseOptions::Defaults();
  // Only "a.b" is declared; "a.c" is an unexpected child of a schema struct.
  parse_options.explicit_schema =
      schema({field("a", struct_({field("b", timestamp(TimeUnit::SECOND))}))});
  parse_options.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;

  // Expected result: declared "b" retains timestamp[s]; inferred "c" resolves
  // to struct<d: list<struct<e: bool>>> after seeing all rows.
  auto expected_schema = schema({field(
      "a", struct_({field("b", timestamp(TimeUnit::SECOND)),
                    field("c", struct_({field(
                             "d", list(struct_({field("e", boolean())})))}))}))});
  auto expected_batch = RecordBatchFromJSON(expected_schema, R"([
    {"a": null},
    {"a": {"b": null, "c": null}},
    {"a": {"b": null, "c": {"d": null}}},
    {"a": {"b": null, "c": {"d": null}}},
    {"a": {"b": null, "c": {"d": []}}},
    {"a": {"b": null, "c": {"d": [null]}}},
    {"a": {"b": null, "c": {"d": [{"e": null}]}}},
    {"a": {"b": null, "c": {"d": [{"e": null}]}}},
    {"a": {"b": null, "c": {"d": [{"e": true}]}}}
  ])");
  ASSERT_OK_AND_ASSIGN(auto expected_table, Table::FromRecordBatches({expected_batch}));

  ASSERT_OK_AND_ASSIGN(auto table, ReadToTable(json, read_options, parse_options));
  AssertTablesEqual(*expected_table, *table);

  // A millisecond value for the declared seconds-precision "b" must still
  // fail — inference never applies to schema-declared fields.
  json += std::string(R"({"a": {"b": "2022-09-05T08:08:46.000"}})") + "\n";
  EXPECT_RAISES_WITH_MESSAGE_THAT(
      Invalid, ::testing::StartsWith("Invalid: Failed to convert JSON to timestamp[s]"),
      ReadToTable(json, read_options, parse_options));
}

// Same contract as InferNestedFieldsWithSchema, but the schema-declared
// struct lives inside a list: the declared field "a[].b" stays strongly
// typed while the unexpected sibling "a[].c" is inferred.
TEST(ReaderTest, InferNestedFieldsInListWithSchema) {
  std::string json = R"({}
{"a": [{"b": "2022-09-05T08:08:00"}]}
{"a": [{"b": "2022-09-05T08:08:01", "c": null}]}
{"a": [{"b": "2022-09-05T08:08:02", "c": {"d": true}}]}
)";

  auto read_options = ReadOptions::Defaults();
  read_options.use_threads = false;
  auto parse_options = ParseOptions::Defaults();
  // Only "b" is declared inside the list's struct; "c" is unexpected.
  parse_options.explicit_schema =
      schema({field("a", list(struct_({field("b", timestamp(TimeUnit::SECOND))})))});
  parse_options.unexpected_field_behavior = UnexpectedFieldBehavior::InferType;

  // "c" should be inferred as struct<d: bool>; "b" keeps timestamp[s].
  auto expected_schema =
      schema({field("a", list(struct_({field("b", timestamp(TimeUnit::SECOND)),
                                       field("c", struct_({field("d", boolean())}))})))});
  auto expected_batch = RecordBatchFromJSON(expected_schema, R"([
    {"a": null},
    {"a": [{"b": "2022-09-05T08:08:00", "c": null}]},
    {"a": [{"b": "2022-09-05T08:08:01", "c": null}]},
    {"a": [{"b": "2022-09-05T08:08:02", "c": {"d": true}}]}
  ])");
  ASSERT_OK_AND_ASSIGN(auto expected_table, Table::FromRecordBatches({expected_batch}));

  ASSERT_OK_AND_ASSIGN(auto table, ReadToTable(json, read_options, parse_options));
  AssertTablesEqual(*expected_table, *table);

  // Millisecond precision for the declared seconds-precision "b" must still
  // raise a conversion error even though "c" was inferred alongside it.
  json += std::string(R"({"a": [{"b": "2022-09-05T08:08:03.000", "c": {}}]})") + "\n";
  EXPECT_RAISES_WITH_MESSAGE_THAT(
      Invalid, ::testing::StartsWith("Invalid: Failed to convert JSON to timestamp[s]"),
      ReadToTable(json, read_options, parse_options));
}

class StreamingReaderTestBase {
public:
virtual ~StreamingReaderTestBase() = default;
Expand Down

0 comments on commit 0dfec8e

Please sign in to comment.