Skip to content

Commit

Permalink
ARROW-2315: [C++/Python] Flatten struct array
Browse files Browse the repository at this point in the history
Also adds tests for BitmapReader and BitmapWriter.

Author: Antoine Pitrou <[email protected]>

Closes apache#1755 from pitrou/ARROW-2315-struct-flatten and squashes the following commits:

c14d492 <Antoine Pitrou> Address review comments @cpcloud
f1ecb99 <Antoine Pitrou> Avoid C++14 binary literals
88b14c6 <Antoine Pitrou> Fix sliced flattening
cced334 <Antoine Pitrou> ARROW-1886:  Flatten struct array
  • Loading branch information
pitrou committed Apr 17, 2018
1 parent 72c7f5d commit 66d0ad1
Show file tree
Hide file tree
Showing 9 changed files with 324 additions and 0 deletions.
47 changes: 47 additions & 0 deletions cpp/src/arrow/array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,53 @@ std::shared_ptr<Array> StructArray::field(int i) const {
return boxed_fields_[i];
}

Status StructArray::Flatten(MemoryPool* pool, ArrayVector* out) const {
ArrayVector flattened;
std::shared_ptr<Buffer> null_bitmap = data_->buffers[0];

for (auto& child_data : data_->child_data) {
std::shared_ptr<Buffer> flattened_null_bitmap;
int64_t flattened_null_count = kUnknownNullCount;

// Need to adjust for parent offset
if (data_->offset != 0 || data_->length != child_data->length) {
child_data = SliceData(*child_data, data_->offset, data_->length);
}
std::shared_ptr<Buffer> child_null_bitmap = child_data->buffers[0];
const int64_t child_offset = child_data->offset;

// The validity of a flattened datum is the logical AND of the struct
// element's validity and the individual field element's validity.
if (null_bitmap && child_null_bitmap) {
RETURN_NOT_OK(BitmapAnd(pool, child_null_bitmap->data(), child_offset,
null_bitmap_data_, data_->offset, data_->length,
child_offset, &flattened_null_bitmap));
} else if (child_null_bitmap) {
flattened_null_bitmap = child_null_bitmap;
flattened_null_count = child_data->null_count;
} else if (null_bitmap) {
if (child_offset == data_->offset) {
flattened_null_bitmap = null_bitmap;
} else {
RETURN_NOT_OK(CopyBitmap(pool, null_bitmap_data_, data_->offset, data_->length,
&flattened_null_bitmap));
}
flattened_null_count = data_->null_count;
} else {
flattened_null_count = 0;
}

auto flattened_data = child_data->Copy();
flattened_data->buffers[0] = flattened_null_bitmap;
flattened_data->null_count = flattened_null_count;

flattened.push_back(MakeArray(flattened_data));
}

*out = flattened;
return Status::OK();
}

// ----------------------------------------------------------------------
// UnionArray

Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/array.h
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,12 @@ class ARROW_EXPORT StructArray : public Array {
// count adjusted.
std::shared_ptr<Array> field(int pos) const;

/// \brief Flatten this array as a vector of arrays, one for each field
///
/// \param[in] pool The pool to allocate null bitmaps from, if necessary
/// \param[out] out The resulting vector of arrays
Status Flatten(MemoryPool* pool, ArrayVector* out) const;

private:
// For caching boxed child data
mutable std::vector<std::shared_ptr<Array>> boxed_fields_;
Expand Down
144 changes: 144 additions & 0 deletions cpp/src/arrow/util/bit-util-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include <climits>
#include <cstdint>
#include <cstring>
#include <functional>
#include <initializer_list>
#include <limits>
#include <memory>
#include <vector>
Expand All @@ -41,6 +43,58 @@ static void EnsureCpuInfoInitialized() {
}
}

void WriteVectorToWriter(internal::BitmapWriter& writer, const std::vector<int> values) {
for (const auto& value : values) {
if (value) {
writer.Set();
} else {
writer.Clear();
}
writer.Next();
}
writer.Finish();
}

void BitmapFromVector(const std::vector<int>& values, int64_t bit_offset,
std::shared_ptr<Buffer>* out_buffer, int64_t* out_length) {
const int64_t length = values.size();
*out_length = length;
ASSERT_OK(GetEmptyBitmap(default_memory_pool(), length + bit_offset, out_buffer));
auto writer = internal::BitmapWriter((*out_buffer)->mutable_data(), bit_offset, length);
WriteVectorToWriter(writer, values);
}

#define ASSERT_READER_SET(reader) \
do { \
ASSERT_TRUE(reader.IsSet()); \
ASSERT_FALSE(reader.IsNotSet()); \
reader.Next(); \
} while (false)

#define ASSERT_READER_NOT_SET(reader) \
do { \
ASSERT_FALSE(reader.IsSet()); \
ASSERT_TRUE(reader.IsNotSet()); \
reader.Next(); \
} while (false)

// Assert that a BitmapReader yields the given bit values
void ASSERT_READER_VALUES(internal::BitmapReader& reader, std::vector<int> values) {
for (const auto& value : values) {
if (value) {
ASSERT_READER_SET(reader);
} else {
ASSERT_READER_NOT_SET(reader);
}
}
}

// Assert equal contents of a memory area and a vector of bytes
void ASSERT_BYTES_EQ(const uint8_t* left, const std::vector<uint8_t>& right) {
auto left_array = std::vector<uint8_t>(left, left + right.size());
ASSERT_EQ(std::vector<uint8_t>(std::begin(left_array), std::end(left_array)), right);
}

TEST(BitUtilTests, TestIsMultipleOf64) {
using BitUtil::IsMultipleOf64;
EXPECT_TRUE(IsMultipleOf64(64));
Expand Down Expand Up @@ -71,6 +125,20 @@ TEST(BitUtilTests, TestNextPower2) {
ASSERT_EQ(1LL << 62, NextPower2((1LL << 62) - 1));
}

TEST(BitmapReader, NormalOperation) {
std::shared_ptr<Buffer> buffer;
int64_t length;

for (int64_t offset : {0, 1, 3, 5, 7, 8, 12, 13, 21, 38, 75, 120}) {
BitmapFromVector({0, 1, 1, 1, 0, 0, 0, 1, 0, 1, 0, 1, 0, 1}, offset, &buffer,
&length);
ASSERT_EQ(length, 14);

auto reader = internal::BitmapReader(buffer->mutable_data(), offset, length);
ASSERT_READER_VALUES(reader, {0, 1, 1, 1, 0, 0, 0, 1, 0, 1, 0, 1, 0, 1});
}
}

TEST(BitmapReader, DoesNotReadOutOfBounds) {
uint8_t bitmap[16] = {0};

Expand All @@ -95,6 +163,37 @@ TEST(BitmapReader, DoesNotReadOutOfBounds) {
internal::BitmapReader r3(nullptr, 0, 0);
}

TEST(BitmapWriter, NormalOperation) {
{
uint8_t bitmap[] = {0, 0, 0, 0};
auto writer = internal::BitmapWriter(bitmap, 0, 12);
WriteVectorToWriter(writer, {0, 1, 1, 0, 1, 1, 0, 0, 0, 1, 0, 1});
// {0b00110110, 0b1010, 0, 0}
ASSERT_BYTES_EQ(bitmap, {0x36, 0x0a, 0, 0});
}
{
uint8_t bitmap[] = {0xff, 0xff, 0xff, 0xff};
auto writer = internal::BitmapWriter(bitmap, 0, 12);
WriteVectorToWriter(writer, {0, 1, 1, 0, 1, 1, 0, 0, 0, 1, 0, 1});
// {0b00110110, 0b11111010, 0xff, 0xff}
ASSERT_BYTES_EQ(bitmap, {0x36, 0xfa, 0xff, 0xff});
}
{
uint8_t bitmap[] = {0, 0, 0, 0};
auto writer = internal::BitmapWriter(bitmap, 3, 12);
WriteVectorToWriter(writer, {0, 1, 1, 0, 1, 1, 0, 0, 0, 1, 0, 1});
// {0b10110000, 0b01010001, 0, 0}
ASSERT_BYTES_EQ(bitmap, {0xb0, 0x51, 0, 0});
}
{
uint8_t bitmap[] = {0, 0, 0, 0};
auto writer = internal::BitmapWriter(bitmap, 20, 12);
WriteVectorToWriter(writer, {0, 1, 1, 0, 1, 1, 0, 0, 0, 1, 0, 1});
// {0, 0, 0b01100000, 0b10100011}
ASSERT_BYTES_EQ(bitmap, {0, 0, 0x60, 0xa3});
}
}

TEST(BitmapWriter, DoesNotWriteOutOfBounds) {
uint8_t bitmap[16] = {0};

Expand Down Expand Up @@ -128,6 +227,51 @@ TEST(BitmapWriter, DoesNotWriteOutOfBounds) {
ASSERT_EQ((length - 5), num_values);
}

TEST(BitmapAnd, Aligned) {
std::shared_ptr<Buffer> left, right, out;
int64_t length;

for (int64_t left_offset : {0, 1, 3, 5, 7, 8, 13, 21, 38, 75, 120}) {
BitmapFromVector({0, 1, 1, 1, 0, 0, 0, 1, 0, 1, 0, 1, 0, 1}, left_offset, &left,
&length);
for (int64_t right_offset : {left_offset, left_offset + 8, left_offset + 40}) {
BitmapFromVector({0, 0, 1, 0, 1, 1, 0, 0, 1, 1, 1, 0, 1, 0}, right_offset, &right,
&length);
for (int64_t out_offset : {left_offset, left_offset + 16, left_offset + 24}) {
ASSERT_OK(BitmapAnd(default_memory_pool(), left->mutable_data(), left_offset,
right->mutable_data(), right_offset, length, out_offset,
&out));
auto reader = internal::BitmapReader(out->mutable_data(), out_offset, length);
ASSERT_READER_VALUES(reader, {0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0});
}
}
}
}

TEST(BitmapAnd, Unaligned) {
std::shared_ptr<Buffer> left, right, out;
int64_t length;
auto offset_values = {0, 1, 3, 5, 7, 8, 13, 21, 38, 75, 120};

for (int64_t left_offset : offset_values) {
BitmapFromVector({0, 1, 1, 1, 0, 0, 0, 1, 0, 1, 0, 1, 0, 1}, left_offset, &left,
&length);

for (int64_t right_offset : offset_values) {
BitmapFromVector({0, 0, 1, 0, 1, 1, 0, 0, 1, 1, 1, 0, 1, 0}, right_offset, &right,
&length);

for (int64_t out_offset : offset_values) {
ASSERT_OK(BitmapAnd(default_memory_pool(), left->mutable_data(), left_offset,
right->mutable_data(), right_offset, length, out_offset,
&out));
auto reader = internal::BitmapReader(out->mutable_data(), out_offset, length);
ASSERT_READER_VALUES(reader, {0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0});
}
}
}
}

static inline int64_t SlowCountBits(const uint8_t* data, int64_t bit_offset,
int64_t length) {
int64_t count = 0;
Expand Down
54 changes: 54 additions & 0 deletions cpp/src/arrow/util/bit-util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,58 @@ bool BitmapEquals(const uint8_t* left, int64_t left_offset, const uint8_t* right
return true;
}

namespace {

void AlignedBitmapAnd(const uint8_t* left, int64_t left_offset, const uint8_t* right,
int64_t right_offset, uint8_t* out, int64_t out_offset,
int64_t length) {
DCHECK_EQ(left_offset % 8, right_offset % 8);
DCHECK_EQ(left_offset % 8, out_offset % 8);

const int64_t nbytes = BitUtil::BytesForBits(length + left_offset);
left += left_offset / 8;
right += right_offset / 8;
out += out_offset / 8;
for (int64_t i = 0; i < nbytes; ++i) {
out[i] = left[i] & right[i];
}
}

void UnalignedBitmapAnd(const uint8_t* left, int64_t left_offset, const uint8_t* right,
int64_t right_offset, uint8_t* out, int64_t out_offset,
int64_t length) {
auto left_reader = internal::BitmapReader(left, left_offset, length);
auto right_reader = internal::BitmapReader(right, right_offset, length);
auto writer = internal::BitmapWriter(out, out_offset, length);
for (int64_t i = 0; i < length; ++i) {
if (left_reader.IsSet() && right_reader.IsSet()) {
writer.Set();
}
left_reader.Next();
right_reader.Next();
writer.Next();
}
writer.Finish();
}

} // namespace

Status BitmapAnd(MemoryPool* pool, const uint8_t* left, int64_t left_offset,
const uint8_t* right, int64_t right_offset, int64_t length,
int64_t out_offset, std::shared_ptr<Buffer>* out_buffer) {
if ((out_offset % 8 == left_offset % 8) && (out_offset % 8 == right_offset % 8)) {
// Fast case: can use bytewise AND
const int64_t phys_bits = length + out_offset;
RETURN_NOT_OK(GetEmptyBitmap(pool, phys_bits, out_buffer));
AlignedBitmapAnd(left, left_offset, right, right_offset,
(*out_buffer)->mutable_data(), out_offset, length);
} else {
// Unaligned
RETURN_NOT_OK(GetEmptyBitmap(pool, length + out_offset, out_buffer));
UnalignedBitmapAnd(left, left_offset, right, right_offset,
(*out_buffer)->mutable_data(), out_offset, length);
}
return Status::OK();
}

} // namespace arrow
5 changes: 5 additions & 0 deletions cpp/src/arrow/util/bit-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,11 @@ ARROW_EXPORT
bool BitmapEquals(const uint8_t* left, int64_t left_offset, const uint8_t* right,
int64_t right_offset, int64_t bit_length);

ARROW_EXPORT
Status BitmapAnd(MemoryPool* pool, const uint8_t* left, int64_t left_offset,
const uint8_t* right, int64_t right_offset, int64_t length,
int64_t out_offset, std::shared_ptr<Buffer>* out_buffer);

} // namespace arrow

#endif // ARROW_UTIL_BIT_UTIL_H
24 changes: 24 additions & 0 deletions python/pyarrow/array.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,30 @@ cdef class DictionaryArray(Array):

cdef class StructArray(Array):

def flatten(self, MemoryPool memory_pool=None):
"""
Flatten this StructArray, returning one individual array for each
field in the struct.
Parameters
----------
memory_pool : MemoryPool, default None
For memory allocations, if required, otherwise use default pool
Returns
-------
result : List[Array]
"""
cdef:
vector[shared_ptr[CArray]] arrays
CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
CStructArray* sarr = <CStructArray*> self.ap

with nogil:
check_status(sarr.Flatten(pool, &arrays))

return [pyarrow_wrap_array(arr) for arr in arrays]

@staticmethod
def from_arrays(arrays, names=None):
"""
Expand Down
2 changes: 2 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,8 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:

shared_ptr[CArray] field(int pos)

CStatus Flatten(CMemoryPool* pool, vector[shared_ptr[CArray]]* out)

CStatus ValidateArray(const CArray& array)

cdef cppclass CChunkedArray" arrow::ChunkedArray":
Expand Down
4 changes: 4 additions & 0 deletions python/pyarrow/lib.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ cdef class Decimal128Array(FixedSizeBinaryArray):
pass


cdef class StructArray(Array):
pass


cdef class ListArray(Array):
pass

Expand Down
Loading

0 comments on commit 66d0ad1

Please sign in to comment.