diff --git a/go/arrow/float16/float16.go b/go/arrow/float16/float16.go index 4e03d13df0cae..e0192495eb971 100644 --- a/go/arrow/float16/float16.go +++ b/go/arrow/float16/float16.go @@ -17,6 +17,7 @@ package float16 import ( + "encoding/binary" "math" "strconv" ) @@ -29,6 +30,11 @@ type Num struct { bits uint16 } +var ( + MaxNum = Num{bits: 0b0111101111111111} + MinNum = MaxNum.Negate() +) + // New creates a new half-precision floating point value from the provided // float32 value. func New(f float32) Num { @@ -86,6 +92,11 @@ func (n Num) Div(rhs Num) Num { return New(n.Float32() / rhs.Float32()) } +// Equal returns true if the value represented by n is == other +func (n Num) Equal(other Num) bool { + return n.Float32() == other.Float32() +} + // Greater returns true if the value represented by n is > other func (n Num) Greater(other Num) bool { return n.Float32() > other.Float32() @@ -152,14 +163,39 @@ func (n Num) Abs() Num { } func (n Num) Sign() int { - f := n.Float32() - if f > 0 { - return 1 - } else if f == 0 { + if n.IsZero() { return 0 + } else if n.Signbit() { + return -1 } - return -1 + return 1 } +func (n Num) Signbit() bool { return (n.bits & 0x8000) != 0 } + +func (n Num) IsNaN() bool { return (n.bits & 0x7fff) > 0x7c00 } + +func (n Num) IsZero() bool { return (n.bits & 0x7fff) == 0 } + func (f Num) Uint16() uint16 { return f.bits } func (f Num) String() string { return strconv.FormatFloat(float64(f.Float32()), 'g', -1, 32) } + +func Inf() Num { return Num{bits: 0x7c00} } + +func NaN() Num { return Num{bits: 0x7fff} } + +func FromBits(src uint16) Num { return Num{bits: src} } + +func FromLEBytes(src []byte) Num { + return Num{bits: binary.LittleEndian.Uint16(src)} +} + +func (f Num) PutLEBytes(dst []byte) { + binary.LittleEndian.PutUint16(dst, f.bits) +} + +func (f Num) ToLEBytes() []byte { + dst := make([]byte, 2) + f.PutLEBytes(dst) + return dst +} diff --git a/go/arrow/float16/float16_test.go b/go/arrow/float16/float16_test.go index 
55c3ea8b30404..cfde440c5f9e4 100644 --- a/go/arrow/float16/float16_test.go +++ b/go/arrow/float16/float16_test.go @@ -238,6 +238,7 @@ func TestSign(t *testing.T) { }{ {Num{bits: 0x4580}, 1}, // 5.5 {Num{bits: 0x0000}, 0}, // 0 + {Num{bits: 0x8000}, 0}, // -0 {Num{bits: 0xC580}, -1}, // -5.5 } { t.Run("sign", func(t *testing.T) { @@ -248,3 +249,45 @@ func TestSign(t *testing.T) { }) } } + +func TestSignbit(t *testing.T) { + for _, tc := range []struct { + n Num + want bool + }{ + {Num{bits: 0x4580}, false}, // 5.5 + {Num{bits: 0x0000}, false}, // 0 + {Num{bits: 0x8000}, true}, // -0 + {Num{bits: 0xC580}, true}, // -5.5 + } { + t.Run("signbit", func(t *testing.T) { + n := tc.n.Signbit() + if got, want := n, tc.want; got != want { + t.Fatalf("invalid value. got=%v, want=%v", got, want) + } + }) + } +} + +func TestIsNaN(t *testing.T) { + for _, tc := range []struct { + n Num + want bool + }{ + {NaN(), true}, + {NaN().Negate(), true}, + {Inf(), false}, + {Inf().Negate(), false}, + {Num{bits: 0x7c01}, true}, // nan + {Num{bits: 0xfc01}, true}, // -nan + {Num{bits: 0x7e00}, true}, // nan + {Num{bits: 0xfe00}, true}, // -nan + } { + t.Run("isnan", func(t *testing.T) { + n := tc.n.IsNaN() + if got, want := n, tc.want; got != want { + t.Fatalf("invalid value. 
got=%v, want=%v", got, want) + } + }) + } +} diff --git a/go/parquet/file/column_writer_types.gen.go b/go/parquet/file/column_writer_types.gen.go index 8b3be25f3ea29..b4d7954639319 100644 --- a/go/parquet/file/column_writer_types.gen.go +++ b/go/parquet/file/column_writer_types.gen.go @@ -28,6 +28,7 @@ import ( "github.com/apache/arrow/go/v15/parquet/internal/encoding" format "github.com/apache/arrow/go/v15/parquet/internal/gen-go/parquet" "github.com/apache/arrow/go/v15/parquet/metadata" + "github.com/apache/arrow/go/v15/parquet/schema" "golang.org/x/xerrors" ) @@ -1629,7 +1630,11 @@ func (w *FixedLenByteArrayColumnChunkWriter) WriteDictIndices(indices arrow.Arra func (w *FixedLenByteArrayColumnChunkWriter) writeValues(values []parquet.FixedLenByteArray, numNulls int64) { w.currentEncoder.(encoding.FixedLenByteArrayEncoder).Put(values) if w.pageStatistics != nil { - w.pageStatistics.(*metadata.FixedLenByteArrayStatistics).Update(values, numNulls) + if w.Descr().LogicalType().Equals(schema.Float16LogicalType{}) { + w.pageStatistics.(*metadata.Float16Statistics).Update(values, numNulls) + } else { + w.pageStatistics.(*metadata.FixedLenByteArrayStatistics).Update(values, numNulls) + } } } @@ -1641,7 +1646,11 @@ func (w *FixedLenByteArrayColumnChunkWriter) writeValuesSpaced(spacedValues []pa } if w.pageStatistics != nil { nulls := numValues - numRead - w.pageStatistics.(*metadata.FixedLenByteArrayStatistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls) + if w.Descr().LogicalType().Equals(schema.Float16LogicalType{}) { + w.pageStatistics.(*metadata.Float16Statistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls) + } else { + w.pageStatistics.(*metadata.FixedLenByteArrayStatistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls) + } } } diff --git a/go/parquet/file/column_writer_types.gen.go.tmpl b/go/parquet/file/column_writer_types.gen.go.tmpl index 7df69b4a219c6..70bcfe679eb92 100644 --- 
a/go/parquet/file/column_writer_types.gen.go.tmpl +++ b/go/parquet/file/column_writer_types.gen.go.tmpl @@ -18,7 +18,7 @@ package file import ( "fmt" - + "github.com/apache/arrow/go/v15/parquet" "github.com/apache/arrow/go/v15/parquet/metadata" "github.com/apache/arrow/go/v15/parquet/internal/encoding" @@ -83,7 +83,7 @@ func (w *{{.Name}}ColumnChunkWriter) WriteBatch(values []{{.name}}, defLevels, r // writes a large number of values, the DataPage size can be much above the limit. // The purpose of this chunking is to bound this. Even if a user writes large number // of values, the chunking will ensure the AddDataPage() is called at a reasonable - // pagesize limit + // pagesize limit var n int64 switch { case defLevels != nil: @@ -107,7 +107,7 @@ func (w *{{.Name}}ColumnChunkWriter) WriteBatch(values []{{.name}}, defLevels, r valueOffset += toWrite w.checkDictionarySizeLimit() }) - return + return } // WriteBatchSpaced writes a batch of repetition levels, definition levels, and values to the @@ -132,7 +132,7 @@ func (w *{{.Name}}ColumnChunkWriter) WriteBatchSpaced(values []{{.name}}, defLev length = len(values) } doBatches(int64(length), w.props.WriteBatchSize(), func(offset, batch int64) { - var vals []{{.name}} + var vals []{{.name}} info := w.maybeCalculateValidityBits(levelSliceOrNil(defLevels, offset, batch), batch) w.writeLevelsSpaced(batch, levelSliceOrNil(defLevels, offset, batch), levelSliceOrNil(repLevels, offset, batch)) @@ -165,7 +165,7 @@ func (w *{{.Name}}ColumnChunkWriter) WriteDictIndices(indices arrow.Array, defLe } } }() - + valueOffset := int64(0) length := len(defLevels) if defLevels == nil { @@ -193,14 +193,22 @@ func (w *{{.Name}}ColumnChunkWriter) WriteDictIndices(indices arrow.Array, defLe valueOffset += info.numSpaced() }) - + return } func (w *{{.Name}}ColumnChunkWriter) writeValues(values []{{.name}}, numNulls int64) { w.currentEncoder.(encoding.{{.Name}}Encoder).Put(values) if w.pageStatistics != nil { +{{- if ne .Name 
"FixedLenByteArray"}} w.pageStatistics.(*metadata.{{.Name}}Statistics).Update(values, numNulls) +{{- else}} + if w.Descr().LogicalType().Equals(schema.Float16LogicalType{}) { + w.pageStatistics.(*metadata.Float16Statistics).Update(values, numNulls) + } else { + w.pageStatistics.(*metadata.{{.Name}}Statistics).Update(values, numNulls) + } +{{- end}} } } @@ -212,7 +220,15 @@ func (w *{{.Name}}ColumnChunkWriter) writeValuesSpaced(spacedValues []{{.name}}, } if w.pageStatistics != nil { nulls := numValues - numRead +{{- if ne .Name "FixedLenByteArray"}} w.pageStatistics.(*metadata.{{.Name}}Statistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls) +{{- else}} + if w.Descr().LogicalType().Equals(schema.Float16LogicalType{}) { + w.pageStatistics.(*metadata.Float16Statistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls) + } else { + w.pageStatistics.(*metadata.{{.Name}}Statistics).UpdateSpaced(spacedValues, validBits, validBitsOffset, nulls) + } +{{- end}} } } diff --git a/go/parquet/internal/gen-go/parquet/GoUnusedProtection__.go b/go/parquet/internal/gen-go/parquet/GoUnusedProtection__.go index 1de0c8dee4cbd..01f1eb5aa99e3 100644 --- a/go/parquet/internal/gen-go/parquet/GoUnusedProtection__.go +++ b/go/parquet/internal/gen-go/parquet/GoUnusedProtection__.go @@ -1,4 +1,4 @@ -// Code generated by Thrift Compiler (0.16.0). DO NOT EDIT. +// Code generated by Thrift Compiler (0.18.1). DO NOT EDIT. package parquet diff --git a/go/parquet/internal/gen-go/parquet/parquet-consts.go b/go/parquet/internal/gen-go/parquet/parquet-consts.go index d4a63b22b890a..ab0a73c596e7d 100644 --- a/go/parquet/internal/gen-go/parquet/parquet-consts.go +++ b/go/parquet/internal/gen-go/parquet/parquet-consts.go @@ -1,21 +1,28 @@ -// Code generated by Thrift Compiler (0.16.0). DO NOT EDIT. +// Code generated by Thrift Compiler (0.18.1). DO NOT EDIT. 
package parquet import ( "bytes" "context" + "errors" "fmt" "time" thrift "github.com/apache/thrift/lib/go/thrift" + "strings" + "regexp" ) // (needed to ensure safety because of naive import list construction.) var _ = thrift.ZERO var _ = fmt.Printf +var _ = errors.New var _ = context.Background var _ = time.Now var _ = bytes.Equal +// (needed by validator.) +var _ = strings.Contains +var _ = regexp.MatchString func init() { diff --git a/go/parquet/internal/gen-go/parquet/parquet.go b/go/parquet/internal/gen-go/parquet/parquet.go index d4508f8e4529f..9dcedae8888d3 100644 --- a/go/parquet/internal/gen-go/parquet/parquet.go +++ b/go/parquet/internal/gen-go/parquet/parquet.go @@ -1,4 +1,4 @@ -// Code generated by Thrift Compiler (0.16.0). DO NOT EDIT. +// Code generated by Thrift Compiler (0.18.1). DO NOT EDIT. package parquet @@ -10,14 +10,20 @@ import ( "fmt" "time" thrift "github.com/apache/thrift/lib/go/thrift" + "strings" + "regexp" ) // (needed to ensure safety because of naive import list construction.) var _ = thrift.ZERO var _ = fmt.Printf +var _ = errors.New var _ = context.Background var _ = time.Now var _ = bytes.Equal +// (needed by validator.) +var _ = strings.Contains +var _ = regexp.MatchString //Types supported by Parquet. These types are intended to be used in combination //with the encodings to control the on disk storage format. @@ -94,9 +100,10 @@ func (p * Type) Value() (driver.Value, error) { } return int64(*p), nil } -//Common types used by frameworks(e.g. hive, pig) using parquet. This helps map -//between types in those frameworks to the base types in parquet. This is only -//metadata and not needed to read or write the data. +//DEPRECATED: Common types used by frameworks(e.g. hive, pig) using parquet. +//ConvertedType is superseded by LogicalType. This enum should not be extended. +// +//See LogicalTypes.md for conversion between ConvertedType and LogicalType. 
type ConvertedType int64 const ( ConvertedType_UTF8 ConvertedType = 0 @@ -897,6 +904,9 @@ func (p *Statistics) String() string { return fmt.Sprintf("Statistics(%+v)", *p) } +func (p *Statistics) Validate() error { + return nil +} // Empty structs to use as logical type annotations type StringType struct { } @@ -958,6 +968,9 @@ func (p *StringType) String() string { return fmt.Sprintf("StringType(%+v)", *p) } +func (p *StringType) Validate() error { + return nil +} type UUIDType struct { } @@ -1018,6 +1031,9 @@ func (p *UUIDType) String() string { return fmt.Sprintf("UUIDType(%+v)", *p) } +func (p *UUIDType) Validate() error { + return nil +} type MapType struct { } @@ -1078,6 +1094,9 @@ func (p *MapType) String() string { return fmt.Sprintf("MapType(%+v)", *p) } +func (p *MapType) Validate() error { + return nil +} type ListType struct { } @@ -1138,6 +1157,9 @@ func (p *ListType) String() string { return fmt.Sprintf("ListType(%+v)", *p) } +func (p *ListType) Validate() error { + return nil +} type EnumType struct { } @@ -1198,6 +1220,9 @@ func (p *EnumType) String() string { return fmt.Sprintf("EnumType(%+v)", *p) } +func (p *EnumType) Validate() error { + return nil +} type DateType struct { } @@ -1258,6 +1283,72 @@ func (p *DateType) String() string { return fmt.Sprintf("DateType(%+v)", *p) } +func (p *DateType) Validate() error { + return nil +} +type Float16Type struct { +} + +func NewFloat16Type() *Float16Type { + return &Float16Type{} +} + +func (p *Float16Type) Read(ctx context.Context, iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(ctx); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) + } + + + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin(ctx) + if err != nil { + return thrift.PrependError(fmt.Sprintf("%T field %d read error: ", p, fieldId), err) + } + if fieldTypeId == thrift.STOP { break; } + if err := iprot.Skip(ctx, fieldTypeId); err != nil { + return err + } + if err := 
iprot.ReadFieldEnd(ctx); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(ctx); err != nil { + return thrift.PrependError(fmt.Sprintf("%T read struct end error: ", p), err) + } + return nil +} + +func (p *Float16Type) Write(ctx context.Context, oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin(ctx, "Float16Type"); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) } + if p != nil { + } + if err := oprot.WriteFieldStop(ctx); err != nil { + return thrift.PrependError("write field stop error: ", err) } + if err := oprot.WriteStructEnd(ctx); err != nil { + return thrift.PrependError("write struct stop error: ", err) } + return nil +} + +func (p *Float16Type) Equals(other *Float16Type) bool { + if p == other { + return true + } else if p == nil || other == nil { + return false + } + return true +} + +func (p *Float16Type) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("Float16Type(%+v)", *p) +} + +func (p *Float16Type) Validate() error { + return nil +} // Logical type to annotate a column that is always null. 
// // Sometimes when discovering the schema of existing data, values are always @@ -1323,6 +1414,9 @@ func (p *NullType) String() string { return fmt.Sprintf("NullType(%+v)", *p) } +func (p *NullType) Validate() error { + return nil +} // Decimal logical type annotation // // To maintain forward-compatibility in v1, implementations using this logical @@ -1478,6 +1572,9 @@ func (p *DecimalType) String() string { return fmt.Sprintf("DecimalType(%+v)", *p) } +func (p *DecimalType) Validate() error { + return nil +} // Time units for logical types type MilliSeconds struct { } @@ -1539,6 +1636,9 @@ func (p *MilliSeconds) String() string { return fmt.Sprintf("MilliSeconds(%+v)", *p) } +func (p *MilliSeconds) Validate() error { + return nil +} type MicroSeconds struct { } @@ -1599,6 +1699,9 @@ func (p *MicroSeconds) String() string { return fmt.Sprintf("MicroSeconds(%+v)", *p) } +func (p *MicroSeconds) Validate() error { + return nil +} type NanoSeconds struct { } @@ -1659,6 +1762,9 @@ func (p *NanoSeconds) String() string { return fmt.Sprintf("NanoSeconds(%+v)", *p) } +func (p *NanoSeconds) Validate() error { + return nil +} // Attributes: // - MILLIS // - MICROS @@ -1879,6 +1985,9 @@ func (p *TimeUnit) String() string { return fmt.Sprintf("TimeUnit(%+v)", *p) } +func (p *TimeUnit) Validate() error { + return nil +} // Timestamp logical type annotation // // Allowed for physical types: INT64 @@ -2038,6 +2147,9 @@ func (p *TimestampType) String() string { return fmt.Sprintf("TimestampType(%+v)", *p) } +func (p *TimestampType) Validate() error { + return nil +} // Time logical type annotation // // Allowed for physical types: INT32 (millis), INT64 (micros, nanos) @@ -2197,6 +2309,9 @@ func (p *TimeType) String() string { return fmt.Sprintf("TimeType(%+v)", *p) } +func (p *TimeType) Validate() error { + return nil +} // Integer logical type annotation // // bitWidth must be 8, 16, 32, or 64. 
@@ -2352,6 +2467,9 @@ func (p *IntType) String() string { return fmt.Sprintf("IntType(%+v)", *p) } +func (p *IntType) Validate() error { + return nil +} // Embedded JSON logical type annotation // // Allowed for physical types: BINARY @@ -2415,6 +2533,9 @@ func (p *JsonType) String() string { return fmt.Sprintf("JsonType(%+v)", *p) } +func (p *JsonType) Validate() error { + return nil +} // Embedded BSON logical type annotation // // Allowed for physical types: BINARY @@ -2478,11 +2599,14 @@ func (p *BsonType) String() string { return fmt.Sprintf("BsonType(%+v)", *p) } +func (p *BsonType) Validate() error { + return nil +} // LogicalType annotations to replace ConvertedType. // // To maintain compatibility, implementations using LogicalType for a -// SchemaElement must also set the corresponding ConvertedType from the -// following table. +// SchemaElement must also set the corresponding ConvertedType (if any) +// from the following table. // // Attributes: // - STRING @@ -2498,6 +2622,7 @@ func (p *BsonType) String() string { // - JSON // - BSON // - UUID +// - FLOAT16 type LogicalType struct { STRING *StringType `thrift:"STRING,1" db:"STRING" json:"STRING,omitempty"` MAP *MapType `thrift:"MAP,2" db:"MAP" json:"MAP,omitempty"` @@ -2513,6 +2638,7 @@ type LogicalType struct { JSON *JsonType `thrift:"JSON,12" db:"JSON" json:"JSON,omitempty"` BSON *BsonType `thrift:"BSON,13" db:"BSON" json:"BSON,omitempty"` UUID *UUIDType `thrift:"UUID,14" db:"UUID" json:"UUID,omitempty"` + FLOAT16 *Float16Type `thrift:"FLOAT16,15" db:"FLOAT16" json:"FLOAT16,omitempty"` } func NewLogicalType() *LogicalType { @@ -2610,6 +2736,13 @@ func (p *LogicalType) GetUUID() *UUIDType { } return p.UUID } +var LogicalType_FLOAT16_DEFAULT *Float16Type +func (p *LogicalType) GetFLOAT16() *Float16Type { + if !p.IsSetFLOAT16() { + return LogicalType_FLOAT16_DEFAULT + } +return p.FLOAT16 +} func (p *LogicalType) CountSetFieldsLogicalType() int { count := 0 if (p.IsSetSTRING()) { @@ -2651,6 +2784,9 @@ 
func (p *LogicalType) CountSetFieldsLogicalType() int { if (p.IsSetUUID()) { count++ } + if (p.IsSetFLOAT16()) { + count++ + } return count } @@ -2707,6 +2843,10 @@ func (p *LogicalType) IsSetUUID() bool { return p.UUID != nil } +func (p *LogicalType) IsSetFLOAT16() bool { + return p.FLOAT16 != nil +} + func (p *LogicalType) Read(ctx context.Context, iprot thrift.TProtocol) error { if _, err := iprot.ReadStructBegin(ctx); err != nil { return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err) @@ -2850,6 +2990,16 @@ func (p *LogicalType) Read(ctx context.Context, iprot thrift.TProtocol) error { return err } } + case 15: + if fieldTypeId == thrift.STRUCT { + if err := p.ReadField15(ctx, iprot); err != nil { + return err + } + } else { + if err := iprot.Skip(ctx, fieldTypeId); err != nil { + return err + } + } default: if err := iprot.Skip(ctx, fieldTypeId); err != nil { return err @@ -2969,6 +3119,14 @@ func (p *LogicalType) ReadField14(ctx context.Context, iprot thrift.TProtocol) return nil } +func (p *LogicalType) ReadField15(ctx context.Context, iprot thrift.TProtocol) error { + p.FLOAT16 = &Float16Type{} + if err := p.FLOAT16.Read(ctx, iprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error reading struct: ", p.FLOAT16), err) + } + return nil +} + func (p *LogicalType) Write(ctx context.Context, oprot thrift.TProtocol) error { if c := p.CountSetFieldsLogicalType(); c != 1 { return fmt.Errorf("%T write union: exactly one field must be set (%d set)", p, c) @@ -2989,6 +3147,7 @@ func (p *LogicalType) Write(ctx context.Context, oprot thrift.TProtocol) error { if err := p.writeField12(ctx, oprot); err != nil { return err } if err := p.writeField13(ctx, oprot); err != nil { return err } if err := p.writeField14(ctx, oprot); err != nil { return err } + if err := p.writeField15(ctx, oprot); err != nil { return err } } if err := oprot.WriteFieldStop(ctx); err != nil { return thrift.PrependError("write field stop error: ", err) } @@ -3166,6 +3325,19 
@@ func (p *LogicalType) writeField14(ctx context.Context, oprot thrift.TProtocol) return err } +func (p *LogicalType) writeField15(ctx context.Context, oprot thrift.TProtocol) (err error) { + if p.IsSetFLOAT16() { + if err := oprot.WriteFieldBegin(ctx, "FLOAT16", thrift.STRUCT, 15); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field begin error 15:FLOAT16: ", p), err) } + if err := p.FLOAT16.Write(ctx, oprot); err != nil { + return thrift.PrependError(fmt.Sprintf("%T error writing struct: ", p.FLOAT16), err) + } + if err := oprot.WriteFieldEnd(ctx); err != nil { + return thrift.PrependError(fmt.Sprintf("%T write field end error 15:FLOAT16: ", p), err) } + } + return err +} + func (p *LogicalType) Equals(other *LogicalType) bool { if p == other { return true @@ -3185,6 +3357,7 @@ func (p *LogicalType) Equals(other *LogicalType) bool { if !p.JSON.Equals(other.JSON) { return false } if !p.BSON.Equals(other.BSON) { return false } if !p.UUID.Equals(other.UUID) { return false } + if !p.FLOAT16.Equals(other.FLOAT16) { return false } return true } @@ -3195,6 +3368,9 @@ func (p *LogicalType) String() string { return fmt.Sprintf("LogicalType(%+v)", *p) } +func (p *LogicalType) Validate() error { + return nil +} // Represents a element inside a schema definition. // - if it is a group (inner node) then type is undefined and num_children is defined // - if it is a primitive type (leaf) then type is defined and num_children is undefined @@ -3202,7 +3378,7 @@ func (p *LogicalType) String() string { // // Attributes: // - Type: Data type for this field. Not set if the current element is a non-leaf node -// - TypeLength: If type is FIXED_LEN_BYTE_ARRAY, this is the byte length of the vales. +// - TypeLength: If type is FIXED_LEN_BYTE_ARRAY, this is the byte length of the values. // Otherwise, if specified, this is the maximum bit length to store any of the values. // (e.g. a low cardinality INT col could have this set to 3). 
Note that this is // in the schema, and therefore fixed for the entire file. @@ -3213,10 +3389,14 @@ func (p *LogicalType) String() string { // the nesting is flattened to a single list by a depth-first traversal. // The children count is used to construct the nested relationship. // This field is not set when the element is a primitive type -// - ConvertedType: When the schema is the result of a conversion from another model +// - ConvertedType: DEPRECATED: When the schema is the result of a conversion from another model. // Used to record the original type to help with cross conversion. -// - Scale: Used when this column contains decimal data. +// +// This is superseded by logicalType. +// - Scale: DEPRECATED: Used when this column contains decimal data. // See the DECIMAL converted type for more details. +// +// This is superseded by using the DecimalType annotation in logicalType. // - Precision // - FieldID: When the original schema supports field ids, this will save the // original field id in the parquet schema @@ -3776,6 +3956,9 @@ func (p *SchemaElement) String() string { return fmt.Sprintf("SchemaElement(%+v)", *p) } +func (p *SchemaElement) Validate() error { + return nil +} // Data page header // // Attributes: @@ -4059,6 +4242,9 @@ func (p *DataPageHeader) String() string { return fmt.Sprintf("DataPageHeader(%+v)", *p) } +func (p *DataPageHeader) Validate() error { + return nil +} type IndexPageHeader struct { } @@ -4119,6 +4305,14 @@ func (p *IndexPageHeader) String() string { return fmt.Sprintf("IndexPageHeader(%+v)", *p) } +func (p *IndexPageHeader) Validate() error { + return nil +} +// The dictionary page must be placed at the first position of the column chunk +// if it is partly or completely dictionary encoded. At most one dictionary page +// can be placed in a column chunk. 
+// +// // Attributes: // - NumValues: Number of values in the dictionary * // - Encoding: Encoding using this dictionary page * @@ -4319,6 +4513,9 @@ func (p *DictionaryPageHeader) String() string { return fmt.Sprintf("DictionaryPageHeader(%+v)", *p) } +func (p *DictionaryPageHeader) Validate() error { + return nil +} // New page format allowing reading levels without decompressing the data // Repetition and definition levels are uncompressed // The remaining section containing the data is compressed if is_compressed is true @@ -4738,6 +4935,9 @@ func (p *DataPageHeaderV2) String() string { return fmt.Sprintf("DataPageHeaderV2(%+v)", *p) } +func (p *DataPageHeaderV2) Validate() error { + return nil +} // Block-based algorithm type annotation. * type SplitBlockAlgorithm struct { } @@ -4799,6 +4999,9 @@ func (p *SplitBlockAlgorithm) String() string { return fmt.Sprintf("SplitBlockAlgorithm(%+v)", *p) } +func (p *SplitBlockAlgorithm) Validate() error { + return nil +} // The algorithm used in Bloom filter. * // // Attributes: @@ -4923,6 +5126,9 @@ func (p *BloomFilterAlgorithm) String() string { return fmt.Sprintf("BloomFilterAlgorithm(%+v)", *p) } +func (p *BloomFilterAlgorithm) Validate() error { + return nil +} // Hash strategy type annotation. xxHash is an extremely fast non-cryptographic hash // algorithm. It uses 64 bits version of xxHash. // @@ -4986,6 +5192,9 @@ func (p *XxHash) String() string { return fmt.Sprintf("XxHash(%+v)", *p) } +func (p *XxHash) Validate() error { + return nil +} // The hash function used in Bloom filter. This function takes the hash of a column value // using plain encoding. // @@ -5112,6 +5321,9 @@ func (p *BloomFilterHash) String() string { return fmt.Sprintf("BloomFilterHash(%+v)", *p) } +func (p *BloomFilterHash) Validate() error { + return nil +} // The compression used in the Bloom filter. 
// type Uncompressed struct { @@ -5174,6 +5386,9 @@ func (p *Uncompressed) String() string { return fmt.Sprintf("Uncompressed(%+v)", *p) } +func (p *Uncompressed) Validate() error { + return nil +} // Attributes: // - UNCOMPRESSED type BloomFilterCompression struct { @@ -5296,6 +5511,9 @@ func (p *BloomFilterCompression) String() string { return fmt.Sprintf("BloomFilterCompression(%+v)", *p) } +func (p *BloomFilterCompression) Validate() error { + return nil +} // Bloom filter header is stored at beginning of Bloom filter data of each column // and followed by its bitset. // @@ -5553,36 +5771,29 @@ func (p *BloomFilterHeader) String() string { return fmt.Sprintf("BloomFilterHeader(%+v)", *p) } +func (p *BloomFilterHeader) Validate() error { + return nil +} // Attributes: // - Type: the type of the page: indicates which of the *_header fields is set * // - UncompressedPageSize: Uncompressed page size in bytes (not including this header) * // - CompressedPageSize: Compressed (and potentially encrypted) page size in bytes, not including this header * -// - Crc: The 32bit CRC for the page, to be be calculated as follows: -// - Using the standard CRC32 algorithm -// - On the data only, i.e. this header should not be included. 'Data' -// hereby refers to the concatenation of the repetition levels, the -// definition levels and the column value, in this exact order. -// - On the encoded versions of the repetition levels, definition levels and -// column values -// - On the compressed versions of the repetition levels, definition levels -// and column values where possible; -// - For v1 data pages, the repetition levels, definition levels and column -// values are always compressed together. If a compression scheme is -// specified, the CRC shall be calculated on the compressed version of -// this concatenation. If no compression scheme is specified, the CRC -// shall be calculated on the uncompressed version of this concatenation. 
-// - For v2 data pages, the repetition levels and definition levels are -// handled separately from the data and are never compressed (only -// encoded). If a compression scheme is specified, the CRC shall be -// calculated on the concatenation of the uncompressed repetition levels, -// uncompressed definition levels and the compressed column values. -// If no compression scheme is specified, the CRC shall be calculated on -// the uncompressed concatenation. -// - In encrypted columns, CRC is calculated after page encryption; the -// encryption itself is performed after page compression (if compressed) +// - Crc: The 32-bit CRC checksum for the page, to be be calculated as follows: +// +// - The standard CRC32 algorithm is used (with polynomial 0x04C11DB7, +// the same as in e.g. GZip). +// - All page types can have a CRC (v1 and v2 data pages, dictionary pages, +// etc.). +// - The CRC is computed on the serialization binary representation of the page +// (as written to disk), excluding the page header. For example, for v1 +// data pages, the CRC is computed on the concatenation of repetition levels, +// definition levels and column values (optionally compressed, optionally +// encrypted). +// - The CRC computation therefore takes place after any compression +// and encryption steps, if any. +// // If enabled, this allows for disabling checksumming in HDFS if only a few // pages need to be read. 
-// // - DataPageHeader // - IndexPageHeader // - DictionaryPageHeader @@ -6006,6 +6217,9 @@ func (p *PageHeader) String() string { return fmt.Sprintf("PageHeader(%+v)", *p) } +func (p *PageHeader) Validate() error { + return nil +} // Wrapper struct to store key values // // Attributes: @@ -6165,6 +6379,9 @@ func (p *KeyValue) String() string { return fmt.Sprintf("KeyValue(%+v)", *p) } +func (p *KeyValue) Validate() error { + return nil +} // Wrapper struct to specify sort order // // Attributes: @@ -6358,6 +6575,9 @@ func (p *SortingColumn) String() string { return fmt.Sprintf("SortingColumn(%+v)", *p) } +func (p *SortingColumn) Validate() error { + return nil +} // statistics of a given page type and encoding // // Attributes: @@ -6552,6 +6772,9 @@ func (p *PageEncodingStats) String() string { return fmt.Sprintf("PageEncodingStats(%+v)", *p) } +func (p *PageEncodingStats) Validate() error { + return nil +} // Description for column metadata // // Attributes: @@ -7346,6 +7569,9 @@ func (p *ColumnMetaData) String() string { return fmt.Sprintf("ColumnMetaData(%+v)", *p) } +func (p *ColumnMetaData) Validate() error { + return nil +} type EncryptionWithFooterKey struct { } @@ -7406,6 +7632,9 @@ func (p *EncryptionWithFooterKey) String() string { return fmt.Sprintf("EncryptionWithFooterKey(%+v)", *p) } +func (p *EncryptionWithFooterKey) Validate() error { + return nil +} // Attributes: // - PathInSchema: Column path in schema * // - KeyMetadata: Retrieval metadata of column encryption key * @@ -7581,6 +7810,9 @@ func (p *EncryptionWithColumnKey) String() string { return fmt.Sprintf("EncryptionWithColumnKey(%+v)", *p) } +func (p *EncryptionWithColumnKey) Validate() error { + return nil +} // Attributes: // - ENCRYPTION_WITH_FOOTER_KEY // - ENCRYPTION_WITH_COLUMN_KEY @@ -7752,6 +7984,9 @@ func (p *ColumnCryptoMetaData) String() string { return fmt.Sprintf("ColumnCryptoMetaData(%+v)", *p) } +func (p *ColumnCryptoMetaData) Validate() error { + return nil +} // Attributes: 
// - FilePath: File where column data is stored. If not set, assumed to be same file as // metadata. This path is relative to the current file. @@ -8254,6 +8489,9 @@ func (p *ColumnChunk) String() string { return fmt.Sprintf("ColumnChunk(%+v)", *p) } +func (p *ColumnChunk) Validate() error { + return nil +} // Attributes: // - Columns: Metadata for each column chunk in this row group. // This list must have the same order as the SchemaElement list in FileMetaData. @@ -8694,6 +8932,9 @@ func (p *RowGroup) String() string { return fmt.Sprintf("RowGroup(%+v)", *p) } +func (p *RowGroup) Validate() error { + return nil +} // Empty struct to signal the order defined by the physical or logical type type TypeDefinedOrder struct { } @@ -8755,6 +8996,9 @@ func (p *TypeDefinedOrder) String() string { return fmt.Sprintf("TypeDefinedOrder(%+v)", *p) } +func (p *TypeDefinedOrder) Validate() error { + return nil +} // Union to specify the order used for the min_value and max_value fields for a // column. This union takes the role of an enhanced enum that allows rich // elements (which will be needed for a collation-based ordering in the future). @@ -8808,6 +9052,13 @@ func (p *TypeDefinedOrder) String() string { // - If the min is +0, the row group may contain -0 values as well. // - If the max is -0, the row group may contain +0 values as well. // - When looking for NaN values, min and max should be ignored. +// +// When writing statistics the following rules should be followed: +// - NaNs should not be written to min or max statistics fields. +// - If the computed max value is zero (whether negative or positive), +// `+0.0` should be written into the max statistics field. +// - If the computed min value is zero (whether negative or positive), +// `-0.0` should be written into the min statistics field. 
type ColumnOrder struct { TYPE_ORDER *TypeDefinedOrder `thrift:"TYPE_ORDER,1" db:"TYPE_ORDER" json:"TYPE_ORDER,omitempty"` } @@ -8928,6 +9179,9 @@ func (p *ColumnOrder) String() string { return fmt.Sprintf("ColumnOrder(%+v)", *p) } +func (p *ColumnOrder) Validate() error { + return nil +} // Attributes: // - Offset: Offset of the page in the file * // - CompressedPageSize: Size of the page, including header. Sum of compressed_page_size and header @@ -9120,6 +9374,9 @@ func (p *PageLocation) String() string { return fmt.Sprintf("PageLocation(%+v)", *p) } +func (p *PageLocation) Validate() error { + return nil +} // Attributes: // - PageLocations: PageLocations, ordered by increasing PageLocation.offset. It is required // that page_locations[i].first_row_index < page_locations[i+1].first_row_index. @@ -9251,6 +9508,9 @@ func (p *OffsetIndex) String() string { return fmt.Sprintf("OffsetIndex(%+v)", *p) } +func (p *OffsetIndex) Validate() error { + return nil +} // Description for ColumnIndex. // Each [i] refers to the page at OffsetIndex.page_locations[i] // @@ -9260,15 +9520,16 @@ func (p *OffsetIndex) String() string { // have to set the corresponding entries in min_values and max_values to // byte[0], so that all lists have the same length. If false, the // corresponding entries in min_values and max_values must be valid. -// - MinValues: Two lists containing lower and upper bounds for the values of each page. -// These may be the actual minimum and maximum values found on a page, but -// can also be (more compact) values that do not exist on a page. For -// example, instead of storing ""Blart Versenwald III", a writer may set -// min_values[i]="B", max_values[i]="C". Such more compact values must still -// be valid values within the column's logical type. Readers must make sure -// that list entries are populated before using them by inspecting null_pages. 
+// - MinValues: Two lists containing lower and upper bounds for the values of each page +// determined by the ColumnOrder of the column. These may be the actual +// minimum and maximum values found on a page, but can also be (more compact) +// values that do not exist on a page. For example, instead of storing ""Blart +// Versenwald III", a writer may set min_values[i]="B", max_values[i]="C". +// Such more compact values must still be valid values within the column's +// logical type. Readers must make sure that list entries are populated before +// using them by inspecting null_pages. // - MaxValues -// - BoundaryOrder: Stores whether both min_values and max_values are orderd and if so, in +// - BoundaryOrder: Stores whether both min_values and max_values are ordered and if so, in // which direction. This allows readers to perform binary searches in both // lists. Readers cannot assume that max_values[i] <= min_values[i+1], even // if the lists are ordered. @@ -9644,6 +9905,9 @@ func (p *ColumnIndex) String() string { return fmt.Sprintf("ColumnIndex(%+v)", *p) } +func (p *ColumnIndex) Validate() error { + return nil +} // Attributes: // - AadPrefix: AAD prefix * // - AadFileUnique: Unique file identifier part of AAD suffix * @@ -9848,6 +10112,9 @@ func (p *AesGcmV1) String() string { return fmt.Sprintf("AesGcmV1(%+v)", *p) } +func (p *AesGcmV1) Validate() error { + return nil +} // Attributes: // - AadPrefix: AAD prefix * // - AadFileUnique: Unique file identifier part of AAD suffix * @@ -10052,6 +10319,9 @@ func (p *AesGcmCtrV1) String() string { return fmt.Sprintf("AesGcmCtrV1(%+v)", *p) } +func (p *AesGcmCtrV1) Validate() error { + return nil +} // Attributes: // - AES_GCM_V1 // - AES_GCM_CTR_V1 @@ -10223,6 +10493,9 @@ func (p *EncryptionAlgorithm) String() string { return fmt.Sprintf("EncryptionAlgorithm(%+v)", *p) } +func (p *EncryptionAlgorithm) Validate() error { + return nil +} // Description for file metadata // // Attributes: @@ -10240,17 +10513,20 @@ 
func (p *EncryptionAlgorithm) String() string { // version (build ). // e.g. impala version 1.0 (build 6cf94d29b2b7115df4de2c06e2ab4326d721eb55) // -// - ColumnOrders: Sort order used for the min_value and max_value fields of each column in -// this file. Sort orders are listed in the order matching the columns in the -// schema. The indexes are not necessary the same though, because only leaf -// nodes of the schema are represented in the list of sort orders. +// - ColumnOrders: Sort order used for the min_value and max_value fields in the Statistics +// objects and the min_values and max_values fields in the ColumnIndex +// objects of each column in this file. Sort orders are listed in the order +// matching the columns in the schema. The indexes are not necessary the same +// though, because only leaf nodes of the schema are represented in the list +// of sort orders. // -// Without column_orders, the meaning of the min_value and max_value fields is -// undefined. To ensure well-defined behaviour, if min_value and max_value are -// written to a Parquet file, column_orders must be written as well. +// Without column_orders, the meaning of the min_value and max_value fields +// in the Statistics object and the ColumnIndex object is undefined. To ensure +// well-defined behaviour, if these fields are written to a Parquet file, +// column_orders must be written as well. // -// The obsolete min and max fields are always sorted by signed comparison -// regardless of column_orders. +// The obsolete min and max fields in the Statistics object are always sorted +// by signed comparison regardless of column_orders. // - EncryptionAlgorithm: Encryption algorithm. This field is set only in encrypted files // with plaintext footer. Files with encrypted footer store algorithm id // in FileCryptoMetaData structure. 
@@ -10803,6 +11079,9 @@ func (p *FileMetaData) String() string { return fmt.Sprintf("FileMetaData(%+v)", *p) } +func (p *FileMetaData) Validate() error { + return nil +} // Crypto metadata for files with encrypted footer * // // Attributes: @@ -10965,3 +11244,6 @@ func (p *FileCryptoMetaData) String() string { return fmt.Sprintf("FileCryptoMetaData(%+v)", *p) } +func (p *FileCryptoMetaData) Validate() error { + return nil +} diff --git a/go/parquet/internal/testutils/random.go b/go/parquet/internal/testutils/random.go index bb101ebf9a92d..bb9ee0cdf2bba 100644 --- a/go/parquet/internal/testutils/random.go +++ b/go/parquet/internal/testutils/random.go @@ -28,6 +28,7 @@ import ( "github.com/apache/arrow/go/v15/arrow/array" "github.com/apache/arrow/go/v15/arrow/bitutil" "github.com/apache/arrow/go/v15/arrow/endian" + "github.com/apache/arrow/go/v15/arrow/float16" "github.com/apache/arrow/go/v15/arrow/memory" "github.com/apache/arrow/go/v15/parquet" "github.com/apache/arrow/go/v15/parquet/pqarrow" @@ -369,6 +370,17 @@ func randFloat64(r *rand.Rand) float64 { } } +// randFloat16 creates a random float value with a normal distribution +// to better spread the values out and ensure we do not return any NaN or Inf values. +func randFloat16(r *rand.Rand) float16.Num { + for { + f := float16.FromBits(uint16(r.Uint64n(math.MaxUint16 + 1))) + if !f.IsNaN() { + return f + } + } +} + // FillRandomFloat32 populates out with random float32 values using seed as the random // seed for the generator to allow consistency for testing. func FillRandomFloat32(seed uint64, out []float32) { @@ -387,6 +399,15 @@ func FillRandomFloat64(seed uint64, out []float64) { } } +// FillRandomFloat16 populates out with random float64 values using seed as the random +// seed for the generator to allow consistency for testing. 
+func FillRandomFloat16(seed uint64, out []float16.Num) { + r := rand.New(rand.NewSource(seed)) + for idx := range out { + out[idx] = randFloat16(r) + } +} + // FillRandomByteArray populates out with random ByteArray values with lengths between 2 and 12 // using heap as the actual memory storage used for the bytes generated. Each element of // out will be some slice of the bytes in heap, and as such heap must outlive the byte array slices. @@ -456,6 +477,8 @@ func InitValues(values interface{}, heap *memory.Buffer) { FillRandomFloat32(0, arr) case []float64: FillRandomFloat64(0, arr) + case []float16.Num: + FillRandomFloat16(0, arr) case []parquet.Int96: FillRandomInt96(0, arr) case []parquet.ByteArray: diff --git a/go/parquet/internal/testutils/random_arrow.go b/go/parquet/internal/testutils/random_arrow.go index d886db0360b84..7dd2a3e8b77e3 100644 --- a/go/parquet/internal/testutils/random_arrow.go +++ b/go/parquet/internal/testutils/random_arrow.go @@ -19,6 +19,7 @@ package testutils import ( "github.com/apache/arrow/go/v15/arrow" "github.com/apache/arrow/go/v15/arrow/array" + "github.com/apache/arrow/go/v15/arrow/float16" "github.com/apache/arrow/go/v15/arrow/memory" "golang.org/x/exp/rand" ) @@ -49,6 +50,13 @@ func RandomNonNull(mem memory.Allocator, dt arrow.DataType, size int) arrow.Arra FillRandomFloat64(0, values) bldr.AppendValues(values, nil) return bldr.NewArray() + case arrow.FLOAT16: + bldr := array.NewFloat16Builder(mem) + defer bldr.Release() + values := make([]float16.Num, size) + FillRandomFloat16(0, values) + bldr.AppendValues(values, nil) + return bldr.NewArray() case arrow.INT64: bldr := array.NewInt64Builder(mem) defer bldr.Release() @@ -212,6 +220,21 @@ func RandomNullable(dt arrow.DataType, size int, numNulls int) arrow.Array { values := make([]float64, size) FillRandomFloat64(0, values) + valid := make([]bool, size) + for idx := range valid { + valid[idx] = true + } + for i := 0; i < numNulls; i++ { + valid[i*2] = false + } + 
bldr.AppendValues(values, valid) + return bldr.NewArray() + case arrow.FLOAT16: + bldr := array.NewFloat16Builder(memory.DefaultAllocator) + defer bldr.Release() + values := make([]float16.Num, size) + FillRandomFloat16(0, values) + valid := make([]bool, size) for idx := range valid { valid[idx] = true diff --git a/go/parquet/metadata/statistics.go b/go/parquet/metadata/statistics.go index 606e76ffd0454..43294272dec35 100644 --- a/go/parquet/metadata/statistics.go +++ b/go/parquet/metadata/statistics.go @@ -23,6 +23,7 @@ import ( "unsafe" "github.com/apache/arrow/go/v15/arrow" + "github.com/apache/arrow/go/v15/arrow/float16" "github.com/apache/arrow/go/v15/arrow/memory" "github.com/apache/arrow/go/v15/internal/utils" "github.com/apache/arrow/go/v15/parquet" @@ -32,7 +33,7 @@ import ( "github.com/apache/arrow/go/v15/parquet/schema" ) -//go:generate go run ../../arrow/_tools/tmpl/main.go -i -data=../internal/encoding/physical_types.tmpldata statistics_types.gen.go.tmpl +//go:generate go run ../../arrow/_tools/tmpl/main.go -i -data=statistics_types.tmpldata statistics_types.gen.go.tmpl type StatProvider interface { GetMin() []byte @@ -373,6 +374,9 @@ var ( defaultMinUInt96 parquet.Int96 defaultMaxInt96 parquet.Int96 defaultMaxUInt96 parquet.Int96 + + defaultMinFloat16 parquet.FixedLenByteArray = float16.MaxNum.ToLEBytes() + defaultMaxFloat16 parquet.FixedLenByteArray = float16.MinNum.ToLEBytes() ) func init() { @@ -407,6 +411,14 @@ func (s *Int96Statistics) defaultMax() parquet.Int96 { return defaultMaxInt96 } +func (Float16Statistics) defaultMin() parquet.FixedLenByteArray { + return defaultMinFloat16 +} + +func (Float16Statistics) defaultMax() parquet.FixedLenByteArray { + return defaultMaxFloat16 +} + func (Float32Statistics) defaultMin() float32 { return math.MaxFloat32 } func (Float32Statistics) defaultMax() float32 { return -math.MaxFloat32 } func (Float64Statistics) defaultMin() float64 { return math.MaxFloat64 } @@ -427,6 +439,10 @@ func 
(FixedLenByteArrayStatistics) equal(a, b parquet.FixedLenByteArray) bool { return bytes.Equal(a, b) } +func (Float16Statistics) equal(a, b parquet.FixedLenByteArray) bool { + return float16.FromLEBytes(a).Equal(float16.FromLEBytes(b)) +} + func (BooleanStatistics) less(a, b bool) bool { return !a && b } @@ -481,6 +497,10 @@ func (s *FixedLenByteArrayStatistics) less(a, b parquet.FixedLenByteArray) bool return signedByteLess([]byte(a), []byte(b)) } +func (Float16Statistics) less(a, b parquet.FixedLenByteArray) bool { + return float16.FromLEBytes(a).Less(float16.FromLEBytes(b)) +} + func (BooleanStatistics) cleanStat(minMax minmaxPairBoolean) *minmaxPairBoolean { return &minMax } func (Int32Statistics) cleanStat(minMax minmaxPairInt32) *minmaxPairInt32 { return &minMax } func (Int64Statistics) cleanStat(minMax minmaxPairInt64) *minmaxPairInt64 { return &minMax } @@ -535,6 +555,29 @@ func (Float64Statistics) cleanStat(minMax minmaxPairFloat64) *minmaxPairFloat64 return &minMax } +func (Float16Statistics) cleanStat(minMax minmaxPairFloat16) *minmaxPairFloat16 { + min := float16.FromLEBytes(minMax[0][:]) + max := float16.FromLEBytes(minMax[1][:]) + + if min.IsNaN() || max.IsNaN() { + return nil + } + + if min.Equal(float16.MaxNum) && max.Equal(float16.MinNum) { + return nil + } + + zero := float16.New(0) + if min.Equal(zero) && !min.Signbit() { + minMax[0] = min.Negate().ToLEBytes() + } + if max.Equal(zero) && max.Signbit() { + minMax[1] = max.Negate().ToLEBytes() + } + + return &minMax +} + func (ByteArrayStatistics) cleanStat(minMax minmaxPairByteArray) *minmaxPairByteArray { if minMax[0] == nil || minMax[1] == nil { return nil diff --git a/go/parquet/metadata/statistics_test.go b/go/parquet/metadata/statistics_test.go index 47798d3c4dd3c..19311dc8955d3 100644 --- a/go/parquet/metadata/statistics_test.go +++ b/go/parquet/metadata/statistics_test.go @@ -22,6 +22,7 @@ import ( "testing" "github.com/apache/arrow/go/v15/arrow/bitutil" + 
"github.com/apache/arrow/go/v15/arrow/float16" "github.com/apache/arrow/go/v15/arrow/memory" "github.com/apache/arrow/go/v15/parquet" "github.com/apache/arrow/go/v15/parquet/metadata" @@ -32,24 +33,36 @@ import ( // NOTE(zeroshade): tests will be added and updated after merging the "file" package // since the tests that I wrote relied on the file writer/reader for ease of use. +func newFloat16Node(name string, rep parquet.Repetition, fieldID int32) *schema.PrimitiveNode { + return schema.MustPrimitive(schema.NewPrimitiveNodeLogical(name, rep, schema.Float16LogicalType{}, parquet.Types.FixedLenByteArray, 2, fieldID)) +} + func TestCheckNaNs(t *testing.T) { const ( numvals = 8 min = -4.0 max = 3.0 ) - nan := math.NaN() + var ( + nan = math.NaN() + f16Min parquet.FixedLenByteArray = float16.New(float32(min)).ToLEBytes() + f16Max parquet.FixedLenByteArray = float16.New(float32(max)).ToLEBytes() + ) allNans := []float64{nan, nan, nan, nan, nan, nan, nan, nan} allNansf32 := make([]float32, numvals) + allNansf16 := make([]parquet.FixedLenByteArray, numvals) for idx, v := range allNans { allNansf32[idx] = float32(v) + allNansf16[idx] = float16.New(float32(v)).ToLEBytes() } someNans := []float64{nan, max, -3.0, -1.0, nan, 2.0, min, nan} someNansf32 := make([]float32, numvals) + someNansf16 := make([]parquet.FixedLenByteArray, numvals) for idx, v := range someNans { someNansf32[idx] = float32(v) + someNansf16[idx] = float16.New(float32(v)).ToLEBytes() } validBitmap := []byte{0x7F} // 0b01111111 @@ -62,6 +75,8 @@ func TestCheckNaNs(t *testing.T) { s.Update(values.([]float32), 0) case *metadata.Float64Statistics: s.Update(values.([]float64), 0) + case *metadata.Float16Statistics: + s.Update(values.([]parquet.FixedLenByteArray), 0) } assert.False(t, stats.HasMinMax()) } else { @@ -72,6 +87,8 @@ func TestCheckNaNs(t *testing.T) { s.UpdateSpaced(values.([]float32), bitmap, 0, int64(nullCount)) case *metadata.Float64Statistics: s.UpdateSpaced(values.([]float64), bitmap, 0, 
int64(nullCount)) + case *metadata.Float16Statistics: + s.UpdateSpaced(values.([]parquet.FixedLenByteArray), bitmap, 0, int64(nullCount)) } assert.False(t, stats.HasMinMax()) } @@ -89,6 +106,11 @@ func TestCheckNaNs(t *testing.T) { assert.True(t, stats.HasMinMax()) assert.Equal(t, expectedMin, s.Min()) assert.Equal(t, expectedMax, s.Max()) + case *metadata.Float16Statistics: + s.Update(values.([]parquet.FixedLenByteArray), 0) + assert.True(t, stats.HasMinMax()) + assert.Equal(t, expectedMin, s.Min()) + assert.Equal(t, expectedMax, s.Max()) } } @@ -106,34 +128,48 @@ func TestCheckNaNs(t *testing.T) { assert.True(t, s.HasMinMax()) assert.Equal(t, expectedMin, s.Min()) assert.Equal(t, expectedMax, s.Max()) + case *metadata.Float16Statistics: + s.UpdateSpaced(values.([]parquet.FixedLenByteArray), bitmap, 0, int64(nullCount)) + assert.True(t, s.HasMinMax()) + assert.Equal(t, expectedMin, s.Min()) + assert.Equal(t, expectedMax, s.Max()) } } f32Col := schema.NewColumn(schema.NewFloat32Node("f", parquet.Repetitions.Optional, -1), 1, 1) f64Col := schema.NewColumn(schema.NewFloat64Node("f", parquet.Repetitions.Optional, -1), 1, 1) + f16Col := schema.NewColumn(newFloat16Node("f", parquet.Repetitions.Required, -1), 1, 1) // test values someNanStats := metadata.NewStatistics(f64Col, memory.DefaultAllocator) someNanStatsf32 := metadata.NewStatistics(f32Col, memory.DefaultAllocator) + someNanStatsf16 := metadata.NewStatistics(f16Col, memory.DefaultAllocator) // ingesting only nans should not yield a min or max assertUnsetMinMax(someNanStats, allNans, nil) assertUnsetMinMax(someNanStatsf32, allNansf32, nil) + assertUnsetMinMax(someNanStatsf16, allNansf16, nil) // ingesting a mix should yield a valid min/max assertMinMaxAre(someNanStats, someNans, min, max) assertMinMaxAre(someNanStatsf32, someNansf32, float32(min), float32(max)) + assertMinMaxAre(someNanStatsf16, someNansf16, f16Min, f16Max) // ingesting only nans after a valid min/max should have no effect 
assertMinMaxAre(someNanStats, allNans, min, max) assertMinMaxAre(someNanStatsf32, allNansf32, float32(min), float32(max)) + assertMinMaxAre(someNanStatsf16, allNansf16, f16Min, f16Max) someNanStats = metadata.NewStatistics(f64Col, memory.DefaultAllocator) someNanStatsf32 = metadata.NewStatistics(f32Col, memory.DefaultAllocator) + someNanStatsf16 = metadata.NewStatistics(f16Col, memory.DefaultAllocator) assertUnsetMinMax(someNanStats, allNans, validBitmap) assertUnsetMinMax(someNanStatsf32, allNansf32, validBitmap) + assertUnsetMinMax(someNanStatsf16, allNansf16, validBitmap) // nans should not pollute min/max when excluded via null bitmap assertMinMaxAreSpaced(someNanStats, someNans, validBitmapNoNaNs, min, max) assertMinMaxAreSpaced(someNanStatsf32, someNansf32, validBitmapNoNaNs, float32(min), float32(max)) + assertMinMaxAreSpaced(someNanStatsf16, someNansf16, validBitmapNoNaNs, f16Min, f16Max) // ingesting nans with a null bitmap should not change the result assertMinMaxAreSpaced(someNanStats, someNans, validBitmap, min, max) assertMinMaxAreSpaced(someNanStatsf32, someNansf32, validBitmap, float32(min), float32(max)) + assertMinMaxAreSpaced(someNanStatsf16, someNansf16, validBitmap, f16Min, f16Max) } func TestCheckNegativeZeroStats(t *testing.T) { @@ -155,37 +191,61 @@ func TestCheckNegativeZeroStats(t *testing.T) { assert.True(t, math.Signbit(s.Min())) assert.Equal(t, zero, s.Max()) assert.False(t, math.Signbit(s.Max())) + case *metadata.Float16Statistics: + s.Update(values.([]parquet.FixedLenByteArray), 0) + assert.True(t, s.HasMinMax()) + var zero float64 + min := float64(float16.FromLEBytes(s.Min()).Float32()) + max := float64(float16.FromLEBytes(s.Max()).Float32()) + assert.Equal(t, zero, min) + assert.True(t, math.Signbit(min)) + assert.Equal(t, zero, max) + assert.False(t, math.Signbit(max)) } } fcol := schema.NewColumn(schema.NewFloat32Node("f", parquet.Repetitions.Optional, -1), 1, 1) dcol := schema.NewColumn(schema.NewFloat64Node("d", 
parquet.Repetitions.Optional, -1), 1, 1) + hcol := schema.NewColumn(newFloat16Node("h", parquet.Repetitions.Optional, -1), 1, 1) var f32zero float32 var f64zero float64 + var f16PosZero parquet.FixedLenByteArray = float16.New(+f32zero).ToLEBytes() + var f16NegZero parquet.FixedLenByteArray = float16.New(-f32zero).ToLEBytes() + + assert.False(t, float16.FromLEBytes(f16PosZero).Signbit()) + assert.True(t, float16.FromLEBytes(f16NegZero).Signbit()) { fstats := metadata.NewStatistics(fcol, memory.DefaultAllocator) dstats := metadata.NewStatistics(dcol, memory.DefaultAllocator) + hstats := metadata.NewStatistics(hcol, memory.DefaultAllocator) assertMinMaxZeroesSign(fstats, []float32{-f32zero, f32zero}) assertMinMaxZeroesSign(dstats, []float64{-f64zero, f64zero}) + assertMinMaxZeroesSign(hstats, []parquet.FixedLenByteArray{f16NegZero, f16PosZero}) } { fstats := metadata.NewStatistics(fcol, memory.DefaultAllocator) dstats := metadata.NewStatistics(dcol, memory.DefaultAllocator) + hstats := metadata.NewStatistics(hcol, memory.DefaultAllocator) assertMinMaxZeroesSign(fstats, []float32{f32zero, -f32zero}) assertMinMaxZeroesSign(dstats, []float64{f64zero, -f64zero}) + assertMinMaxZeroesSign(hstats, []parquet.FixedLenByteArray{f16PosZero, f16NegZero}) } { fstats := metadata.NewStatistics(fcol, memory.DefaultAllocator) dstats := metadata.NewStatistics(dcol, memory.DefaultAllocator) + hstats := metadata.NewStatistics(hcol, memory.DefaultAllocator) assertMinMaxZeroesSign(fstats, []float32{-f32zero, -f32zero}) assertMinMaxZeroesSign(dstats, []float64{-f64zero, -f64zero}) + assertMinMaxZeroesSign(hstats, []parquet.FixedLenByteArray{f16NegZero, f16NegZero}) } { fstats := metadata.NewStatistics(fcol, memory.DefaultAllocator) dstats := metadata.NewStatistics(dcol, memory.DefaultAllocator) + hstats := metadata.NewStatistics(hcol, memory.DefaultAllocator) assertMinMaxZeroesSign(fstats, []float32{f32zero, f32zero}) assertMinMaxZeroesSign(dstats, []float64{f64zero, f64zero}) + 
assertMinMaxZeroesSign(hstats, []parquet.FixedLenByteArray{f16PosZero, f16PosZero}) } } diff --git a/go/parquet/metadata/statistics_types.gen.go b/go/parquet/metadata/statistics_types.gen.go index e8fb9877c8444..baecd185d14fc 100644 --- a/go/parquet/metadata/statistics_types.gen.go +++ b/go/parquet/metadata/statistics_types.gen.go @@ -24,6 +24,7 @@ import ( "github.com/apache/arrow/go/v15/arrow" "github.com/apache/arrow/go/v15/arrow/array" + "github.com/apache/arrow/go/v15/arrow/float16" "github.com/apache/arrow/go/v15/arrow/memory" "github.com/apache/arrow/go/v15/internal/bitutils" shared_utils "github.com/apache/arrow/go/v15/internal/utils" @@ -2432,6 +2433,314 @@ func (s *FixedLenByteArrayStatistics) Encode() (enc EncodedStatistics, err error return } +type minmaxPairFloat16 [2]parquet.FixedLenByteArray + +// Float16Statistics is the typed interface for managing stats for a column +// of Float16 type. +type Float16Statistics struct { + statistics + min parquet.FixedLenByteArray + max parquet.FixedLenByteArray + + bitSetReader bitutils.SetBitRunReader +} + +// NewFloat16Statistics constructs an appropriate stat object type using the +// given column descriptor and allocator. 
+// +// Panics if the physical type of descr is not parquet.Type.FixedLenByteArray +// Panics if the logical type of descr is not schema.Float16LogicalType +func NewFloat16Statistics(descr *schema.Column, mem memory.Allocator) *Float16Statistics { + if descr.PhysicalType() != parquet.Types.FixedLenByteArray { + panic(fmt.Errorf("parquet: invalid type %s for constructing a Float16 stat object", descr.PhysicalType())) + } + if !descr.LogicalType().Equals(schema.Float16LogicalType{}) { + panic(fmt.Errorf("parquet: invalid logical type %s for constructing a Float16 stat object", descr.LogicalType().String())) + } + + return &Float16Statistics{ + statistics: statistics{ + descr: descr, + hasNullCount: true, + hasDistinctCount: true, + order: descr.SortOrder(), + encoder: encoding.NewEncoder(descr.PhysicalType(), parquet.Encodings.Plain, false, descr, mem), + mem: mem, + }, + } +} + +// NewFloat16StatisticsFromEncoded will construct a propertly typed statistics object +// initializing it with the provided information. 
+func NewFloat16StatisticsFromEncoded(descr *schema.Column, mem memory.Allocator, nvalues int64, encoded StatProvider) *Float16Statistics { + ret := NewFloat16Statistics(descr, mem) + ret.nvalues += nvalues + if encoded.IsSetNullCount() { + ret.IncNulls(encoded.GetNullCount()) + } + if encoded.IsSetDistinctCount() { + ret.IncDistinct(encoded.GetDistinctCount()) + } + + encodedMin := encoded.GetMin() + if encodedMin != nil && len(encodedMin) > 0 { + ret.min = ret.plainDecode(encodedMin) + } + encodedMax := encoded.GetMax() + if encodedMax != nil && len(encodedMax) > 0 { + ret.max = ret.plainDecode(encodedMax) + } + ret.hasMinMax = encoded.IsSetMax() || encoded.IsSetMin() + return ret +} + +func (s *Float16Statistics) plainEncode(src parquet.FixedLenByteArray) []byte { + s.encoder.(encoding.FixedLenByteArrayEncoder).Put([]parquet.FixedLenByteArray{src}) + buf, err := s.encoder.FlushValues() + if err != nil { + panic(err) // recovered by Encode + } + defer buf.Release() + + out := make([]byte, buf.Len()) + copy(out, buf.Bytes()) + return out +} + +func (s *Float16Statistics) plainDecode(src []byte) parquet.FixedLenByteArray { + var buf [1]parquet.FixedLenByteArray + + decoder := encoding.NewDecoder(s.descr.PhysicalType(), parquet.Encodings.Plain, s.descr, s.mem) + decoder.SetData(1, src) + decoder.(encoding.FixedLenByteArrayDecoder).Decode(buf[:]) + return buf[0] +} + +func (s *Float16Statistics) minval(a, b parquet.FixedLenByteArray) parquet.FixedLenByteArray { + switch { + case a == nil: + return b + case b == nil: + return a + case s.less(a, b): + return a + default: + return b + } +} + +func (s *Float16Statistics) maxval(a, b parquet.FixedLenByteArray) parquet.FixedLenByteArray { + switch { + case a == nil: + return b + case b == nil: + return a + case s.less(a, b): + return b + default: + return a + } +} + +// MinMaxEqual returns true if both stat objects have the same Min and Max values +func (s *Float16Statistics) MinMaxEqual(rhs *Float16Statistics) bool { + 
return s.equal(s.min, rhs.min) && s.equal(s.max, rhs.max) +} + +// Equals returns true only if both objects are the same type, have the same min and +// max values, null count, distinct count and number of values. +func (s *Float16Statistics) Equals(other TypedStatistics) bool { + if s.Type() != other.Type() || !s.descr.LogicalType().Equals(other.Descr().LogicalType()) { + return false + } + rhs, ok := other.(*Float16Statistics) + if !ok { + return false + } + + if s.HasMinMax() != rhs.HasMinMax() { + return false + } + return (s.hasMinMax && s.MinMaxEqual(rhs)) && + s.NullCount() == rhs.NullCount() && + s.DistinctCount() == rhs.DistinctCount() && + s.NumValues() == rhs.NumValues() +} + +func (s *Float16Statistics) coalesce(val, fallback parquet.FixedLenByteArray) parquet.FixedLenByteArray { + if float16.FromLEBytes(val).IsNaN() { + return fallback + } + return val +} + +func (s *Float16Statistics) getMinMax(values []parquet.FixedLenByteArray) (min, max parquet.FixedLenByteArray) { + defMin := s.defaultMin() + defMax := s.defaultMax() + + min = defMin + max = defMax + + for _, v := range values { + min = s.minval(min, s.coalesce(v, defMin)) + max = s.maxval(max, s.coalesce(v, defMax)) + } + return +} + +func (s *Float16Statistics) getMinMaxSpaced(values []parquet.FixedLenByteArray, validBits []byte, validBitsOffset int64) (min, max parquet.FixedLenByteArray) { + min = s.defaultMin() + max = s.defaultMax() + + if s.bitSetReader == nil { + s.bitSetReader = bitutils.NewSetBitRunReader(validBits, validBitsOffset, int64(len(values))) + } else { + s.bitSetReader.Reset(validBits, validBitsOffset, int64(len(values))) + } + + for { + run := s.bitSetReader.NextRun() + if run.Length == 0 { + break + } + for _, v := range values[int(run.Pos):int(run.Pos+run.Length)] { + min = s.minval(min, coalesce(v, s.defaultMin()).(parquet.FixedLenByteArray)) + max = s.maxval(max, coalesce(v, s.defaultMax()).(parquet.FixedLenByteArray)) + } + } + return +} + +func (s *Float16Statistics) 
Min() parquet.FixedLenByteArray { return s.min } +func (s *Float16Statistics) Max() parquet.FixedLenByteArray { return s.max } + +// Merge merges the stats from other into this stat object, updating +// the null count, distinct count, number of values and the min/max if +// appropriate. +func (s *Float16Statistics) Merge(other TypedStatistics) { + rhs, ok := other.(*Float16Statistics) + if !ok { + panic("incompatible stat type merge") + } + + s.statistics.merge(rhs) + if rhs.HasMinMax() { + s.SetMinMax(rhs.Min(), rhs.Max()) + } +} + +// Update is used to add more values to the current stat object, finding the +// min and max values etc. +func (s *Float16Statistics) Update(values []parquet.FixedLenByteArray, numNull int64) { + s.IncNulls(numNull) + s.nvalues += int64(len(values)) + + if len(values) == 0 { + return + } + + s.SetMinMax(s.getMinMax(values)) +} + +// UpdateSpaced is just like Update, but for spaced values using validBits to determine +// and skip null values. +func (s *Float16Statistics) UpdateSpaced(values []parquet.FixedLenByteArray, validBits []byte, validBitsOffset, numNull int64) { + s.IncNulls(numNull) + notnull := int64(len(values)) - numNull + s.nvalues += notnull + + if notnull == 0 { + return + } + + s.SetMinMax(s.getMinMaxSpaced(values, validBits, validBitsOffset)) +} + +func (s *Float16Statistics) UpdateFromArrow(values arrow.Array, updateCounts bool) error { + if updateCounts { + s.IncNulls(int64(values.NullN())) + s.nvalues += int64(values.Len() - values.NullN()) + } + + if values.NullN() == values.Len() { + return nil + } + + return fmt.Errorf("%w: update float16 stats from Arrow", arrow.ErrNotImplemented) +} + +// SetMinMax updates the min and max values only if they are not currently set +// or if argMin is less than the current min / argMax is greater than the current max +func (s *Float16Statistics) SetMinMax(argMin, argMax parquet.FixedLenByteArray) { + maybeMinMax := s.cleanStat([2]parquet.FixedLenByteArray{argMin, argMax}) + if 
maybeMinMax == nil { + return + } + + min := (*maybeMinMax)[0] + max := (*maybeMinMax)[1] + + if !s.hasMinMax { + s.hasMinMax = true + s.min = min + s.max = max + } else { + if !s.less(s.min, min) { + s.min = min + } + if s.less(s.max, max) { + s.max = max + } + } +} + +// EncodeMin returns the encoded min value with plain encoding. +// +// ByteArray stats do not include the length in the encoding. +func (s *Float16Statistics) EncodeMin() []byte { + if s.HasMinMax() { + return s.plainEncode(s.min) + } + return nil +} + +// EncodeMax returns the current encoded max value with plain encoding +// +// ByteArray stats do not include the length in the encoding +func (s *Float16Statistics) EncodeMax() []byte { + if s.HasMinMax() { + return s.plainEncode(s.max) + } + return nil +} + +// Encode returns a populated EncodedStatistics object +func (s *Float16Statistics) Encode() (enc EncodedStatistics, err error) { + defer func() { + if r := recover(); r != nil { + switch r := r.(type) { + case error: + err = r + case string: + err = xerrors.New(r) + default: + err = fmt.Errorf("unknown error type thrown from panic: %v", r) + } + } + }() + if s.HasMinMax() { + enc.SetMax(s.EncodeMax()) + enc.SetMin(s.EncodeMin()) + } + if s.HasNullCount() { + enc.SetNullCount(s.NullCount()) + } + if s.HasDistinctCount() { + enc.SetDistinctCount(s.DistinctCount()) + } + return +} + // NewStatistics uses the type in the column descriptor to construct the appropriate // typed stats object. If mem is nil, then memory.DefaultAllocator will be used. 
func NewStatistics(descr *schema.Column, mem memory.Allocator) TypedStatistics { @@ -2454,6 +2763,9 @@ func NewStatistics(descr *schema.Column, mem memory.Allocator) TypedStatistics { case parquet.Types.ByteArray: return NewByteArrayStatistics(descr, mem) case parquet.Types.FixedLenByteArray: + if descr.LogicalType().Equals(schema.Float16LogicalType{}) { + return NewFloat16Statistics(descr, mem) + } return NewFixedLenByteArrayStatistics(descr, mem) default: panic("not implemented") @@ -2484,6 +2796,9 @@ func NewStatisticsFromEncoded(descr *schema.Column, mem memory.Allocator, nvalue case parquet.Types.ByteArray: return NewByteArrayStatisticsFromEncoded(descr, mem, nvalues, encoded) case parquet.Types.FixedLenByteArray: + if descr.LogicalType().Equals(schema.Float16LogicalType{}) { + return NewFloat16StatisticsFromEncoded(descr, mem, nvalues, encoded) + } return NewFixedLenByteArrayStatisticsFromEncoded(descr, mem, nvalues, encoded) default: panic("not implemented") diff --git a/go/parquet/metadata/statistics_types.gen.go.tmpl b/go/parquet/metadata/statistics_types.gen.go.tmpl index 4b6253a8574ea..93495527c7e54 100644 --- a/go/parquet/metadata/statistics_types.gen.go.tmpl +++ b/go/parquet/metadata/statistics_types.gen.go.tmpl @@ -45,10 +45,18 @@ type {{.Name}}Statistics struct { // given column descriptor and allocator. 
// // Panics if the physical type of descr is not parquet.Type.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}} +{{- if eq .Name "Float16"}} +// Panics if the logical type of descr is not schema.Float16LogicalType +{{- end}} func New{{.Name}}Statistics(descr *schema.Column, mem memory.Allocator) *{{.Name}}Statistics { if descr.PhysicalType() != parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}} { panic(fmt.Errorf("parquet: invalid type %s for constructing a {{.Name}} stat object", descr.PhysicalType())) } +{{- if eq .Name "Float16"}} + if !descr.LogicalType().Equals(schema.Float16LogicalType{}) { + panic(fmt.Errorf("parquet: invalid logical type %s for constructing a {{.Name}} stat object", descr.LogicalType().String())) + } +{{- end}} return &{{.Name}}Statistics{ statistics: statistics{ @@ -96,7 +104,7 @@ func (s *{{.Name}}Statistics) plainEncode(src {{.name}}) []byte { copy(out, src) return out {{- else}} - s.encoder.(encoding.{{.Name}}Encoder).Put([]{{.name}}{src}) + s.encoder.(encoding.{{if .logical}}{{.physical}}{{else}}{{.Name}}{{end}}Encoder).Put([]{{.name}}{src}) buf, err := s.encoder.FlushValues() if err != nil { panic(err) // recovered by Encode @@ -117,12 +125,12 @@ func (s *{{.Name}}Statistics) plainDecode(src []byte) {{.name}} { decoder := encoding.NewDecoder(s.descr.PhysicalType(), parquet.Encodings.Plain, s.descr, s.mem) decoder.SetData(1, src) - decoder.(encoding.{{.Name}}Decoder).Decode(buf[:]) + decoder.(encoding.{{if .logical}}{{.physical}}{{else}}{{.Name}}{{end}}Decoder).Decode(buf[:]) return buf[0] {{- end}} } -{{if and (ne .Name "ByteArray") (ne .Name "FixedLenByteArray")}} +{{if and (ne .Name "ByteArray") (ne .Name "FixedLenByteArray") (ne .Name "Float16")}} func (s *{{.Name}}Statistics) minval(a, b {{.name}}) {{.name}} { if s.less(a, b) { return a @@ -172,7 +180,11 @@ func (s *{{.Name}}Statistics) MinMaxEqual(rhs *{{.Name}}Statistics) bool { // Equals returns true only if both objects are the same type, have the same 
min and // max values, null count, distinct count and number of values. func (s *{{.Name}}Statistics) Equals(other TypedStatistics) bool { +{{- if .logical}} + if s.Type() != other.Type() || !s.descr.LogicalType().Equals(other.Descr().LogicalType()) { +{{- else}} if s.Type() != other.Type() { +{{- end}} return false } rhs, ok := other.(*{{.Name}}Statistics) @@ -194,6 +206,13 @@ func (s *{{.Name}}Statistics) coalesce(val, fallback {{.name}}) {{.name}} { } return val } +{{else if eq .Name "Float16"}} +func (s *{{.Name}}Statistics) coalesce(val, fallback {{.name}}) {{.name}} { + if float16.FromLEBytes(val).IsNaN() { + return fallback + } + return val +} {{end}} func (s *{{.Name}}Statistics) getMinMax(values []{{.name}}) (min, max {{.name}}) { @@ -212,7 +231,7 @@ func (s *{{.Name}}Statistics) getMinMax(values []{{.name}}) (min, max {{.name}}) max = defMax for _, v := range values { -{{- if or (eq .name "float32") (eq .name "float64") }} +{{- if or (eq .name "float32") (eq .name "float64") (eq .Name "Float16") }} min = s.minval(min, s.coalesce(v, defMin)) max = s.maxval(max, s.coalesce(v, defMax)) {{- else}} @@ -261,7 +280,7 @@ func (s *{{.Name}}Statistics) getMinMaxSpaced(values []{{.name}}, validBits []by } {{- else}} for _, v := range values[int(run.Pos):int(run.Pos+run.Length)] { -{{- if or (eq .name "float32") (eq .name "float64") }} +{{- if or (eq .name "float32") (eq .name "float64") (eq .Name "Float16") }} min = s.minval(min, coalesce(v, s.defaultMin()).({{.name}})) max = s.maxval(max, coalesce(v, s.defaultMax()).({{.name}})) {{- else}} @@ -381,7 +400,9 @@ func (s *{{.Name}}Statistics) UpdateFromArrow(values arrow.Array, updateCounts b s.SetMinMax(min, max) return nil {{else if eq .Name "Boolean"}} - return fmt.Errorf("%w: update boolean stats from Arrow", arrow.ErrNotImplemented) + return fmt.Errorf("%w: update boolean stats from Arrow", arrow.ErrNotImplemented) +{{else if eq .Name "Float16"}} + return fmt.Errorf("%w: update float16 stats from Arrow", 
arrow.ErrNotImplemented) {{else}} if values.DataType().(arrow.FixedWidthDataType).Bytes() != arrow.{{.Name}}SizeBytes { return fmt.Errorf("%w: cannot update {{.name}} stats with %s arrow array", @@ -475,8 +496,15 @@ func NewStatistics(descr *schema.Column, mem memory.Allocator) TypedStatistics { } switch descr.PhysicalType() { {{- range .In}} + {{- if not .logical}} case parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}}: + {{- if eq .Name "FixedLenByteArray"}} + if descr.LogicalType().Equals(schema.Float16LogicalType{}) { + return NewFloat16Statistics(descr, mem) + } + {{- end}} return New{{.Name}}Statistics(descr, mem) + {{- end}} {{- end}} default: panic("not implemented") @@ -493,8 +521,15 @@ func NewStatisticsFromEncoded(descr *schema.Column, mem memory.Allocator, nvalue } switch descr.PhysicalType() { {{- range .In}} + {{- if not .logical}} case parquet.Types.{{if .physical}}{{.physical}}{{else}}{{.Name}}{{end}}: + {{- if eq .Name "FixedLenByteArray"}} + if descr.LogicalType().Equals(schema.Float16LogicalType{}) { + return NewFloat16StatisticsFromEncoded(descr, mem, nvalues, encoded) + } + {{- end}} return New{{.Name}}StatisticsFromEncoded(descr, mem, nvalues, encoded) + {{- end}} {{- end}} default: panic("not implemented") diff --git a/go/parquet/metadata/statistics_types.tmpldata b/go/parquet/metadata/statistics_types.tmpldata new file mode 100644 index 0000000000000..400c0a3ca515d --- /dev/null +++ b/go/parquet/metadata/statistics_types.tmpldata @@ -0,0 +1,60 @@ +[ + { + "Name": "Int32", + "name": "int32", + "lower": "int32", + "prefix": "arrow" + }, + { + "Name": "Int64", + "name": "int64", + "lower": "int64", + "prefix": "arrow" + }, + { + "Name": "Int96", + "name": "parquet.Int96", + "lower": "int96", + "prefix": "parquet" + }, + { + "Name": "Float32", + "name": "float32", + "lower": "float32", + "prefix": "arrow", + "physical": "Float" + }, + { + "Name": "Float64", + "name": "float64", + "lower": "float64", + "prefix": "arrow", + 
"physical": "Double" + }, + { + "Name": "Boolean", + "name": "bool", + "lower": "bool", + "prefix": "arrow" + }, + { + "Name": "ByteArray", + "name": "parquet.ByteArray", + "lower": "byteArray", + "prefix": "parquet" + }, + { + "Name": "FixedLenByteArray", + "name": "parquet.FixedLenByteArray", + "lower": "fixedLenByteArray", + "prefix": "parquet" + }, + { + "Name": "Float16", + "name": "parquet.FixedLenByteArray", + "lower": "float16", + "prefix": "parquet", + "physical": "FixedLenByteArray", + "logical": "Float16LogicalType" + } +] diff --git a/go/parquet/pqarrow/column_readers.go b/go/parquet/pqarrow/column_readers.go index 02f94c941c3d9..49f3fac0a3b7c 100644 --- a/go/parquet/pqarrow/column_readers.go +++ b/go/parquet/pqarrow/column_readers.go @@ -517,6 +517,14 @@ func transferColumnData(rdr file.RecordReader, valueType arrow.DataType, descr * default: return nil, errors.New("time unit not supported") } + case arrow.FLOAT16: + if descr.PhysicalType() != parquet.Types.FixedLenByteArray { + return nil, errors.New("physical type for float16 must be fixed len byte array") + } + if size := arrow.Float16SizeBytes; descr.TypeLength() != size { + return nil, fmt.Errorf("fixed len byte array length for float16 must be %d", size) + } + return transferBinary(rdr, valueType), nil default: return nil, fmt.Errorf("no support for reading columns of type: %s", valueType.Name()) } @@ -563,6 +571,14 @@ func transferBinary(rdr file.RecordReader, dt arrow.DataType) *arrow.Chunked { chunks[idx] = array.MakeFromData(chunk.Data()) chunk.Release() } + case *arrow.Float16Type: + for idx, chunk := range chunks { + data := chunk.Data() + f16Data := array.NewData(dt, data.Len(), data.Buffers(), nil, data.NullN(), data.Offset()) + defer f16Data.Release() + chunks[idx] = array.NewFloat16Data(f16Data) + chunk.Release() + } } return arrow.NewChunked(dt, chunks) } diff --git a/go/parquet/pqarrow/encode_arrow.go b/go/parquet/pqarrow/encode_arrow.go index 81b4527b105cb..1855d3625adb7 100644 ---
a/go/parquet/pqarrow/encode_arrow.go +++ b/go/parquet/pqarrow/encode_arrow.go @@ -582,6 +582,31 @@ func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnChunkWriter, leafArr } wr.WriteBatchSpaced(data, defLevels, repLevels, arr.NullBitmapBytes(), int64(arr.Data().Offset())) } + case *arrow.Float16Type: + typeLen := wr.Descr().TypeLength() + if typeLen != arrow.Float16SizeBytes { + return fmt.Errorf("%w: invalid FixedLenByteArray length to write from float16 column: %d", arrow.ErrInvalid, typeLen) + } + + arr := leafArr.(*array.Float16) + rawValues := arrow.Float16Traits.CastToBytes(arr.Values()) + data := make([]parquet.FixedLenByteArray, arr.Len()) + + if arr.NullN() == 0 { + for idx := range data { + offset := idx * typeLen + data[idx] = rawValues[offset : offset+typeLen] + } + _, err = wr.WriteBatch(data, defLevels, repLevels) + } else { + for idx := range data { + if arr.IsValid(idx) { + offset := idx * typeLen + data[idx] = rawValues[offset : offset+typeLen] + } + } + wr.WriteBatchSpaced(data, defLevels, repLevels, arr.NullBitmapBytes(), int64(arr.Data().Offset())) + } default: return fmt.Errorf("%w: invalid column type to write to FixedLenByteArray: %s", arrow.ErrInvalid, leafArr.DataType().Name()) } diff --git a/go/parquet/pqarrow/encode_arrow_test.go b/go/parquet/pqarrow/encode_arrow_test.go index 281ca0d526476..d588aff701f3d 100644 --- a/go/parquet/pqarrow/encode_arrow_test.go +++ b/go/parquet/pqarrow/encode_arrow_test.go @@ -495,6 +495,8 @@ func getLogicalType(typ arrow.DataType) schema.LogicalType { return schema.DateLogicalType{} case arrow.DATE64: return schema.DateLogicalType{} + case arrow.FLOAT16: + return schema.Float16LogicalType{} case arrow.TIMESTAMP: ts := typ.(*arrow.TimestampType) adjustedUTC := len(ts.TimeZone) == 0 @@ -541,6 +543,8 @@ func getPhysicalType(typ arrow.DataType) parquet.Type { return parquet.Types.Float case arrow.FLOAT64: return parquet.Types.Double + case arrow.FLOAT16: + return parquet.Types.FixedLenByteArray case 
arrow.BINARY, arrow.LARGE_BINARY, arrow.STRING, arrow.LARGE_STRING: return parquet.Types.ByteArray case arrow.FIXED_SIZE_BINARY, arrow.DECIMAL: @@ -600,6 +604,8 @@ func (ps *ParquetIOTestSuite) makeSimpleSchema(typ arrow.DataType, rep parquet.R byteWidth = int32(typ.ByteWidth) case arrow.DecimalType: byteWidth = pqarrow.DecimalSize(typ.GetPrecision()) + case *arrow.Float16Type: + byteWidth = int32(typ.Bytes()) case *arrow.DictionaryType: valuesType := typ.ValueType switch dt := valuesType.(type) { @@ -607,6 +613,8 @@ func (ps *ParquetIOTestSuite) makeSimpleSchema(typ arrow.DataType, rep parquet.R byteWidth = int32(dt.ByteWidth) case arrow.DecimalType: byteWidth = pqarrow.DecimalSize(dt.GetPrecision()) + case *arrow.Float16Type: + byteWidth = int32(typ.Bytes()) } } @@ -1113,6 +1121,7 @@ var fullTypeList = []arrow.DataType{ arrow.FixedWidthTypes.Date32, arrow.PrimitiveTypes.Float32, arrow.PrimitiveTypes.Float64, + arrow.FixedWidthTypes.Float16, arrow.BinaryTypes.String, arrow.BinaryTypes.Binary, &arrow.FixedSizeBinaryType{ByteWidth: 10}, diff --git a/go/parquet/pqarrow/schema.go b/go/parquet/pqarrow/schema.go index b23c37ea39102..95c477c78b87d 100644 --- a/go/parquet/pqarrow/schema.go +++ b/go/parquet/pqarrow/schema.go @@ -344,6 +344,10 @@ func fieldToNode(name string, field arrow.Field, props *parquet.WriterProperties } else { logicalType = schema.NewTimeLogicalType(true, schema.TimeUnitMicros) } + case arrow.FLOAT16: + typ = parquet.Types.FixedLenByteArray + length = arrow.Float16SizeBytes + logicalType = schema.Float16LogicalType{} case arrow.STRUCT: return structToNode(field.Type.(*arrow.StructType), field.Name, field.Nullable, props, arrprops) case arrow.FIXED_SIZE_LIST, arrow.LIST: @@ -597,6 +601,8 @@ func arrowFromFLBA(logical schema.LogicalType, length int) (arrow.DataType, erro return arrowDecimal(logtype), nil case schema.NoLogicalType, schema.IntervalLogicalType, schema.UUIDLogicalType: return &arrow.FixedSizeBinaryType{ByteWidth: int(length)}, nil + case 
schema.Float16LogicalType: + return &arrow.Float16Type{}, nil default: return nil, xerrors.New("unhandled logical type " + logical.String() + " for fixed-length byte array") } diff --git a/go/parquet/pqarrow/schema_test.go b/go/parquet/pqarrow/schema_test.go index b5e7dc8fad34a..ee5aad8913470 100644 --- a/go/parquet/pqarrow/schema_test.go +++ b/go/parquet/pqarrow/schema_test.go @@ -280,6 +280,25 @@ func TestConvertArrowDecimals(t *testing.T) { } } +func TestConvertArrowFloat16(t *testing.T) { + parquetFields := make(schema.FieldList, 0) + arrowFields := make([]arrow.Field, 0) + + parquetFields = append(parquetFields, schema.Must(schema.NewPrimitiveNodeLogical("float16", parquet.Repetitions.Required, + schema.Float16LogicalType{}, parquet.Types.FixedLenByteArray, 2, -1))) + arrowFields = append(arrowFields, arrow.Field{Name: "float16", Type: &arrow.Float16Type{}}) + + arrowSchema := arrow.NewSchema(arrowFields, nil) + parquetSchema := schema.NewSchema(schema.MustGroup(schema.NewGroupNode("schema", parquet.Repetitions.Repeated, parquetFields, -1))) + + result, err := pqarrow.ToParquet(arrowSchema, nil, pqarrow.NewArrowWriterProperties(pqarrow.WithDeprecatedInt96Timestamps(true))) + assert.NoError(t, err) + assert.True(t, parquetSchema.Equals(result)) + for i := 0; i < parquetSchema.NumColumns(); i++ { + assert.Truef(t, parquetSchema.Column(i).Equals(result.Column(i)), "Column %d didn't match: %s", i, parquetSchema.Column(i).Name()) + } +} + func TestCoerceTImestampV1(t *testing.T) { parquetFields := make(schema.FieldList, 0) arrowFields := make([]arrow.Field, 0) @@ -418,7 +437,6 @@ func TestUnsupportedTypes(t *testing.T) { typ arrow.DataType }{ // Non-exhaustive list of unsupported types - {typ: &arrow.Float16Type{}}, {typ: &arrow.DurationType{}}, {typ: &arrow.DayTimeIntervalType{}}, {typ: &arrow.MonthIntervalType{}}, diff --git a/go/parquet/schema/logical_types.go b/go/parquet/schema/logical_types.go index 1ea44fc56c615..69e69363887cd 100644 --- 
a/go/parquet/schema/logical_types.go +++ b/go/parquet/schema/logical_types.go @@ -68,6 +68,8 @@ func getLogicalType(l *format.LogicalType) LogicalType { return BSONLogicalType{} case l.IsSetUUID(): return UUIDLogicalType{} + case l.IsSetFLOAT16(): + return Float16LogicalType{} case l == nil: return NoLogicalType{} default: @@ -1064,6 +1066,50 @@ func (IntervalLogicalType) Equals(rhs LogicalType) bool { return ok } +// Float16LogicalType can only be used with a FixedLength byte array column +// that is exactly 2 bytes long +type Float16LogicalType struct{ baseLogicalType } + +func (Float16LogicalType) SortOrder() SortOrder { + return SortSIGNED +} + +func (Float16LogicalType) MarshalJSON() ([]byte, error) { + return json.Marshal(map[string]string{"Type": Float16LogicalType{}.String()}) +} + +func (Float16LogicalType) String() string { + return "Float16" +} + +func (Float16LogicalType) ToConvertedType() (ConvertedType, DecimalMetadata) { + return ConvertedTypes.None, DecimalMetadata{} +} + +func (Float16LogicalType) IsCompatible(c ConvertedType, dec DecimalMetadata) bool { + if dec.IsSet { + return false + } + switch c { + case ConvertedTypes.None, ConvertedTypes.NA: + return true + } + return false +} + +func (Float16LogicalType) IsApplicable(t parquet.Type, tlen int32) bool { + return t == parquet.Types.FixedLenByteArray && tlen == 2 +} + +func (Float16LogicalType) toThrift() *format.LogicalType { + return &format.LogicalType{FLOAT16: format.NewFloat16Type()} +} + +func (Float16LogicalType) Equals(rhs LogicalType) bool { + _, ok := rhs.(Float16LogicalType) + return ok +} + type NullLogicalType struct{ baseLogicalType } func (NullLogicalType) SortOrder() SortOrder { diff --git a/go/parquet/schema/logical_types_test.go b/go/parquet/schema/logical_types_test.go index 49edf1748c52c..c371b47714f41 100644 --- a/go/parquet/schema/logical_types_test.go +++ b/go/parquet/schema/logical_types_test.go @@ -158,6 +158,7 @@ func TestNewTypeIncompatibility(t *testing.T) { expected 
schema.LogicalType }{ {"uuid", schema.UUIDLogicalType{}, schema.UUIDLogicalType{}}, + {"float16", schema.Float16LogicalType{}, schema.Float16LogicalType{}}, {"null", schema.NullLogicalType{}, schema.NullLogicalType{}}, {"not-utc-time_milli", schema.NewTimeLogicalType(false /* adjutedToUTC */, schema.TimeUnitMillis), &schema.TimeLogicalType{}}, {"not-utc-time-micro", schema.NewTimeLogicalType(false /* adjutedToUTC */, schema.TimeUnitMicros), &schema.TimeLogicalType{}}, @@ -224,6 +225,7 @@ func TestLogicalTypeProperties(t *testing.T) { {"json", schema.JSONLogicalType{}, false, true, true}, {"bson", schema.BSONLogicalType{}, false, true, true}, {"uuid", schema.UUIDLogicalType{}, false, true, true}, + {"float16", schema.Float16LogicalType{}, false, true, true}, {"nological", schema.NoLogicalType{}, false, false, true}, {"unknown", schema.UnknownLogicalType{}, false, false, false}, } @@ -358,6 +360,14 @@ func TestLogicalInapplicableTypes(t *testing.T) { assert.False(t, logical.IsApplicable(tt.typ, tt.len)) }) } + + logical = schema.Float16LogicalType{} + assert.True(t, logical.IsApplicable(parquet.Types.FixedLenByteArray, 2)) + for _, tt := range tests { + t.Run("float16 "+tt.name, func(t *testing.T) { + assert.False(t, logical.IsApplicable(tt.typ, tt.len)) + }) + } } func TestDecimalLogicalTypeApplicability(t *testing.T) { @@ -445,6 +455,7 @@ func TestLogicalTypeRepresentation(t *testing.T) { {"json", schema.JSONLogicalType{}, "JSON", `{"Type": "JSON"}`}, {"bson", schema.BSONLogicalType{}, "BSON", `{"Type": "BSON"}`}, {"uuid", schema.UUIDLogicalType{}, "UUID", `{"Type": "UUID"}`}, + {"float16", schema.Float16LogicalType{}, "Float16", `{"Type": "Float16"}`}, {"none", schema.NoLogicalType{}, "None", `{"Type": "None"}`}, } @@ -490,6 +501,7 @@ func TestLogicalTypeSortOrder(t *testing.T) { {"json", schema.JSONLogicalType{}, schema.SortUNSIGNED}, {"bson", schema.BSONLogicalType{}, schema.SortUNSIGNED}, {"uuid", schema.UUIDLogicalType{}, schema.SortUNSIGNED}, + {"float16", 
schema.Float16LogicalType{}, schema.SortSIGNED}, {"none", schema.NoLogicalType{}, schema.SortUNKNOWN}, } diff --git a/go/parquet/schema/reflection.go b/go/parquet/schema/reflection.go index d79edb92408f8..c0c8e0533efb0 100644 --- a/go/parquet/schema/reflection.go +++ b/go/parquet/schema/reflection.go @@ -22,6 +22,7 @@ import ( "strconv" "strings" + "github.com/apache/arrow/go/v15/arrow/float16" "github.com/apache/arrow/go/v15/parquet" format "github.com/apache/arrow/go/v15/parquet/internal/gen-go/parquet" "golang.org/x/xerrors" @@ -159,6 +160,8 @@ func (t *taggedInfo) UpdateLogicalTypes() { return BSONLogicalType{} case "uuid": return UUIDLogicalType{} + case "float16": + return Float16LogicalType{} default: panic(fmt.Errorf("invalid logical type specified: %s", t)) } @@ -373,6 +376,9 @@ func typeToNode(name string, typ reflect.Type, repType parquet.Repetition, info } return Must(MapOf(name, key, value, repType, fieldID)) case reflect.Struct: + if typ == reflect.TypeOf(float16.Num{}) { + return MustPrimitive(NewPrimitiveNodeLogical(name, repType, Float16LogicalType{}, parquet.Types.FixedLenByteArray, 2, fieldID)) + } // structs are Group nodes fields := make(FieldList, 0) for i := 0; i < typ.NumField(); i++ { diff --git a/go/parquet/schema/reflection_test.go b/go/parquet/schema/reflection_test.go index 06ad7191a5749..e3a880cacc1e8 100644 --- a/go/parquet/schema/reflection_test.go +++ b/go/parquet/schema/reflection_test.go @@ -22,6 +22,7 @@ import ( "reflect" "testing" + "github.com/apache/arrow/go/v15/arrow/float16" "github.com/apache/arrow/go/v15/parquet" "github.com/apache/arrow/go/v15/parquet/schema" "github.com/stretchr/testify/assert" @@ -152,6 +153,9 @@ func ExampleNewSchemaFromStruct_logicaltypes() { JSON string `parquet:"logical=json"` BSON []byte `parquet:"logical=BSON"` UUID [16]byte `parquet:"logical=uuid"` + Float16 [2]byte `parquet:"logical=float16"` + Float16Optional *[2]byte `parquet:"logical=float16"` + Float16Num float16.Num } sc, err := 
schema.NewSchemaFromStruct(LogicalTypes{}) @@ -180,6 +184,9 @@ func ExampleNewSchemaFromStruct_logicaltypes() { // required byte_array field_id=-1 JSON (JSON); // required byte_array field_id=-1 BSON (BSON); // required fixed_len_byte_array field_id=-1 UUID (UUID); + // required fixed_len_byte_array field_id=-1 Float16 (Float16); + // optional fixed_len_byte_array field_id=-1 Float16Optional (Float16); + // required fixed_len_byte_array field_id=-1 Float16Num (Float16); // } } diff --git a/go/parquet/schema/schema_element_test.go b/go/parquet/schema/schema_element_test.go index dd1d293e5cfd7..d190ffe5a253a 100644 --- a/go/parquet/schema/schema_element_test.go +++ b/go/parquet/schema/schema_element_test.go @@ -159,6 +159,10 @@ func (s *SchemaElementConstructionSuite) TestSimple() { "uuid", UUIDLogicalType{}, parquet.Types.FixedLenByteArray, 16, false, ConvertedTypes.NA, true, func(e *format.SchemaElement) bool { return e.LogicalType.IsSetUUID() }, }, nil}, + {"float16", &schemaElementConstructArgs{ + "float16", Float16LogicalType{}, parquet.Types.FixedLenByteArray, 2, false, ConvertedTypes.NA, true, + func(e *format.SchemaElement) bool { return e.LogicalType.IsSetFLOAT16() }, + }, nil}, {"none", &schemaElementConstructArgs{ "none", NoLogicalType{}, parquet.Types.Int64, -1, false, ConvertedTypes.NA, false, checkNone, @@ -425,7 +429,8 @@ func TestSchemaElementNestedSerialization(t *testing.T) { timestampNode := MustPrimitive(NewPrimitiveNodeLogical("timestamp" /*name */, parquet.Repetitions.Required, NewTimestampLogicalType(false /* adjustedToUTC */, TimeUnitNanos), parquet.Types.Int64, -1 /* type len */, -1 /* fieldID */)) intNode := MustPrimitive(NewPrimitiveNodeLogical("int" /*name */, parquet.Repetitions.Required, NewIntLogicalType(64 /* bitWidth */, false /* signed */), parquet.Types.Int64, -1 /* type len */, -1 /* fieldID */)) decimalNode := MustPrimitive(NewPrimitiveNodeLogical("decimal" /*name */, parquet.Repetitions.Required, NewDecimalLogicalType(16 /* 
precision */, 6 /* scale */), parquet.Types.Int64, -1 /* type len */, -1 /* fieldID */)) - listNode := MustGroup(NewGroupNodeLogical("list" /*name */, parquet.Repetitions.Repeated, []Node{strNode, dateNode, jsonNode, uuidNode, timestampNode, intNode, decimalNode}, NewListLogicalType(), -1 /* fieldID */)) + float16Node := MustPrimitive(NewPrimitiveNodeLogical("float16" /*name */, parquet.Repetitions.Required, Float16LogicalType{}, parquet.Types.FixedLenByteArray, 2 /* type len */, -1 /* fieldID */)) + listNode := MustGroup(NewGroupNodeLogical("list" /*name */, parquet.Repetitions.Repeated, []Node{strNode, dateNode, jsonNode, uuidNode, timestampNode, intNode, decimalNode, float16Node}, NewListLogicalType(), -1 /* fieldID */)) listElems := ToThrift(listNode) assert.Equal(t, "list", listElems[0].Name) @@ -440,6 +445,7 @@ func TestSchemaElementNestedSerialization(t *testing.T) { assert.True(t, listElems[5].LogicalType.IsSetTIMESTAMP()) assert.True(t, listElems[6].LogicalType.IsSetINTEGER()) assert.True(t, listElems[7].LogicalType.IsSetDECIMAL()) + assert.True(t, listElems[8].LogicalType.IsSetFLOAT16()) mapNode := MustGroup(NewGroupNodeLogical("map" /* name */, parquet.Repetitions.Required, []Node{}, MapLogicalType{}, -1 /* fieldID */)) mapElems := ToThrift(mapNode) @@ -486,6 +492,7 @@ func TestLogicalTypeSerializationRoundTrip(t *testing.T) { {"json", JSONLogicalType{}, parquet.Types.ByteArray, -1}, {"bson", BSONLogicalType{}, parquet.Types.ByteArray, -1}, {"uuid", UUIDLogicalType{}, parquet.Types.FixedLenByteArray, 16}, + {"float16", Float16LogicalType{}, parquet.Types.FixedLenByteArray, 2}, {"none", NoLogicalType{}, parquet.Types.Boolean, -1}, } diff --git a/go/parquet/schema/schema_test.go b/go/parquet/schema/schema_test.go index b60c7dfaafcd1..cc43c3856d68e 100644 --- a/go/parquet/schema/schema_test.go +++ b/go/parquet/schema/schema_test.go @@ -635,6 +635,10 @@ func TestPanicSchemaNodeCreation(t *testing.T) {
schema.MustPrimitive(schema.NewPrimitiveNodeLogical("uuid" /* name */, parquet.Repetitions.Required, schema.UUIDLogicalType{}, parquet.Types.FixedLenByteArray, 64 /* type len */, -1 /* fieldID */)) }, "incompatible primitive length") + assert.Panics(t, func() { + schema.MustPrimitive(schema.NewPrimitiveNodeLogical("float16" /* name */, parquet.Repetitions.Required, schema.Float16LogicalType{}, parquet.Types.FixedLenByteArray, 4 /* type len */, -1 /* fieldID */)) + }, "incompatible primitive length") + assert.Panics(t, func() { schema.MustPrimitive(schema.NewPrimitiveNodeLogical("negative_len" /* name */, parquet.Repetitions.Required, schema.NoLogicalType{}, parquet.Types.FixedLenByteArray, -16 /* type len */, -1 /* fieldID */)) }, "non-positive length for fixed length binary")