ARROW-3951: [Go] implement a CSV writer

Author: Anson Qian <[email protected]>

Closes apache#3755 from anson627/arrow-3951 and squashes the following commits:

df1735a <Anson Qian> Fix reader test
9bc8dc0 <Anson Qian> Fix unit test
6e63617 <Anson Qian> Fix typo
7624a97 <Anson Qian> Add example and bump up test coverage
f460e19 <Anson Qian> Add newline at end of file
947235c <Anson Qian> Consolidate options for reader and writer
2a57a67 <Anson Qian> Add memory size check
e00638e <Anson Qian> Address code reviews
92cbcea <Anson Qian> ARROW-3951  implement a CSV writer
Anson Qian authored and sbinet committed Mar 11, 2019
1 parent 3db5797 commit e4ae2f6
Showing 5 changed files with 448 additions and 96 deletions.
119 changes: 119 additions & 0 deletions go/arrow/csv/common.go
@@ -0,0 +1,119 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package csv reads CSV files and presents the extracted data as records. It
// also writes records out as CSV files.
package csv

import (
	"errors"
	"fmt"

	"github.com/apache/arrow/go/arrow"
	"github.com/apache/arrow/go/arrow/memory"
)

var (
	ErrMismatchFields = errors.New("arrow/csv: number of records mismatch")
)

// Option configures a CSV reader/writer.
type Option func(config)
type config interface{}

// WithComma specifies the fields separation character used while parsing CSV files.
func WithComma(c rune) Option {
	return func(cfg config) {
		switch cfg := cfg.(type) {
		case *Reader:
			cfg.r.Comma = c
		case *Writer:
			cfg.w.Comma = c
		default:
			panic(fmt.Errorf("arrow/csv: unknown config type %T", cfg))
		}
	}
}

// WithComment specifies the comment character used while parsing CSV files.
func WithComment(c rune) Option {
	return func(cfg config) {
		switch cfg := cfg.(type) {
		case *Reader:
			cfg.r.Comment = c
		default:
			panic(fmt.Errorf("arrow/csv: unknown config type %T", cfg))
		}
	}
}

// WithAllocator specifies the Arrow memory allocator used while building records.
func WithAllocator(mem memory.Allocator) Option {
	return func(cfg config) {
		switch cfg := cfg.(type) {
		case *Reader:
			cfg.mem = mem
		default:
			panic(fmt.Errorf("arrow/csv: unknown config type %T", cfg))
		}
	}
}

// WithChunk specifies the chunk size used while parsing CSV files.
//
// If n is zero or 1, no chunking will take place and the reader will create
// one record per row.
// If n is greater than 1, chunks of n rows will be read.
// If n is negative, the reader will load the whole CSV file into memory and
// create one big record with all the rows.
func WithChunk(n int) Option {
	return func(cfg config) {
		switch cfg := cfg.(type) {
		case *Reader:
			cfg.chunk = n
		default:
			panic(fmt.Errorf("arrow/csv: unknown config type %T", cfg))
		}
	}
}

// WithCRLF specifies the line terminator used while writing CSV files.
// If useCRLF is true, \r\n is used as the line terminator, otherwise \n is used.
// The default value is false.
func WithCRLF(useCRLF bool) Option {
	return func(cfg config) {
		switch cfg := cfg.(type) {
		case *Writer:
			cfg.w.UseCRLF = useCRLF
		default:
			panic(fmt.Errorf("arrow/csv: unknown config type %T", cfg))
		}
	}
}

func validate(schema *arrow.Schema) {
	for i, f := range schema.Fields() {
		switch ft := f.Type.(type) {
		case *arrow.BooleanType:
		case *arrow.Int8Type, *arrow.Int16Type, *arrow.Int32Type, *arrow.Int64Type:
		case *arrow.Uint8Type, *arrow.Uint16Type, *arrow.Uint32Type, *arrow.Uint64Type:
		case *arrow.Float32Type, *arrow.Float64Type:
		case *arrow.StringType:
		default:
			panic(fmt.Errorf("arrow/csv: field %d (%s) has invalid data type %T", i, f.Name, ft))
		}
	}
}
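
The consolidated Option type above lets the same functional options configure either a Reader or a Writer; an option applied to a type it does not support panics. Below is a minimal sketch of how the new writer might be driven with these options. The writer.go diff is not shown in this excerpt, so the NewWriter, Write, and Flush signatures used here are assumptions modeled on the reader's API.

package main

import (
	"os"

	"github.com/apache/arrow/go/arrow"
	"github.com/apache/arrow/go/arrow/array"
	"github.com/apache/arrow/go/arrow/csv"
	"github.com/apache/arrow/go/arrow/memory"
)

func main() {
	pool := memory.NewGoAllocator()
	schema := arrow.NewSchema(
		[]arrow.Field{
			{Name: "i64", Type: arrow.PrimitiveTypes.Int64},
			{Name: "str", Type: arrow.BinaryTypes.String},
		},
		nil,
	)

	// Build one record with two rows.
	b := array.NewRecordBuilder(pool, schema)
	defer b.Release()
	b.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2}, nil)
	b.Field(1).(*array.StringBuilder).AppendValues([]string{"str-1", "str-2"}, nil)
	rec := b.NewRecord()
	defer rec.Release()

	// WithComma and WithCRLF are the Writer-aware options from common.go above;
	// NewWriter, Write, and Flush are assumed here, as writer.go is not shown.
	w := csv.NewWriter(os.Stdout, schema, csv.WithComma(';'), csv.WithCRLF(false))
	if err := w.Write(rec); err != nil {
		panic(err)
	}
	if err := w.Flush(); err != nil {
		panic(err)
	}
}
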
58 changes: 0 additions & 58 deletions go/arrow/csv/csv.go → go/arrow/csv/reader.go
@@ -14,13 +14,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package csv reads CSV files and presents the extracted data as records.
package csv

import (
	"encoding/csv"
	"errors"
	"fmt"
	"io"
	"strconv"
	"sync/atomic"
@@ -31,47 +28,6 @@ import (
	"github.com/apache/arrow/go/arrow/memory"
)

var (
	ErrMismatchFields = errors.New("arrow/csv: number of records mismatch")
)

// Option configures a CSV reader.
type Option func(*Reader)

// WithComment specifies the comment character used while parsing CSV files.
func WithComment(c rune) Option {
	return func(r *Reader) {
		r.r.Comment = c
	}
}

// WithComma specifies the fields separation character used while parsing CSV files.
func WithComma(c rune) Option {
	return func(r *Reader) {
		r.r.Comma = c
	}
}

// WithAllocator specifies the Arrow memory allocator used while building records.
func WithAllocator(mem memory.Allocator) Option {
	return func(r *Reader) {
		r.mem = mem
	}
}

// WithChunk specifies the chunk size used while parsing CSV files.
//
// If n is zero or 1, no chunking will take place and the reader will create
// one record per row.
// If n is greater than 1, chunks of n rows will be read.
// If n is negative, the reader will load the whole CSV file into memory and
// create one big record with all the rows.
func WithChunk(n int) Option {
	return func(r *Reader) {
		r.chunk = n
	}
}

// Reader wraps encoding/csv.Reader and creates array.Records from a schema.
type Reader struct {
	r *csv.Reader
@@ -392,20 +348,6 @@ func (r *Reader) Release() {
}
}

func validate(schema *arrow.Schema) {
	for i, f := range schema.Fields() {
		switch ft := f.Type.(type) {
		case *arrow.BooleanType:
		case *arrow.Int8Type, *arrow.Int16Type, *arrow.Int32Type, *arrow.Int64Type:
		case *arrow.Uint8Type, *arrow.Uint16Type, *arrow.Uint32Type, *arrow.Uint64Type:
		case *arrow.Float32Type, *arrow.Float64Type:
		case *arrow.StringType:
		default:
			panic(fmt.Errorf("arrow/csv: field %d (%s) has invalid data type %T", i, f.Name, ft))
		}
	}
}

var (
	_ array.RecordReader = (*Reader)(nil)
)
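
The blank-identifier assertion above checks at compile time that *Reader satisfies array.RecordReader, so CSV-backed records can feed any code written against that interface. A short sketch of such a consumer follows; the exact RecordReader method set (Next, Record, and friends) is assumed from the Arrow Go API of this release and is not part of this diff.

package main

import (
	"fmt"

	"github.com/apache/arrow/go/arrow/array"
)

// dumpRows drains any array.RecordReader, CSV-backed or otherwise,
// and reports the shape of each record it yields.
func dumpRows(rr array.RecordReader) {
	for rr.Next() {
		rec := rr.Record()
		fmt.Printf("%d rows x %d cols\n", rec.NumRows(), rec.NumCols())
	}
}
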
76 changes: 38 additions & 38 deletions go/arrow/csv/csv_test.go → go/arrow/csv/reader_test.go
@@ -56,42 +56,42 @@ func Example() {
	for r.Next() {
		rec := r.Record()
		for i, col := range rec.Columns() {
			fmt.Printf("rec[%d][%q]: %v\n", i, rec.ColumnName(i), col)
			fmt.Printf("rec[%d][%q]: %v\n", n, rec.ColumnName(i), col)
		}
		n++
	}

// Output:
// rec[0]["i64"]: [0]
// rec[1]["f64"]: [0]
// rec[2]["str"]: ["str-0"]
// rec[0]["i64"]: [1]
// rec[0]["f64"]: [0]
// rec[0]["str"]: ["str-0"]
// rec[1]["i64"]: [1]
// rec[1]["f64"]: [1]
// rec[2]["str"]: ["str-1"]
// rec[0]["i64"]: [2]
// rec[1]["f64"]: [2]
// rec[1]["str"]: ["str-1"]
// rec[2]["i64"]: [2]
// rec[2]["f64"]: [2]
// rec[2]["str"]: ["str-2"]
// rec[0]["i64"]: [3]
// rec[1]["f64"]: [3]
// rec[2]["str"]: ["str-3"]
// rec[0]["i64"]: [4]
// rec[1]["f64"]: [4]
// rec[2]["str"]: ["str-4"]
// rec[0]["i64"]: [5]
// rec[1]["f64"]: [5]
// rec[2]["str"]: ["str-5"]
// rec[0]["i64"]: [6]
// rec[1]["f64"]: [6]
// rec[2]["str"]: ["str-6"]
// rec[0]["i64"]: [7]
// rec[1]["f64"]: [7]
// rec[2]["str"]: ["str-7"]
// rec[0]["i64"]: [8]
// rec[1]["f64"]: [8]
// rec[2]["str"]: ["str-8"]
// rec[0]["i64"]: [9]
// rec[1]["f64"]: [9]
// rec[2]["str"]: ["str-9"]
// rec[3]["i64"]: [3]
// rec[3]["f64"]: [3]
// rec[3]["str"]: ["str-3"]
// rec[4]["i64"]: [4]
// rec[4]["f64"]: [4]
// rec[4]["str"]: ["str-4"]
// rec[5]["i64"]: [5]
// rec[5]["f64"]: [5]
// rec[5]["str"]: ["str-5"]
// rec[6]["i64"]: [6]
// rec[6]["f64"]: [6]
// rec[6]["str"]: ["str-6"]
// rec[7]["i64"]: [7]
// rec[7]["f64"]: [7]
// rec[7]["str"]: ["str-7"]
// rec[8]["i64"]: [8]
// rec[8]["f64"]: [8]
// rec[8]["str"]: ["str-8"]
// rec[9]["i64"]: [9]
// rec[9]["f64"]: [9]
// rec[9]["str"]: ["str-9"]
}

func Example_withChunk() {
@@ -127,24 +127,24 @@ func Example_withChunk() {
	for r.Next() {
		rec := r.Record()
		for i, col := range rec.Columns() {
			fmt.Printf("rec[%d][%q]: %v\n", i, rec.ColumnName(i), col)
			fmt.Printf("rec[%d][%q]: %v\n", n, rec.ColumnName(i), col)
		}
		n++
	}

// Output:
// rec[0]["i64"]: [0 1 2]
// rec[1]["f64"]: [0 1 2]
// rec[2]["str"]: ["str-0" "str-1" "str-2"]
// rec[0]["i64"]: [3 4 5]
// rec[0]["f64"]: [0 1 2]
// rec[0]["str"]: ["str-0" "str-1" "str-2"]
// rec[1]["i64"]: [3 4 5]
// rec[1]["f64"]: [3 4 5]
// rec[2]["str"]: ["str-3" "str-4" "str-5"]
// rec[0]["i64"]: [6 7 8]
// rec[1]["f64"]: [6 7 8]
// rec[1]["str"]: ["str-3" "str-4" "str-5"]
// rec[2]["i64"]: [6 7 8]
// rec[2]["f64"]: [6 7 8]
// rec[2]["str"]: ["str-6" "str-7" "str-8"]
// rec[0]["i64"]: [9]
// rec[1]["f64"]: [9]
// rec[2]["str"]: ["str-9"]
// rec[3]["i64"]: [9]
// rec[3]["f64"]: [9]
// rec[3]["str"]: ["str-9"]
}

func TestCSVReader(t *testing.T) {
