Skip to content

Commit

Permalink
[fix](Outfile) Fix the data type mapping for complex types in Doris t…
Browse files Browse the repository at this point in the history
…o the ORC and Parquet file formats. (apache#44041)

### What problem does this PR solve?

Problem Summary:

Previously, the behavior of exporting complex data types from Doris was
as follows:
type | orc type | parquet type | csv type
-- | -- | -- | --
bitmap | string | Not Supported | Not Supported
quantile_state | Not Supported | Not Supported | Not Supported
hll | string | string | invisible string
jsonb | Not Supported | string | string
variant | Not Supported | string | string

What's more, there are some issues when exporting complex data types to
the ORC file format.

This PR does two things:
1. Fix the problem with exporting complex data types from Doris.
2. Support exporting these three complex types (bitmap, quantile_state, hll) to both the ORC and the
Parquet file formats.


type | orc type | parquet type | csv type
-- | -- | -- | --
bitmap | binary | binary | "NULL"
quantile_state | binary | binary | "NULL"
hll | binary | binary | "NULL"
jsonb | string | string | string
variant | string | string | string

### Release note

[fix](Outfile) Fix the data type mapping for complex types in Doris to the ORC and Parquet file formats.
  • Loading branch information
BePPPower authored Nov 28, 2024
1 parent 839208c commit 5641c2d
Show file tree
Hide file tree
Showing 17 changed files with 501 additions and 132 deletions.
9 changes: 7 additions & 2 deletions be/src/util/arrow/row_batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/type.h>
#include <arrow/type_fwd.h>
#include <glog/logging.h>
#include <stdint.h>

Expand Down Expand Up @@ -84,12 +85,10 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
case TYPE_LARGEINT:
case TYPE_VARCHAR:
case TYPE_CHAR:
case TYPE_HLL:
case TYPE_DATE:
case TYPE_DATETIME:
case TYPE_STRING:
case TYPE_JSONB:
case TYPE_OBJECT:
*result = arrow::utf8();
break;
case TYPE_DATEV2:
Expand Down Expand Up @@ -150,6 +149,12 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr<arrow::
*result = arrow::utf8();
break;
}
case TYPE_QUANTILE_STATE:
case TYPE_OBJECT:
case TYPE_HLL: {
*result = arrow::binary();
break;
}
default:
return Status::InvalidArgument("Unknown primitive type({}) convert to Arrow type",
type.type);
Expand Down
59 changes: 56 additions & 3 deletions be/src/vec/data_types/serde/data_type_bitmap_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "data_type_bitmap_serde.h"

#include <arrow/array/builder_binary.h>
#include <gen_cpp/types.pb.h>

#include <string>
Expand All @@ -27,13 +28,37 @@
#include "vec/columns/column_const.h"
#include "vec/common/arena.h"
#include "vec/common/assert_cast.h"
#include "vec/data_types/serde/data_type_nullable_serde.h"

namespace doris {

namespace vectorized {
class IColumn;
#include "common/compile_check_begin.h"

// Serializes rows [start_idx, end_idx) of `column` to text via the shared
// SERIALIZE_COLUMN_TO_JSON macro — presumably it loops over the range and
// delegates each row to serialize_one_cell_to_json(); TODO confirm against the
// macro definition in the serde headers.
Status DataTypeBitMapSerDe::serialize_column_to_json(const IColumn& column, int64_t start_idx,
                                                     int64_t end_idx, BufferWritable& bw,
                                                     FormatOptions& options) const {
    SERIALIZE_COLUMN_TO_JSON();
}

// Writes a textual NULL marker for a bitmap cell: bitmap values have no
// readable text form, so CSV/JSON export emits a null placeholder instead of
// the binary payload. The column value itself is intentionally not read.
//
// @param column  source column (unused — every cell is rendered as null)
// @param row_num row index (unused for the same reason)
// @param bw      destination text buffer
// @param options format options (unused here)
// @return Status::OK() always
Status DataTypeBitMapSerDe::serialize_one_cell_to_json(const IColumn& column, int64_t row_num,
                                                       BufferWritable& bw,
                                                       FormatOptions& options) const {
    /**
     * For null values in ordinary types, we use \N to represent them;
     * for null values in nested types, we use null to represent them, just like the json format.
     */
    // Use the std::string's own size() instead of strlen(c_str()): identical
    // result for these marker constants and avoids re-scanning the bytes.
    if (_nesting_level >= 2) {
        bw.write(DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.c_str(),
                 DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.size());
    } else {
        bw.write(DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(),
                 DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.size());
    }
    return Status::OK();
}

Status DataTypeBitMapSerDe::deserialize_column_from_json_vector(
IColumn& column, std::vector<Slice>& slices, int* num_deserialized,
const FormatOptions& options) const {
Expand Down Expand Up @@ -96,6 +121,26 @@ void DataTypeBitMapSerDe::write_one_cell_to_jsonb(const IColumn& column, JsonbWr
result.writeEndBinary();
}

// Appends rows [start, end) of a bitmap column to an Arrow BinaryBuilder.
// Null rows (per `null_map`) become Arrow nulls; every other row is the
// bitmap's serialized byte representation appended as one binary value.
// NOTE(review): get_element() is assumed to return a stable reference whose
// getSizeInBytes()/write_to() pair produce exactly matching lengths — confirm
// against BitmapValue's contract.
void DataTypeBitMapSerDe::write_column_to_arrow(const IColumn& column, const NullMap* null_map,
                                                arrow::ArrayBuilder* array_builder, int64_t start,
                                                int64_t end, const cctz::time_zone& ctz) const {
    const auto& bitmap_col = assert_cast<const ColumnBitmap&>(column);
    auto& binary_builder = assert_cast<arrow::BinaryBuilder&>(*array_builder);
    for (size_t row = start; row < end; ++row) {
        if (null_map && (*null_map)[row]) {
            checkArrowStatus(binary_builder.AppendNull(), column.get_name(),
                             array_builder->type()->name());
            continue;
        }
        // write_to() is non-const in BitmapValue, hence the const_cast.
        auto& value = const_cast<BitmapValue&>(bitmap_col.get_element(row));
        std::string serialized(value.getSizeInBytes(), '0');
        value.write_to(serialized.data());
        checkArrowStatus(
                binary_builder.Append(serialized.data(), static_cast<int>(serialized.size())),
                column.get_name(), array_builder->type()->name());
    }
}

void DataTypeBitMapSerDe::read_one_cell_from_jsonb(IColumn& column, const JsonbValue* arg) const {
auto& col = reinterpret_cast<ColumnBitmap&>(column);
auto blob = static_cast<const JsonbBlobVal*>(arg);
Expand Down Expand Up @@ -148,11 +193,19 @@ Status DataTypeBitMapSerDe::write_column_to_orc(const std::string& timezone, con
auto& col_data = assert_cast<const ColumnBitmap&>(column);
orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch);

INIT_MEMORY_FOR_ORC_WRITER()

for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 1) {
const auto& ele = col_data.get_data_at(row_id);
cur_batch->data[row_id] = const_cast<char*>(ele.data);
cur_batch->length[row_id] = ele.size;
auto bitmap_value = const_cast<BitmapValue&>(col_data.get_element(row_id));
size_t len = bitmap_value.getSizeInBytes();

REALLOC_MEMORY_FOR_ORC_WRITER()

bitmap_value.write_to(const_cast<char*>(bufferRef.data) + offset);
cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset;
cur_batch->length[row_id] = len;
offset += len;
}
}

Expand Down
13 changes: 3 additions & 10 deletions be/src/vec/data_types/serde/data_type_bitmap_serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,10 @@ class DataTypeBitMapSerDe : public DataTypeSerDe {
DataTypeBitMapSerDe(int nesting_level = 1) : DataTypeSerDe(nesting_level) {};

Status serialize_one_cell_to_json(const IColumn& column, int64_t row_num, BufferWritable& bw,
FormatOptions& options) const override {
return Status::NotSupported("serialize_one_cell_to_json with type [{}]", column.get_name());
}
FormatOptions& options) const override;

Status serialize_column_to_json(const IColumn& column, int64_t start_idx, int64_t end_idx,
BufferWritable& bw, FormatOptions& options) const override {
return Status::NotSupported("serialize_column_to_json with type [{}]", column.get_name());
}
BufferWritable& bw, FormatOptions& options) const override;

Status deserialize_one_cell_from_json(IColumn& column, Slice& slice,
const FormatOptions& options) const override;
Expand All @@ -63,10 +59,7 @@ class DataTypeBitMapSerDe : public DataTypeSerDe {

void write_column_to_arrow(const IColumn& column, const NullMap* null_map,
arrow::ArrayBuilder* array_builder, int64_t start, int64_t end,
const cctz::time_zone& ctz) const override {
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
"write_column_to_arrow with type " + column.get_name());
}
const cctz::time_zone& ctz) const override;

void read_column_from_arrow(IColumn& column, const arrow::Array* arrow_array, int start,
int end, const cctz::time_zone& ctz) const override {
Expand Down
20 changes: 2 additions & 18 deletions be/src/vec/data_types/serde/data_type_date64_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,16 +289,7 @@ Status DataTypeDate64SerDe::write_column_to_orc(const std::string& timezone, con
auto& col_data = static_cast<const ColumnVector<Int64>&>(column).get_data();
orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch);

char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
if (!ptr) {
return Status::InternalError(
"malloc memory error when write largeint column data to orc file.");
}
StringRef bufferRef;
bufferRef.data = ptr;
bufferRef.size = BUFFER_UNIT_SIZE;
size_t offset = 0;
const size_t begin_off = offset;
INIT_MEMORY_FOR_ORC_WRITER()

for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 0) {
Expand All @@ -310,18 +301,11 @@ Status DataTypeDate64SerDe::write_column_to_orc(const std::string& timezone, con

REALLOC_MEMORY_FOR_ORC_WRITER()

cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset;
cur_batch->length[row_id] = len;
offset += len;
}
size_t data_off = 0;
for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 1) {
cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + begin_off + data_off;
data_off += cur_batch->length[row_id];
}
}

buffer_list.emplace_back(bufferRef);
cur_batch->numElements = end - start;
return Status::OK();
}
Expand Down
48 changes: 23 additions & 25 deletions be/src/vec/data_types/serde/data_type_hll_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <stddef.h>
#include <stdint.h>

#include <memory>
#include <string>

#include "arrow/array/builder_binary.h"
Expand Down Expand Up @@ -48,28 +49,17 @@ Status DataTypeHLLSerDe::serialize_column_to_json(const IColumn& column, int64_t
Status DataTypeHLLSerDe::serialize_one_cell_to_json(const IColumn& column, int64_t row_num,
BufferWritable& bw,
FormatOptions& options) const {
if (!options._output_object_data) {
/**
* For null values in ordinary types, we use \N to represent them;
* for null values in nested types, we use null to represent them, just like the json format.
*/
if (_nesting_level >= 2) {
bw.write(DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.c_str(),
strlen(NULL_IN_COMPLEX_TYPE.c_str()));
} else {
bw.write(DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(),
strlen(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str()));
}
return Status::OK();
/**
* For null values in ordinary types, we use \N to represent them;
* for null values in nested types, we use null to represent them, just like the json format.
*/
if (_nesting_level >= 2) {
bw.write(DataTypeNullableSerDe::NULL_IN_COMPLEX_TYPE.c_str(),
strlen(NULL_IN_COMPLEX_TYPE.c_str()));
} else {
bw.write(DataTypeNullableSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str(),
strlen(NULL_IN_CSV_FOR_ORDINARY_TYPE.c_str()));
}
auto col_row = check_column_const_set_readability(column, row_num);
ColumnPtr ptr = col_row.first;
row_num = col_row.second;
auto& data = const_cast<HyperLogLog&>(assert_cast<const ColumnHLL&>(*ptr).get_element(row_num));
std::unique_ptr<char[]> buf =
std::make_unique_for_overwrite<char[]>(data.max_serialized_size());
size_t size = data.serialize((uint8*)buf.get());
bw.write(buf.get(), size);
return Status::OK();
}

Expand Down Expand Up @@ -139,7 +129,7 @@ void DataTypeHLLSerDe::write_column_to_arrow(const IColumn& column, const NullMa
arrow::ArrayBuilder* array_builder, int64_t start,
int64_t end, const cctz::time_zone& ctz) const {
const auto& col = assert_cast<const ColumnHLL&>(column);
auto& builder = assert_cast<arrow::StringBuilder&>(*array_builder);
auto& builder = assert_cast<arrow::BinaryBuilder&>(*array_builder);
for (size_t string_i = start; string_i < end; ++string_i) {
if (null_map && (*null_map)[string_i]) {
checkArrowStatus(builder.AppendNull(), column.get_name(),
Expand Down Expand Up @@ -198,11 +188,19 @@ Status DataTypeHLLSerDe::write_column_to_orc(const std::string& timezone, const
auto& col_data = assert_cast<const ColumnHLL&>(column);
orc::StringVectorBatch* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch);

INIT_MEMORY_FOR_ORC_WRITER()

for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 1) {
const auto& ele = col_data.get_data_at(row_id);
cur_batch->data[row_id] = const_cast<char*>(ele.data);
cur_batch->length[row_id] = ele.size;
auto hll_value = const_cast<HyperLogLog&>(col_data.get_element(row_id));
size_t len = hll_value.max_serialized_size();

REALLOC_MEMORY_FOR_ORC_WRITER()

hll_value.serialize((uint8_t*)(bufferRef.data) + offset);
cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset;
cur_batch->length[row_id] = len;
offset += len;
}
}

Expand Down
37 changes: 11 additions & 26 deletions be/src/vec/data_types/serde/data_type_ipv6_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,38 +187,23 @@ Status DataTypeIPv6SerDe::write_column_to_orc(const std::string& timezone, const
std::vector<StringRef>& buffer_list) const {
const auto& col_data = assert_cast<const ColumnIPv6&>(column).get_data();
orc::StringVectorBatch* cur_batch = assert_cast<orc::StringVectorBatch*>(orc_col_batch);
char* ptr = (char*)malloc(BUFFER_UNIT_SIZE);
if (!ptr) {
return Status::InternalError(
"malloc memory error when write largeint column data to orc file.");
}
StringRef bufferRef;
bufferRef.data = ptr;
bufferRef.size = BUFFER_UNIT_SIZE;
size_t offset = 0;
const size_t begin_off = offset;

for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 0) {
continue;
}
std::string ipv6_str = IPv6Value::to_string(col_data[row_id]);
size_t len = ipv6_str.size();

REALLOC_MEMORY_FOR_ORC_WRITER()
INIT_MEMORY_FOR_ORC_WRITER()

strcpy(const_cast<char*>(bufferRef.data) + offset, ipv6_str.c_str());
offset += len;
cur_batch->length[row_id] = len;
}
size_t data_off = 0;
for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 1) {
cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + begin_off + data_off;
data_off += cur_batch->length[row_id];
std::string ipv6_str = IPv6Value::to_string(col_data[row_id]);
size_t len = ipv6_str.size();

REALLOC_MEMORY_FOR_ORC_WRITER()

strcpy(const_cast<char*>(bufferRef.data) + offset, ipv6_str.c_str());
cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset;
cur_batch->length[row_id] = len;
offset += len;
}
}
buffer_list.emplace_back(bufferRef);

cur_batch->numElements = end - start;
return Status::OK();
}
Expand Down
25 changes: 24 additions & 1 deletion be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include <cstddef>
#include <cstdint>
#include <memory>

#include "arrow/array/builder_binary.h"
#include "common/exception.h"
Expand Down Expand Up @@ -142,7 +143,29 @@ Status DataTypeJsonbSerDe::write_column_to_orc(const std::string& timezone, cons
orc::ColumnVectorBatch* orc_col_batch, int64_t start,
int64_t end,
std::vector<StringRef>& buffer_list) const {
return Status::NotSupported("write_column_to_orc with type [{}]", column.get_name());
auto* cur_batch = dynamic_cast<orc::StringVectorBatch*>(orc_col_batch);
const auto& string_column = assert_cast<const ColumnString&>(column);

INIT_MEMORY_FOR_ORC_WRITER()

for (size_t row_id = start; row_id < end; row_id++) {
if (cur_batch->notNull[row_id] == 1) {
std::string_view string_ref = string_column.get_data_at(row_id).to_string_view();
auto serialized_value = std::make_unique<std::string>(
JsonbToJson::jsonb_to_json_string(string_ref.data(), string_ref.size()));
auto len = serialized_value->size();

REALLOC_MEMORY_FOR_ORC_WRITER()

memcpy(const_cast<char*>(bufferRef.data) + offset, serialized_value->data(), len);
cur_batch->data[row_id] = const_cast<char*>(bufferRef.data) + offset;
cur_batch->length[row_id] = len;
offset += len;
}
}

cur_batch->numElements = end - start;
return Status::OK();
}

void convert_jsonb_to_rapidjson(const JsonbValue& val, rapidjson::Value& target,
Expand Down
Loading

0 comments on commit 5641c2d

Please sign in to comment.