Skip to content

Commit

Permalink
Implement interfaces with support for Arrow
Browse files Browse the repository at this point in the history
Implementing interfaces with support for Arrow where
the inspiration of the implementation came from CSV.

As part of VMware project we needed the capability
to write parquet files using arrow schemas with go.
And since the xitongsys/parquet-go project is pretty
advanced we went with it and extended the already existing
interfaces of the project ourselves and decided to give
this to the open source community for people to reuse it
where they see fit.

The change includes:
* Adding arrow schema handler along with tests
* Adding arrow writer along with tests
* Adding arrow marshaller tested through writer
* Adding example with the full arrow process
* Extending the README to mention that Arrow is now supported
and how it can be used through the example

Signed-off-by: Martin Dekov <[email protected]>
  • Loading branch information
martindekov committed Oct 14, 2021
1 parent 055d06d commit e5b55d6
Show file tree
Hide file tree
Showing 10 changed files with 1,132 additions and 21 deletions.
24 changes: 22 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ Using this interface, parquet-go can read/write parquet file on different platfo

## Writer

Three Writers are supported: ParquetWriter, JSONWriter, CSVWriter.
Three Writers are supported: ParquetWriter, JSONWriter, CSVWriter, ArrowWriter.

* ParquetWriter is used to write predefined Golang structs.
[Example of ParquetWriter](https://github.com/xitongsys/parquet-go/blob/master/example/local_flat.go)
Expand All @@ -204,6 +204,9 @@ Three Writers are supported: ParquetWriter, JSONWriter, CSVWriter.
* CSVWriter is used to write data format similar with CSV(not nested)
[Example of CSVWriter](https://github.com/xitongsys/parquet-go/blob/master/example/csv_write.go)

* ArrowWriter is used to write parquet files using Arrow Schemas
[Example of ArrowWriter](https://github.com/xitongsys/parquet-go/blob/master/example/arrow_to_parquet.go)

## Reader

Two Readers are supported: ParquetReader, ColumnReader
Expand All @@ -226,7 +229,7 @@ Two Readers are supported: ParquetReader, ColumnReader

## Schema

There are three methods to define the schema: go struct tags, Json, CSV metadata. Only items in schema will be written and others will be ignored.
There are three methods to define the schema: go struct tags, Json, CSV, Arrow metadata. Only items in schema will be written and others will be ignored.

### Tag

Expand Down Expand Up @@ -332,6 +335,21 @@ var jsonSchema string = `

[Example of CSV metadata](https://github.com/xitongsys/parquet-go/blob/master/example/csv_write.go)

### Arrow metadata

```golang
schema := arrow.NewSchema(
[]arrow.Field{
{Name: "int64", Type: arrow.PrimitiveTypes.Int64},
{Name: "float64", Type: arrow.PrimitiveTypes.Float64},
{Name: "str", Type: arrow.BinaryTypes.String},
},
nil,
)
```

[Example of Arrow metadata](https://github.com/xitongsys/parquet-go/blob/master/example/arrow_to_parquet.go)

### Tips

* Parquet-go reads data as an object in Golang and every field must be a public field, which start with an upper letter. This field name we call it `InName`. Field name in parquet file we call it `ExName`. Function `common.HeadToUpper` converts `ExName` to `InName`. There are some restriction:
Expand All @@ -348,6 +366,7 @@ func NewParquetReader(pFile ParquetFile.ParquetFile, obj interface{}, np int64)
func NewParquetWriter(pFile ParquetFile.ParquetFile, obj interface{}, np int64) (*ParquetWriter, error)
func NewJSONWriter(jsonSchema string, pfile ParquetFile.ParquetFile, np int64) (*JSONWriter, error)
func NewCSVWriter(md []string, pfile ParquetFile.ParquetFile, np int64) (*CSVWriter, error)
func NewArrowWriter(arrowSchema *arrow.Schema, pfile source.ParquetFile, np int64) (*ArrowWriter error)
```

## Examples
Expand All @@ -370,6 +389,7 @@ func NewCSVWriter(md []string, pfile ParquetFile.ParquetFile, np int64) (*CSVWri
|[writer.go](https://github.com/xitongsys/parquet-go/blob/master/example/writer.go)|create ParquetWriter from io.Writer|
|[keyvalue_metadata.go](https://github.com/xitongsys/parquet-go/blob/master/example/keyvalue_metadata.go)|write keyvalue metadata|
|[dot_in_name.go](https://github.com/xitongsys/parquet-go/blob/master/example/dot_in_name.go)|`.` in filed name|
|[arrow_to_parquet.go](https://github.com/xitongsys/parquet-go/blob/master/example/arrow_to_parquet.go)|write/read parquet file using arrow definition|



Expand Down
298 changes: 298 additions & 0 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import (
"strconv"
"strings"

"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/array"
"github.com/xitongsys/parquet-go/parquet"
"github.com/xitongsys/parquet-go/types"
)

// `parquet:"name=Name, type=FIXED_LEN_BYTE_ARRAY, length=12"`
Expand Down Expand Up @@ -989,3 +992,298 @@ func StrToPath(str string) []string {
func PathStrIndex(str string) int {
return len(strings.Split(str, PAR_GO_PATH_DELIMITER))
}

// NewTable creates empty table with transposed columns and records
func NewTable(rowLen, colLen int) [][]interface{} {
tableLen := make([]interface{}, rowLen*colLen)
// Need to reconsinder to avoid allocation and memcopy.
newTable := make([][]interface{}, rowLen)
lo, hi := 0, colLen
for i := range newTable {
newTable[i] = tableLen[lo:hi:hi]
lo, hi = hi, hi+colLen
}
return newTable
}

// TransposeTable transposes a table's rows and columns once per arrow record.
// We need to transpose the rows and columns because parquet-go library writes
// data row by row while the arrow library provides the data column by column.
func TransposeTable(table [][]interface{}) [][]interface{} {
transposedTable := NewTable(len(table[0]), len(table))
for i := 0; i < len(transposedTable); i++ {
row := transposedTable[i]
for j := 0; j < len(row); j++ {
row[j] = table[j][i]
}
}
return transposedTable
}

// ArrowColToParquetCol creates column with native parquet values from column
// with arrow values.
//
// If a single record is not valid by the arrow definitions we assign it
// default value which we chose.
func ArrowColToParquetCol(field arrow.Field, col array.Interface, len int,
el *parquet.SchemaElement) []interface{} {
recs := make([]interface{}, len)
switch field.Type.(type) {
case *arrow.Int8Type:
arr := col.(*array.Int8)
var rec int8
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
rec = arr.Value(i)
} else {
rec = 0
}
recs[i] = types.StrToParquetType(fmt.Sprintf("%v", rec),
el.Type,
el.ConvertedType,
int(el.GetTypeLength()),
int(el.GetScale()))
}
case *arrow.Int16Type:
arr := col.(*array.Int16)
var rec int16
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
rec = arr.Value(i)
} else {
rec = 0
}
recs[i] = types.StrToParquetType(fmt.Sprintf("%v", rec),
el.Type,
el.ConvertedType,
int(el.GetTypeLength()),
int(el.GetScale()))
}
case *arrow.Int32Type:
arr := col.(*array.Int32)
var rec int32
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
rec = arr.Value(i)
} else {
rec = 0
}
recs[i] = types.StrToParquetType(fmt.Sprintf("%v", rec),
el.Type,
el.ConvertedType,
int(el.GetTypeLength()),
int(el.GetScale()))
}
case *arrow.Int64Type:
arr := col.(*array.Int64)
var rec int64
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
rec = arr.Value(i)
} else {
rec = 0
}
recs[i] = types.StrToParquetType(fmt.Sprintf("%v", rec),
el.Type,
el.ConvertedType,
int(el.GetTypeLength()),
int(el.GetScale()))
}
case *arrow.Uint8Type:
arr := col.(*array.Uint8)
var rec uint8
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
rec = arr.Value(i)
} else {
rec = 0
}
recs[i] = types.StrToParquetType(fmt.Sprintf("%v", rec),
el.Type,
el.ConvertedType,
int(el.GetTypeLength()),
int(el.GetScale()))
}
case *arrow.Uint16Type:
arr := col.(*array.Uint16)
var rec uint16
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
rec = arr.Value(i)
} else {
rec = 0
}
recs[i] = types.StrToParquetType(fmt.Sprintf("%v", rec),
el.Type,
el.ConvertedType,
int(el.GetTypeLength()),
int(el.GetScale()))
}
case *arrow.Uint32Type:
arr := col.(*array.Uint32)
var rec int32
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
rec = int32(arr.Value(i))
} else {
rec = 0
}
recs[i] = types.StrToParquetType(fmt.Sprintf("%v", rec),
el.Type,
el.ConvertedType,
int(el.GetTypeLength()),
int(el.GetScale()))
}
case *arrow.Uint64Type:
arr := col.(*array.Uint64)
var rec int64
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
rec = int64(arr.Value(i))
} else {
rec = 0
}
recs[i] = types.StrToParquetType(fmt.Sprintf("%v", rec),
el.Type,
el.ConvertedType,
int(el.GetTypeLength()),
int(el.GetScale()))
}
case *arrow.Float32Type:
arr := col.(*array.Float32)
var rec float32
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
rec = arr.Value(i)
} else {
rec = 0
}
recs[i] = types.StrToParquetType(fmt.Sprintf("%v", rec),
el.Type,
el.ConvertedType,
int(el.GetTypeLength()),
int(el.GetScale()))
}
case *arrow.Float64Type:
arr := col.(*array.Float64)
var rec float64
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
rec = arr.Value(i)
} else {
rec = 0
}
recs[i] = types.StrToParquetType(fmt.Sprintf("%v", rec),
el.Type,
el.ConvertedType,
int(el.GetTypeLength()),
int(el.GetScale()))
}
case *arrow.Date32Type:
arr := col.(*array.Date32)
var rec arrow.Date32
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
rec = arr.Value(i)
} else {
rec = 0
}
recs[i] = types.StrToParquetType(fmt.Sprintf("%v", rec),
el.Type,
el.ConvertedType,
int(el.GetTypeLength()),
int(el.GetScale()))
}
case *arrow.Date64Type:
arr := col.(*array.Date64)
var rec arrow.Date64
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
rec = arr.Value(i)
} else {
rec = 0
}
recs[i] = types.StrToParquetType(fmt.Sprintf("%v", rec),
el.Type,
el.ConvertedType,
int(el.GetTypeLength()),
int(el.GetScale()))
}
case *arrow.BinaryType:
arr := col.(*array.Binary)
var rec []byte
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
rec = arr.Value(i)
} else {
rec = []byte("")
}
recs[i] = types.StrToParquetType(fmt.Sprintf("%v", rec),
el.Type,
el.ConvertedType,
int(el.GetTypeLength()),
int(el.GetScale()))
}
case *arrow.StringType:
arr := col.(*array.String)
var rec string
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
rec = arr.Value(i)
} else {
rec = ""
}
recs[i] = types.StrToParquetType(fmt.Sprintf("%v", rec),
el.Type,
el.ConvertedType,
int(el.GetTypeLength()),
int(el.GetScale()))
}
case *arrow.BooleanType:
arr := col.(*array.Boolean)
var rec bool
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
rec = arr.Value(i)
} else {
rec = false
}
recs[i] = types.StrToParquetType(fmt.Sprintf("%v", rec),
el.Type,
el.ConvertedType,
int(el.GetTypeLength()),
int(el.GetScale()))
}
case *arrow.Time32Type:
arr := col.(*array.Time32)
var rec arrow.Time32
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
rec = arr.Value(i)
} else {
rec = 0
}
recs[i] = types.StrToParquetType(fmt.Sprintf("%v", rec),
el.Type,
el.ConvertedType,
int(el.GetTypeLength()),
int(el.GetScale()))
}
case *arrow.TimestampType:
arr := col.(*array.Timestamp)
var rec arrow.Timestamp
for i := 0; i < arr.Len(); i++ {
if arr.IsValid(i) {
rec = arr.Value(i)
} else {
rec = 0
}
recs[i] = types.StrToParquetType(fmt.Sprintf("%v", rec),
el.Type,
el.ConvertedType,
int(el.GetTypeLength()),
int(el.GetScale()))
}
}
return recs
}
Loading

0 comments on commit e5b55d6

Please sign in to comment.