Skip to content

Commit

Permalink
ARROW-5468: [Go] implement read/write IPC for Timestamp arrays
Browse files Browse the repository at this point in the history
Needs apache#4437

Author: Sebastien Binet <[email protected]>

Closes apache#4438 from sbinet/issue-5468 and squashes the following commits:

f2197f5 <Sebastien Binet> ARROW-5468: implement read/write IPC for Timestamp arrays
  • Loading branch information
sbinet committed Jun 3, 2019
1 parent 8800da1 commit c273a24
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 25 deletions.
26 changes: 14 additions & 12 deletions go/arrow/datatype_fixedwidth.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,21 @@ func (t *Time64Type) String() string { return "time64[" + t.Unit.String() + "]"

var (
FixedWidthTypes = struct {
Boolean FixedWidthDataType
Float16 FixedWidthDataType
Time32s FixedWidthDataType
Time32ms FixedWidthDataType
Time64us FixedWidthDataType
Time64ns FixedWidthDataType
Boolean FixedWidthDataType
Float16 FixedWidthDataType
Time32s FixedWidthDataType
Time32ms FixedWidthDataType
Time64us FixedWidthDataType
Time64ns FixedWidthDataType
Timestamp FixedWidthDataType
}{
Boolean: &BooleanType{},
Float16: &Float16Type{},
Time32s: &Time32Type{Unit: Second},
Time32ms: &Time32Type{Unit: Millisecond},
Time64us: &Time64Type{Unit: Microsecond},
Time64ns: &Time64Type{Unit: Nanosecond},
Boolean: &BooleanType{},
Float16: &Float16Type{},
Time32s: &Time32Type{Unit: Second},
Time32ms: &Time32Type{Unit: Millisecond},
Time64us: &Time64Type{Unit: Microsecond},
Time64ns: &Time64Type{Unit: Nanosecond},
Timestamp: &TimestampType{Unit: Nanosecond, TimeZone: "UTC"},
}

_ FixedWidthDataType = (*FixedSizeBinaryType)(nil)
Expand Down
11 changes: 11 additions & 0 deletions go/arrow/internal/arrdata/arrdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ func makeFixedWidthTypesRecords() []array.Record {
arrow.Field{Name: "time32s", Type: arrow.FixedWidthTypes.Time32s, Nullable: true},
arrow.Field{Name: "time64ns", Type: arrow.FixedWidthTypes.Time64ns, Nullable: true},
arrow.Field{Name: "time64us", Type: arrow.FixedWidthTypes.Time64us, Nullable: true},
arrow.Field{Name: "timestamp", Type: arrow.FixedWidthTypes.Timestamp, Nullable: true},
}, nil,
)

Expand All @@ -353,20 +354,23 @@ func makeFixedWidthTypesRecords() []array.Record {
arrayOf(mem, []time32s{-2, -1, 0, +1, +2}, mask),
arrayOf(mem, []time64ns{-2, -1, 0, +1, +2}, mask),
arrayOf(mem, []time64us{-2, -1, 0, +1, +2}, mask),
arrayOf(mem, []arrow.Timestamp{0, +1, +2, +3, +4}, mask),
},
[]array.Interface{
arrayOf(mem, float16s([]float32{+11, +12, +13, +14, +15}), mask),
arrayOf(mem, []time32ms{-12, -11, 10, +11, +12}, mask),
arrayOf(mem, []time32s{-12, -11, 10, +11, +12}, mask),
arrayOf(mem, []time64ns{-12, -11, 10, +11, +12}, mask),
arrayOf(mem, []time64us{-12, -11, 10, +11, +12}, mask),
arrayOf(mem, []arrow.Timestamp{10, +11, +12, +13, +14}, mask),
},
[]array.Interface{
arrayOf(mem, float16s([]float32{+21, +22, +23, +24, +25}), mask),
arrayOf(mem, []time32ms{-22, -21, 20, +21, +22}, mask),
arrayOf(mem, []time32s{-22, -21, 20, +21, +22}, mask),
arrayOf(mem, []time64ns{-22, -21, 20, +21, +22}, mask),
arrayOf(mem, []time64us{-22, -21, 20, +21, +22}, mask),
arrayOf(mem, []arrow.Timestamp{20, +21, +22, +23, +24}, mask),
},
}

Expand Down Expand Up @@ -534,6 +538,13 @@ func arrayOf(mem memory.Allocator, a interface{}, valids []bool) array.Interface
bldr.AppendValues(vs, valids)
return bldr.NewArray()

case []arrow.Timestamp:
bldr := array.NewTimestampBuilder(mem, arrow.FixedWidthTypes.Timestamp.(*arrow.TimestampType))
defer bldr.Release()

bldr.AppendValues(a, valids)
return bldr.NewArray()

default:
panic(fmt.Errorf("arrdata: invalid data slice type %T", a))
}
Expand Down
9 changes: 9 additions & 0 deletions go/arrow/ipc/cmd/arrow-cat/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,18 +126,21 @@ record 3...
col[2] "time32s": [-2 (null) (null) 1 2]
col[3] "time64ns": [-2 (null) (null) 1 2]
col[4] "time64us": [-2 (null) (null) 1 2]
col[5] "timestamp": [0 (null) (null) 3 4]
record 2...
col[0] "float16s": [11 (null) (null) 14 15]
col[1] "time32ms": [-12 (null) (null) 11 12]
col[2] "time32s": [-12 (null) (null) 11 12]
col[3] "time64ns": [-12 (null) (null) 11 12]
col[4] "time64us": [-12 (null) (null) 11 12]
col[5] "timestamp": [10 (null) (null) 13 14]
record 3...
col[0] "float16s": [21 (null) (null) 24 25]
col[1] "time32ms": [-22 (null) (null) 21 22]
col[2] "time32s": [-22 (null) (null) 21 22]
col[3] "time64ns": [-22 (null) (null) 21 22]
col[4] "time64us": [-22 (null) (null) 21 22]
col[5] "timestamp": [20 (null) (null) 23 24]
`,
},
} {
Expand Down Expand Up @@ -386,18 +389,21 @@ record 3/3...
col[2] "time32s": [-2 (null) (null) 1 2]
col[3] "time64ns": [-2 (null) (null) 1 2]
col[4] "time64us": [-2 (null) (null) 1 2]
col[5] "timestamp": [0 (null) (null) 3 4]
record 2...
col[0] "float16s": [11 (null) (null) 14 15]
col[1] "time32ms": [-12 (null) (null) 11 12]
col[2] "time32s": [-12 (null) (null) 11 12]
col[3] "time64ns": [-12 (null) (null) 11 12]
col[4] "time64us": [-12 (null) (null) 11 12]
col[5] "timestamp": [10 (null) (null) 13 14]
record 3...
col[0] "float16s": [21 (null) (null) 24 25]
col[1] "time32ms": [-22 (null) (null) 21 22]
col[2] "time32s": [-22 (null) (null) 21 22]
col[3] "time64ns": [-22 (null) (null) 21 22]
col[4] "time64us": [-22 (null) (null) 21 22]
col[5] "timestamp": [20 (null) (null) 23 24]
`,
},
{
Expand All @@ -409,18 +415,21 @@ record 1/3...
col[2] "time32s": [-2 (null) (null) 1 2]
col[3] "time64ns": [-2 (null) (null) 1 2]
col[4] "time64us": [-2 (null) (null) 1 2]
col[5] "timestamp": [0 (null) (null) 3 4]
record 2/3...
col[0] "float16s": [11 (null) (null) 14 15]
col[1] "time32ms": [-12 (null) (null) 11 12]
col[2] "time32s": [-12 (null) (null) 11 12]
col[3] "time64ns": [-12 (null) (null) 11 12]
col[4] "time64us": [-12 (null) (null) 11 12]
col[5] "timestamp": [10 (null) (null) 13 14]
record 3/3...
col[0] "float16s": [21 (null) (null) 24 25]
col[1] "time32ms": [-22 (null) (null) 21 22]
col[2] "time32s": [-22 (null) (null) 21 22]
col[3] "time64ns": [-22 (null) (null) 21 22]
col[4] "time64us": [-22 (null) (null) 21 22]
col[5] "timestamp": [20 (null) (null) 23 24]
`,
},
} {
Expand Down
3 changes: 2 additions & 1 deletion go/arrow/ipc/cmd/arrow-ls/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,13 @@ records: 3
{
name: "fixed_width_types",
want: `schema:
fields: 5
fields: 6
- float16s: type=float16, nullable
- time32ms: type=time32[ms], nullable
- time32s: type=time32[s], nullable
- time64ns: type=time64[ns], nullable
- time64us: type=time64[us], nullable
- timestamp: type=timestamp[ns], nullable
records: 3
`,
},
Expand Down
3 changes: 2 additions & 1 deletion go/arrow/ipc/file_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,8 @@ func (ctx *arrayLoaderContext) loadArray(dt arrow.DataType) array.Interface {
*arrow.Int8Type, *arrow.Int16Type, *arrow.Int32Type, *arrow.Int64Type,
*arrow.Uint8Type, *arrow.Uint16Type, *arrow.Uint32Type, *arrow.Uint64Type,
*arrow.Float16Type, *arrow.Float32Type, *arrow.Float64Type,
*arrow.Time32Type, *arrow.Time64Type:
*arrow.Time32Type, *arrow.Time64Type,
*arrow.TimestampType:
return ctx.loadPrimitive(dt)

case *arrow.BinaryType, *arrow.StringType:
Expand Down
20 changes: 9 additions & 11 deletions go/arrow/ipc/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,11 @@ func concreteTypeFromFB(typ flatbuf.Type, data flatbuffers.Table, children []arr
dt.Init(data.Bytes, data.Pos)
return timeFromFB(dt)

case flatbuf.TypeTimestamp:
var dt flatbuf.Timestamp
dt.Init(data.Bytes, data.Pos)
return timestampFromFB(dt)

default:
// FIXME(sbinet): implement all the other types.
panic(fmt.Errorf("arrow/ipc: type %v not implemented", flatbuf.EnumNamesType[typ]))
Expand Down Expand Up @@ -640,17 +645,10 @@ func timeFromFB(data flatbuf.Time) (arrow.DataType, error) {
}
}

func timeToFB(b *flatbuffers.Builder, bw int32, unit arrow.TimeUnit) flatbuffers.UOffsetT {
switch bw {
case 32, 64:
// ok.
panic(errors.Errorf("arrow/ipc: invalid Time type bit-wdith %d", bw))
}

flatbuf.TimeStart(b)
flatbuf.TimeAddBitWidth(b, bw)
flatbuf.TimeAddUnit(b, unitToFB(unit))
return flatbuf.TimeEnd(b)
// timestampFromFB builds the arrow.TimestampType described by a flatbuffer
// Timestamp table: the time unit is translated via unitFromFB and the
// timezone bytes are copied into a Go string.
//
// The error result is always nil in this implementation.
func timestampFromFB(data flatbuf.Timestamp) (arrow.DataType, error) {
	dt := &arrow.TimestampType{
		Unit:     unitFromFB(data.Unit()),
		TimeZone: string(data.Timezone()),
	}
	return dt, nil
}

type customMetadataer interface {
Expand Down

0 comments on commit c273a24

Please sign in to comment.