ARROW-8678: [C++/Python][Parquet] Remove old writer code path
The new code has been the default for two releases.

Closes apache#8184 from emkornfield/ARROW-8678

Authored-by: Micah Kornfield <[email protected]>
Signed-off-by: Micah Kornfield <[email protected]>
emkornfield committed Sep 17, 2020
1 parent ca12cd1 commit 7a532ed
Showing 6 changed files with 5 additions and 372 deletions.
356 changes: 2 additions & 354 deletions cpp/src/parquet/arrow/writer.cc
@@ -98,249 +98,6 @@ bool HasNullableRoot(const SchemaManifest& schema_manifest,
return nullable;
}

class LevelBuilder {
public:
explicit LevelBuilder(MemoryPool* pool, const SchemaField* schema_field,
const SchemaManifest* schema_manifest)
: def_levels_(pool),
rep_levels_(pool),
schema_field_(schema_field),
schema_manifest_(schema_manifest) {}

Status VisitInline(const Array& array);

template <typename T>
::arrow::enable_if_t<std::is_base_of<::arrow::FlatArray, T>::value, Status> Visit(
const T& array) {
array_offsets_.push_back(static_cast<int32_t>(array.offset()));
valid_bitmaps_.push_back(array.null_bitmap_data());
null_counts_.push_back(array.null_count());
values_array_ = std::make_shared<T>(array.data());
return Status::OK();
}

Status Visit(const DictionaryArray& array) {
// We currently only handle DictionaryArray where the dictionary is a
// primitive type
if (array.dict_type()->value_type()->num_fields() > 0) {
return Status::NotImplemented(
"Writing DictionaryArray with nested dictionary "
"type not yet supported");
}
array_offsets_.push_back(static_cast<int32_t>(array.offset()));
valid_bitmaps_.push_back(array.null_bitmap_data());
null_counts_.push_back(array.null_count());
values_array_ = std::make_shared<DictionaryArray>(array.data());
return Status::OK();
}

Status Visit(const ListArray& array) {
array_offsets_.push_back(static_cast<int32_t>(array.offset()));
valid_bitmaps_.push_back(array.null_bitmap_data());
null_counts_.push_back(array.null_count());
offsets_.push_back(array.raw_value_offsets());

// Min offset isn't always zero in the case of sliced Arrays.
min_offset_idx_ = array.value_offset(min_offset_idx_);
max_offset_idx_ = array.value_offset(max_offset_idx_);

return VisitInline(*array.values());
}

Status Visit(const ExtensionArray& array) { return VisitInline(*array.storage()); }

#define NOT_IMPLEMENTED_VISIT(ArrowTypePrefix) \
Status Visit(const ::arrow::ArrowTypePrefix##Array& array) { \
return Status::NotImplemented("Level generation for " #ArrowTypePrefix \
" not supported yet"); \
}

// See ARROW-1644
NOT_IMPLEMENTED_VISIT(LargeList)
NOT_IMPLEMENTED_VISIT(Map)
NOT_IMPLEMENTED_VISIT(FixedSizeList)
NOT_IMPLEMENTED_VISIT(Struct)
NOT_IMPLEMENTED_VISIT(Union)

#undef NOT_IMPLEMENTED_VISIT

Status ExtractNullability() {
// Walk upwards to extract nullability
const SchemaField* current_field = schema_field_;
while (current_field != nullptr) {
nullable_.push_front(current_field->field->nullable());
if (current_field->field->type()->num_fields() > 1) {
return Status::NotImplemented(
"Fields with more than one child are not supported.");
} else {
current_field = schema_manifest_->GetParent(current_field);
}
}
return Status::OK();
}

Status GenerateLevels(const Array& array, int64_t* values_offset, int64_t* num_values,
int64_t* num_levels,
const std::shared_ptr<ResizableBuffer>& def_levels_scratch,
std::shared_ptr<Buffer>* def_levels_out,
std::shared_ptr<Buffer>* rep_levels_out,
std::shared_ptr<Array>* values_array) {
// Work downwards to extract bitmaps and offsets
min_offset_idx_ = 0;
max_offset_idx_ = array.length();
RETURN_NOT_OK(VisitInline(array));
*num_values = max_offset_idx_ - min_offset_idx_;
*values_offset = min_offset_idx_;
*values_array = values_array_;

RETURN_NOT_OK(ExtractNullability());

// Generate the levels.
if (nullable_.size() == 1) {
// We have a PrimitiveArray
*rep_levels_out = nullptr;
if (nullable_[0]) {
RETURN_NOT_OK(
def_levels_scratch->Resize(array.length() * sizeof(int16_t), false));
auto def_levels_ptr =
reinterpret_cast<int16_t*>(def_levels_scratch->mutable_data());
if (array.null_count() == 0) {
std::fill(def_levels_ptr, def_levels_ptr + array.length(), 1);
} else if (array.null_count() == array.length()) {
std::fill(def_levels_ptr, def_levels_ptr + array.length(), 0);
} else {
::arrow::internal::BitmapReader valid_bits_reader(
array.null_bitmap_data(), array.offset(), array.length());
for (int i = 0; i < array.length(); i++) {
def_levels_ptr[i] = valid_bits_reader.IsSet() ? 1 : 0;
valid_bits_reader.Next();
}
}

*def_levels_out = def_levels_scratch;
} else {
*def_levels_out = nullptr;
}
*num_levels = array.length();
} else {
// Note it is hard to estimate memory consumption due to zero-length
// arrays, otherwise we would preallocate. An upper bound on memory
// is the sum of the lengths of the list arrays plus the number of
// elements, but this might be too loose an upper bound, so we choose
// to use safe methods.
RETURN_NOT_OK(rep_levels_.Append(0));
RETURN_NOT_OK(HandleListEntries(0, 0, 0, array.length()));

RETURN_NOT_OK(def_levels_.Finish(def_levels_out));
RETURN_NOT_OK(rep_levels_.Finish(rep_levels_out));
*num_levels = (*rep_levels_out)->size() / sizeof(int16_t);
}

return Status::OK();
}

Status HandleList(int16_t def_level, int16_t rep_level, int64_t index) {
if (nullable_[rep_level]) {
if (null_counts_[rep_level] == 0 ||
BitUtil::GetBit(valid_bitmaps_[rep_level], index + array_offsets_[rep_level])) {
return HandleNonNullList(static_cast<int16_t>(def_level + 1), rep_level, index);
} else {
return def_levels_.Append(def_level);
}
} else {
return HandleNonNullList(def_level, rep_level, index);
}
}

Status HandleNonNullList(int16_t def_level, int16_t rep_level, int64_t index) {
const int32_t inner_offset = offsets_[rep_level][index];
const int32_t inner_length = offsets_[rep_level][index + 1] - inner_offset;
const int64_t recursion_level = rep_level + 1;
if (inner_length == 0) {
return def_levels_.Append(def_level);
}
if (recursion_level < static_cast<int64_t>(offsets_.size())) {
return HandleListEntries(static_cast<int16_t>(def_level + 1),
static_cast<int16_t>(rep_level + 1), inner_offset,
inner_length);
}
// We have reached the leaf (a list of primitives); handle the remaining nullability
const bool nullable_level = nullable_[recursion_level];
const int64_t level_null_count = null_counts_[recursion_level];
const uint8_t* level_valid_bitmap = valid_bitmaps_[recursion_level];

if (inner_length >= 1) {
RETURN_NOT_OK(
rep_levels_.Append(inner_length - 1, static_cast<int16_t>(rep_level + 1)));
}

// Special case: this is a null array (all elements are null)
if (level_null_count && level_valid_bitmap == nullptr) {
return def_levels_.Append(inner_length, static_cast<int16_t>(def_level + 1));
}
for (int64_t i = 0; i < inner_length; i++) {
if (nullable_level &&
((level_null_count == 0) ||
BitUtil::GetBit(level_valid_bitmap,
inner_offset + i + array_offsets_[recursion_level]))) {
// Non-null element in a nullable level
RETURN_NOT_OK(def_levels_.Append(static_cast<int16_t>(def_level + 2)));
} else {
// This can be produced in two cases:
// * elements are nullable and this one is null
// (i.e. max_def_level = def_level + 2)
// * elements are non-nullable (i.e. max_def_level = def_level + 1)
RETURN_NOT_OK(def_levels_.Append(static_cast<int16_t>(def_level + 1)));
}
}
return Status::OK();
}

Status HandleListEntries(int16_t def_level, int16_t rep_level, int64_t offset,
int64_t length) {
for (int64_t i = 0; i < length; i++) {
if (i > 0) {
RETURN_NOT_OK(rep_levels_.Append(rep_level));
}
RETURN_NOT_OK(HandleList(def_level, rep_level, offset + i));
}
return Status::OK();
}

private:
Int16BufferBuilder def_levels_;
Int16BufferBuilder rep_levels_;

const SchemaField* schema_field_;
const SchemaManifest* schema_manifest_;

std::vector<int64_t> null_counts_;
std::vector<const uint8_t*> valid_bitmaps_;
std::vector<const int32_t*> offsets_;
std::vector<int32_t> array_offsets_;
std::deque<bool> nullable_;

int64_t min_offset_idx_;
int64_t max_offset_idx_;
std::shared_ptr<Array> values_array_;
};

Status LevelBuilder::VisitInline(const Array& array) {
return VisitArrayInline(array, this);
}
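For reference, here is a minimal pure-Python sketch (not part of this commit; names are illustrative) of the definition/repetition levels the removed LevelBuilder produced for an optional list<optional int32> column, assuming the standard three-level Parquet list encoding: definition level 0 marks a null list, 1 an empty list, 2 a null element, and 3 a present element; repetition level 1 marks a continuation within the same list.

def generate_levels(rows):
    # rows: one optional list of optional ints per record, e.g. [[1, None], None, []]
    def_levels, rep_levels = [], []
    for row in rows:
        if row is None:
            def_levels.append(0)  # null list
            rep_levels.append(0)
        elif len(row) == 0:
            def_levels.append(1)  # empty list
            rep_levels.append(0)
        else:
            for i, item in enumerate(row):
                rep_levels.append(0 if i == 0 else 1)  # record start vs. same list
                def_levels.append(3 if item is not None else 2)
    return def_levels, rep_levels

assert generate_levels([[1, None], None, []]) == ([3, 2, 0, 1], [0, 1, 0, 0])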

Status GetLeafType(const ::arrow::DataType& type, ::arrow::Type::type* leaf_type) {
if (type.id() == ::arrow::Type::LIST || type.id() == ::arrow::Type::STRUCT) {
if (type.num_fields() != 1) {
return Status::Invalid("Nested column branch had multiple children: ", type);
}
return GetLeafType(*type.field(0)->type(), leaf_type);
} else {
*leaf_type = type.id();
return Status::OK();
}
}
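A rough pyarrow analogue of GetLeafType, as a sketch (get_leaf_type is a hypothetical helper, not a pyarrow API; assumes a recent pyarrow):

import pyarrow as pa

def get_leaf_type(typ):
    # Descend through single-child nesting until a non-nested leaf is reached.
    if pa.types.is_list(typ):
        return get_leaf_type(typ.value_type)
    if pa.types.is_struct(typ):
        if typ.num_fields != 1:
            raise ValueError(f"Nested column branch had multiple children: {typ}")
        return get_leaf_type(typ.field(0).type)
    return typ

assert get_leaf_type(pa.list_(pa.struct([("x", pa.int32())]))) == pa.int32()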

// Manages writing nested parquet columns with support for all nested types
// supported by parquet.
class ArrowColumnWriterV2 {
@@ -488,104 +245,6 @@ class ArrowColumnWriterV2 {
RowGroupWriter* row_group_writer_;
};

class ArrowColumnWriter {
public:
ArrowColumnWriter(ArrowWriteContext* ctx, ColumnWriter* column_writer,
const SchemaField* schema_field,
const SchemaManifest* schema_manifest)
: ctx_(ctx),
writer_(column_writer),
schema_field_(schema_field),
schema_manifest_(schema_manifest) {}

Status Write(const Array& data) {
if (data.length() == 0) {
// Write nothing when length is 0
return Status::OK();
}

::arrow::Type::type values_type;
RETURN_NOT_OK(GetLeafType(*data.type(), &values_type));

std::shared_ptr<Array> _values_array;
int64_t values_offset = 0;
int64_t num_levels = 0;
int64_t num_values = 0;
LevelBuilder level_builder(ctx_->memory_pool, schema_field_, schema_manifest_);
std::shared_ptr<Buffer> def_levels_buffer, rep_levels_buffer;
RETURN_NOT_OK(level_builder.GenerateLevels(
data, &values_offset, &num_values, &num_levels, ctx_->def_levels_buffer,
&def_levels_buffer, &rep_levels_buffer, &_values_array));
const int16_t* def_levels = nullptr;
if (def_levels_buffer) {
def_levels = reinterpret_cast<const int16_t*>(def_levels_buffer->data());
}
const int16_t* rep_levels = nullptr;
if (rep_levels_buffer) {
rep_levels = reinterpret_cast<const int16_t*>(rep_levels_buffer->data());
}
std::shared_ptr<Array> values_array = _values_array->Slice(values_offset, num_values);
return writer_->WriteArrow(def_levels, rep_levels, num_levels, *values_array, ctx_);
}

Status Write(const ChunkedArray& data, int64_t offset, const int64_t size) {
if (data.length() == 0) {
return Status::OK();
}

int64_t absolute_position = 0;
int chunk_index = 0;
int64_t chunk_offset = 0;
while (chunk_index < data.num_chunks() && absolute_position < offset) {
const int64_t chunk_length = data.chunk(chunk_index)->length();
if (absolute_position + chunk_length > offset) {
// Relative offset into the chunk to reach the desired start offset for
// writing
chunk_offset = offset - absolute_position;
break;
} else {
++chunk_index;
absolute_position += chunk_length;
}
}

if (absolute_position >= data.length()) {
return Status::Invalid("Cannot write data at offset past end of chunked array");
}

int64_t values_written = 0;
while (values_written < size) {
const Array& chunk = *data.chunk(chunk_index);
const int64_t available_values = chunk.length() - chunk_offset;
const int64_t chunk_write_size = std::min(size - values_written, available_values);

// The chunk offset here will be 0 except for possibly the first chunk
// because of the advancing logic above
std::shared_ptr<Array> array_to_write = chunk.Slice(chunk_offset, chunk_write_size);
RETURN_NOT_OK(Write(*array_to_write));

if (chunk_write_size == available_values) {
chunk_offset = 0;
++chunk_index;
}
values_written += chunk_write_size;
}

return Status::OK();
}

Status Close() {
PARQUET_CATCH_NOT_OK(writer_->Close());
return Status::OK();
}

private:
ArrowWriteContext* ctx_;
ColumnWriter* writer_;
const SchemaField* schema_field_;
const SchemaManifest* schema_manifest_;
};
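The chunk-advancing logic in Write(const ChunkedArray&, ...) above amounts to the following pure-Python sketch (illustrative names, error handling omitted; not part of the commit): locate the chunk containing the global offset, then emit slices until size values are covered.

def slices_for_write(chunk_lengths, offset, size):
    # Return (chunk_index, local_offset, length) triples covering
    # [offset, offset + size) across chunks of the given lengths.
    chunk_index, absolute_position, chunk_offset = 0, 0, 0
    while chunk_index < len(chunk_lengths) and absolute_position < offset:
        length = chunk_lengths[chunk_index]
        if absolute_position + length > offset:
            chunk_offset = offset - absolute_position  # start inside this chunk
            break
        chunk_index += 1
        absolute_position += length

    slices, written = [], 0
    while written < size:
        available = chunk_lengths[chunk_index] - chunk_offset
        n = min(size - written, available)
        slices.append((chunk_index, chunk_offset, n))
        if n == available:  # chunk exhausted; subsequent chunks start at 0
            chunk_offset = 0
            chunk_index += 1
        written += n
    return slices

# Chunks of lengths 3 and 4; write 5 values starting at global offset 2.
assert slices_for_write([3, 4], 2, 5) == [(0, 2, 1), (1, 0, 4)]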

} // namespace

// ----------------------------------------------------------------------
@@ -637,19 +296,8 @@ class FileWriterImpl : public FileWriter {

Status WriteColumnChunk(const std::shared_ptr<ChunkedArray>& data, int64_t offset,
int64_t size) override {
if (arrow_properties_->engine_version() == ArrowWriterProperties::V1) {
ColumnWriter* column_writer;
PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());

const SchemaField* schema_field = nullptr;
RETURN_NOT_OK(schema_manifest_.GetColumnField(row_group_writer_->current_column(),
&schema_field));

ArrowColumnWriter arrow_writer(&column_write_context_, column_writer, schema_field,
&schema_manifest_);
RETURN_NOT_OK(arrow_writer.Write(*data, offset, size));
return arrow_writer.Close();
} else if (arrow_properties_->engine_version() == ArrowWriterProperties::V2) {
if (arrow_properties_->engine_version() == ArrowWriterProperties::V2 ||
arrow_properties_->engine_version() == ArrowWriterProperties::V1) {
ARROW_ASSIGN_OR_RAISE(
std::unique_ptr<ArrowColumnWriterV2> writer,
ArrowColumnWriterV2::Make(*data, offset, size, schema_manifest_,
2 changes: 0 additions & 2 deletions cpp/src/parquet/arrow/writer.h
@@ -94,8 +94,6 @@ ::arrow::Status WriteMetaDataFile(const FileMetaData& file_metadata,
::arrow::io::OutputStream* sink);

/// \brief Write a Table to Parquet.
///
/// The table shall only consist of columns of primitive type or of primitive lists.
::arrow::Status PARQUET_EXPORT
WriteTable(const ::arrow::Table& table, MemoryPool* pool,
std::shared_ptr<::arrow::io::OutputStream> sink, int64_t chunk_size,
3 changes: 1 addition & 2 deletions python/pyarrow/_dataset.pyx
@@ -1096,8 +1096,7 @@ cdef class ParquetFileFormat(FileFormat):
coerce_timestamps=write_options.get("coerce_timestamps", None),
allow_truncated_timestamps=write_options.get(
"allow_truncated_timestamps", False),
writer_engine_version=os.environ.get(
"ARROW_PARQUET_WRITER_ENGINE", "V2")
writer_engine_version="V2"
)
wrapped.get().writer_properties = properties
wrapped.get().arrow_writer_properties = arrow_properties
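With this change the datasets writer no longer consults the ARROW_PARQUET_WRITER_ENGINE environment variable and always passes "V2". A small usage sketch with the standard pyarrow API (file name illustrative):

import pyarrow as pa
import pyarrow.parquet as pq

# Nested (list) columns are now written through the V2 engine unconditionally.
table = pa.table({"values": pa.array([[1, None], None, []], type=pa.list_(pa.int32()))})
pq.write_table(table, "nested.parquet")
print(pq.read_table("nested.parquet").to_pydict())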
