Skip to content

Commit

Permalink
ARROW-632: [Python] Add support for FixedWidthBinary type
Browse files Browse the repository at this point in the history
Author: Phillip Cloud <[email protected]>

Closes apache#461 from cpcloud/ARROW-632 and squashes the following commits:

134644a [Phillip Cloud] ARROW-632: [Python] Add support for FixedWidthBinary type
  • Loading branch information
cpcloud authored and wesm committed Mar 30, 2017
1 parent edd6cfc commit 4915ecf
Show file tree
Hide file tree
Showing 20 changed files with 367 additions and 55 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,6 @@
*.dylib
.build_cache_dir
MANIFEST

cpp/.idea/
python/.eggs/
1 change: 1 addition & 0 deletions cpp/src/arrow/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@ Status MakeBuilder(MemoryPool* pool, const std::shared_ptr<DataType>& type,
BUILDER_CASE(DOUBLE, DoubleBuilder);
BUILDER_CASE(STRING, StringBuilder);
BUILDER_CASE(BINARY, BinaryBuilder);
BUILDER_CASE(FIXED_WIDTH_BINARY, FixedWidthBinaryBuilder);
case Type::LIST: {
std::shared_ptr<ArrayBuilder> value_builder;
std::shared_ptr<DataType> value_type =
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/ipc/ipc-read-write-benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ static void BM_WriteRecordBatch(benchmark::State& state) { // NOLINT non-const
int32_t metadata_length;
int64_t body_length;
if (!ipc::WriteRecordBatch(*record_batch, 0, &stream, &metadata_length, &body_length,
default_memory_pool())
default_memory_pool())
.ok()) {
state.SkipWithError("Failed to write!");
}
Expand All @@ -101,7 +101,7 @@ static void BM_ReadRecordBatch(benchmark::State& state) { // NOLINT non-const r
int32_t metadata_length;
int64_t body_length;
if (!ipc::WriteRecordBatch(*record_batch, 0, &stream, &metadata_length, &body_length,
default_memory_pool())
default_memory_pool())
.ok()) {
state.SkipWithError("Failed to write!");
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
#include "arrow/ipc/util.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/tensor.h"
#include "arrow/type.h"
#include "arrow/util/logging.h"

namespace arrow {
Expand Down
101 changes: 84 additions & 17 deletions cpp/src/arrow/python/builtin_convert.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include "arrow/api.h"
#include "arrow/status.h"
#include "arrow/util/logging.h"

#include "arrow/python/helpers.h"
#include "arrow/python/util/datetime.h"
Expand Down Expand Up @@ -200,18 +201,25 @@ class SeqVisitor {
int nesting_histogram_[MAX_NESTING_LEVELS];
};

// Non-exhaustive type inference
Status InferArrowType(PyObject* obj, int64_t* size, std::shared_ptr<DataType>* out_type) {
*size = PySequence_Size(obj);
Status InferArrowSize(PyObject* obj, int64_t* size) {
*size = static_cast<int64_t>(PySequence_Size(obj));
if (PyErr_Occurred()) {
// Not a sequence
PyErr_Clear();
return Status::TypeError("Object is not a sequence");
}
return Status::OK();
}

// Non-exhaustive type inference
Status InferArrowTypeAndSize(
PyObject* obj, int64_t* size, std::shared_ptr<DataType>* out_type) {
RETURN_NOT_OK(InferArrowSize(obj, size));

// For 0-length sequences, refuse to guess
if (*size == 0) { *out_type = null(); }

PyDateTime_IMPORT;
SeqVisitor seq_visitor;
RETURN_NOT_OK(seq_visitor.Visit(obj));
RETURN_NOT_OK(seq_visitor.Validate());
Expand Down Expand Up @@ -253,7 +261,7 @@ class TypedConverter : public SeqConverter {
class BoolConverter : public TypedConverter<BooleanBuilder> {
public:
Status AppendData(PyObject* seq) override {
Py_ssize_t size = PySequence_Size(seq);
int64_t size = static_cast<int64_t>(PySequence_Size(seq));
RETURN_NOT_OK(typed_builder_->Reserve(size));
for (int64_t i = 0; i < size; ++i) {
OwnedRef item(PySequence_GetItem(seq, i));
Expand All @@ -275,14 +283,14 @@ class Int64Converter : public TypedConverter<Int64Builder> {
public:
Status AppendData(PyObject* seq) override {
int64_t val;
Py_ssize_t size = PySequence_Size(seq);
int64_t size = static_cast<int64_t>(PySequence_Size(seq));
RETURN_NOT_OK(typed_builder_->Reserve(size));
for (int64_t i = 0; i < size; ++i) {
OwnedRef item(PySequence_GetItem(seq, i));
if (item.obj() == Py_None) {
typed_builder_->AppendNull();
} else {
val = PyLong_AsLongLong(item.obj());
val = static_cast<int64_t>(PyLong_AsLongLong(item.obj()));
RETURN_IF_PYERROR();
typed_builder_->Append(val);
}
Expand All @@ -294,7 +302,7 @@ class Int64Converter : public TypedConverter<Int64Builder> {
class DateConverter : public TypedConverter<Date64Builder> {
public:
Status AppendData(PyObject* seq) override {
Py_ssize_t size = PySequence_Size(seq);
int64_t size = static_cast<int64_t>(PySequence_Size(seq));
RETURN_NOT_OK(typed_builder_->Reserve(size));
for (int64_t i = 0; i < size; ++i) {
OwnedRef item(PySequence_GetItem(seq, i));
Expand All @@ -312,7 +320,7 @@ class DateConverter : public TypedConverter<Date64Builder> {
class TimestampConverter : public TypedConverter<TimestampBuilder> {
public:
Status AppendData(PyObject* seq) override {
Py_ssize_t size = PySequence_Size(seq);
int64_t size = static_cast<int64_t>(PySequence_Size(seq));
RETURN_NOT_OK(typed_builder_->Reserve(size));
for (int64_t i = 0; i < size; ++i) {
OwnedRef item(PySequence_GetItem(seq, i));
Expand All @@ -334,7 +342,8 @@ class TimestampConverter : public TypedConverter<TimestampBuilder> {
epoch.tm_year = 70;
epoch.tm_mday = 1;
// Microseconds since the epoch
int64_t val = lrint(difftime(mktime(&datetime), mktime(&epoch))) * 1000000 + us;
int64_t val = static_cast<int64_t>(
lrint(difftime(mktime(&datetime), mktime(&epoch))) * 1000000 + us);
typed_builder_->Append(val);
}
}
Expand All @@ -346,7 +355,7 @@ class DoubleConverter : public TypedConverter<DoubleBuilder> {
public:
Status AppendData(PyObject* seq) override {
double val;
Py_ssize_t size = PySequence_Size(seq);
int64_t size = static_cast<int64_t>(PySequence_Size(seq));
RETURN_NOT_OK(typed_builder_->Reserve(size));
for (int64_t i = 0; i < size; ++i) {
OwnedRef item(PySequence_GetItem(seq, i));
Expand All @@ -369,7 +378,7 @@ class BytesConverter : public TypedConverter<BinaryBuilder> {
PyObject* bytes_obj;
OwnedRef tmp;
const char* bytes;
int64_t length;
Py_ssize_t length;
Py_ssize_t size = PySequence_Size(seq);
for (int64_t i = 0; i < size; ++i) {
item = PySequence_GetItem(seq, i);
Expand All @@ -385,7 +394,8 @@ class BytesConverter : public TypedConverter<BinaryBuilder> {
} else if (PyBytes_Check(item)) {
bytes_obj = item;
} else {
return Status::TypeError("Non-string value encountered");
return Status::TypeError(
"Value that cannot be converted to bytes was encountered");
}
// No error checking
length = PyBytes_GET_SIZE(bytes_obj);
Expand All @@ -396,14 +406,49 @@ class BytesConverter : public TypedConverter<BinaryBuilder> {
}
};

class FixedWidthBytesConverter : public TypedConverter<FixedWidthBinaryBuilder> {
public:
Status AppendData(PyObject* seq) override {
PyObject* item;
PyObject* bytes_obj;
OwnedRef tmp;
Py_ssize_t expected_length = std::dynamic_pointer_cast<FixedWidthBinaryType>(
typed_builder_->type())->byte_width();
Py_ssize_t size = PySequence_Size(seq);
for (int64_t i = 0; i < size; ++i) {
item = PySequence_GetItem(seq, i);
OwnedRef holder(item);

if (item == Py_None) {
RETURN_NOT_OK(typed_builder_->AppendNull());
continue;
} else if (PyUnicode_Check(item)) {
tmp.reset(PyUnicode_AsUTF8String(item));
RETURN_IF_PYERROR();
bytes_obj = tmp.obj();
} else if (PyBytes_Check(item)) {
bytes_obj = item;
} else {
return Status::TypeError(
"Value that cannot be converted to bytes was encountered");
}
// No error checking
RETURN_NOT_OK(CheckPythonBytesAreFixedLength(bytes_obj, expected_length));
RETURN_NOT_OK(typed_builder_->Append(
reinterpret_cast<const uint8_t*>(PyBytes_AS_STRING(bytes_obj))));
}
return Status::OK();
}
};

class UTF8Converter : public TypedConverter<StringBuilder> {
public:
Status AppendData(PyObject* seq) override {
PyObject* item;
PyObject* bytes_obj;
OwnedRef tmp;
const char* bytes;
int64_t length;
Py_ssize_t length;
Py_ssize_t size = PySequence_Size(seq);
for (int64_t i = 0; i < size; ++i) {
item = PySequence_GetItem(seq, i);
Expand Down Expand Up @@ -465,14 +510,15 @@ std::shared_ptr<SeqConverter> GetConverter(const std::shared_ptr<DataType>& type
return std::make_shared<DoubleConverter>();
case Type::BINARY:
return std::make_shared<BytesConverter>();
case Type::FIXED_WIDTH_BINARY:
return std::make_shared<FixedWidthBytesConverter>();
case Type::STRING:
return std::make_shared<UTF8Converter>();
case Type::LIST:
return std::make_shared<ListConverter>();
case Type::STRUCT:
default:
return nullptr;
break;
}
}

Expand All @@ -492,6 +538,7 @@ Status ListConverter::Init(const std::shared_ptr<ArrayBuilder>& builder) {

Status AppendPySequence(PyObject* obj, const std::shared_ptr<DataType>& type,
const std::shared_ptr<ArrayBuilder>& builder) {
PyDateTime_IMPORT;
std::shared_ptr<SeqConverter> converter = GetConverter(type);
if (converter == nullptr) {
std::stringstream ss;
Expand All @@ -506,9 +553,12 @@ Status AppendPySequence(PyObject* obj, const std::shared_ptr<DataType>& type,
Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out) {
std::shared_ptr<DataType> type;
int64_t size;
PyDateTime_IMPORT;
RETURN_NOT_OK(InferArrowType(obj, &size, &type));
RETURN_NOT_OK(InferArrowTypeAndSize(obj, &size, &type));
return ConvertPySequence(obj, pool, out, type, size);
}

Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out,
const std::shared_ptr<DataType>& type, int64_t size) {
// Handle NA / NullType case
if (type->type == Type::NA) {
out->reset(new NullArray(size));
Expand All @@ -519,9 +569,26 @@ Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>
std::shared_ptr<ArrayBuilder> builder;
RETURN_NOT_OK(MakeBuilder(pool, type, &builder));
RETURN_NOT_OK(AppendPySequence(obj, type, builder));

return builder->Finish(out);
}

Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out,
const std::shared_ptr<DataType>& type) {
int64_t size;
RETURN_NOT_OK(InferArrowSize(obj, &size));
return ConvertPySequence(obj, pool, out, type, size);
}

Status CheckPythonBytesAreFixedLength(PyObject* obj, Py_ssize_t expected_length) {
const Py_ssize_t length = PyBytes_GET_SIZE(obj);
if (length != expected_length) {
std::stringstream ss;
ss << "Found byte string of length " << length << ", expected length is "
<< expected_length;
return Status::TypeError(ss.str());
}
return Status::OK();
}

} // namespace py
} // namespace arrow
17 changes: 16 additions & 1 deletion cpp/src/arrow/python/builtin_convert.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,31 @@ class Status;

namespace py {

ARROW_EXPORT arrow::Status InferArrowType(
ARROW_EXPORT arrow::Status InferArrowTypeAndSize(
PyObject* obj, int64_t* size, std::shared_ptr<arrow::DataType>* out_type);
ARROW_EXPORT arrow::Status InferArrowSize(PyObject* obj, int64_t* size);

ARROW_EXPORT arrow::Status AppendPySequence(PyObject* obj,
const std::shared_ptr<arrow::DataType>& type,
const std::shared_ptr<arrow::ArrayBuilder>& builder);

// Type and size inference
ARROW_EXPORT
Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out);

// Size inference
ARROW_EXPORT
Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out,
const std::shared_ptr<DataType>& type);

// No inference
ARROW_EXPORT
Status ConvertPySequence(PyObject* obj, MemoryPool* pool, std::shared_ptr<Array>* out,
const std::shared_ptr<DataType>& type, int64_t size);

ARROW_EXPORT Status CheckPythonBytesAreFixedLength(
PyObject* obj, Py_ssize_t expected_length);

} // namespace py
} // namespace arrow

Expand Down
Loading

0 comments on commit 4915ecf

Please sign in to comment.