Skip to content

Commit

Permalink
ARROW-12517: [Go][Flight] Expose app metadata in flight client and se…
Browse files Browse the repository at this point in the history
…rver

Adding a convenient way to expose the Application Metadata field in the arrow flight client and server for Go.

Closes apache#10142 from zeroshade/arrow-12517

Authored-by: Matthew Topol <[email protected]>
Signed-off-by: Micah Kornfield <[email protected]>
  • Loading branch information
Matthew Topol authored and emkornfield committed Apr 28, 2021
1 parent 8c3363e commit 9218fe4
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 6 deletions.
63 changes: 63 additions & 0 deletions go/arrow/flight/flight_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package flight_test
import (
"context"
"errors"
"fmt"
"io"
"testing"

Expand Down Expand Up @@ -311,3 +312,65 @@ func TestServer(t *testing.T) {
t.Fatalf("got %d, want %d", numRows, fi.TotalRecords)
}
}

type flightMetadataWriterServer struct{}

func (f *flightMetadataWriterServer) DoGet(tkt *flight.Ticket, fs flight.FlightService_DoGetServer) error {
recs := arrdata.Records[string(tkt.GetTicket())]

w := flight.NewRecordWriter(fs, ipc.WithSchema(recs[0].Schema()))
defer w.Close()
for idx, r := range recs {
w.WriteWithAppMetadata(r, []byte(fmt.Sprintf("%d_%s", idx, string(tkt.GetTicket()))) /*metadata*/)
}
return nil
}

func TestFlightWithAppMetadata(t *testing.T) {
f := &flightMetadataWriterServer{}
s := flight.NewFlightServer(nil)
s.RegisterFlightService(&flight.FlightServiceService{DoGet: f.DoGet})
s.Init("localhost:0")

go s.Serve()
defer s.Shutdown()

client, err := flight.NewFlightClient(s.Addr().String(), nil, grpc.WithInsecure())
if err != nil {
t.Fatal(err)
}
defer client.Close()

fdata, err := client.DoGet(context.Background(), &flight.Ticket{Ticket: []byte("primitives")})
if err != nil {
t.Fatal(err)
}

r, err := flight.NewRecordReader(fdata)
if err != nil {
t.Fatal(err)
}

expected := arrdata.Records["primitives"]
idx := 0
for {
rec, err := r.Read()
if err != nil {
if err == io.EOF {
break
}
t.Fatal(err)
}

appMeta := r.LatestAppMetadata()
if !array.RecordEqual(expected[idx], rec) {
t.Errorf("flight data stream records for idx: %d don't match: \ngot = %#v\nwant = %#v", idx, rec, expected[idx])
}

exMeta := fmt.Sprintf("%d_primitives", idx)
if string(appMeta) != exMeta {
t.Errorf("flight data stream application metadata mismatch: got: %v, want: %v\n", string(appMeta), exMeta)
}
idx++
}
}
54 changes: 50 additions & 4 deletions go/arrow/flight/record_batch_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/apache/arrow/go/arrow/internal/debug"
"github.com/apache/arrow/go/arrow/ipc"
"github.com/apache/arrow/go/arrow/memory"
"golang.org/x/xerrors"
)

// DataStreamReader is an interface for receiving flight data messages on a stream
Expand All @@ -37,16 +38,23 @@ type dataMessageReader struct {

refCount int64
msg *ipc.Message
err error

lastAppMetadata []byte
}

func (d *dataMessageReader) Message() (*ipc.Message, error) {
fd, err := d.rdr.Recv()
if err != nil {
// clear the previous message in the error case
d.msg.Release()
d.msg = nil
d.lastAppMetadata = nil
return nil, err
}

return ipc.NewMessage(memory.NewBufferBytes(fd.DataHeader), memory.NewBufferBytes(fd.DataBody)), nil
d.lastAppMetadata = fd.AppMetadata
d.msg = ipc.NewMessage(memory.NewBufferBytes(fd.DataHeader), memory.NewBufferBytes(fd.DataBody))
return d.msg, nil
}

func (d *dataMessageReader) Retain() {
Expand All @@ -60,15 +68,53 @@ func (d *dataMessageReader) Release() {
if d.msg != nil {
d.msg.Release()
d.msg = nil
d.lastAppMetadata = nil
}
}
}

// Reader is an ipc.Reader which also keeps track of the metadata from
// the FlightData messages as they come in, calling LatestAppMetadata
// will return the metadata bytes from the most recently read message.
type Reader struct {
*ipc.Reader
dmr *dataMessageReader
}

// Retain increases the reference count for the underlying message reader
// and ipc.Reader which are utilized by this Reader.
func (r *Reader) Retain() {
r.Reader.Retain()
r.dmr.Retain()
}

// Release reduces the reference count for the underlying message reader
// and ipc.Reader, when the reference counts become zero, the allocated
// memory is released for the stored record and metadata.
func (r *Reader) Release() {
r.Reader.Release()
r.dmr.Release()
}

// LatestAppMetadata returns the bytes from the AppMetadata field of the
// most recently read FlightData message that was processed by calling
// the Next function. The metadata returned would correspond to the record
// retrieved by calling Record().
func (r *Reader) LatestAppMetadata() []byte {
return r.dmr.lastAppMetadata
}

// NewRecordReader constructs an ipc reader using the flight data stream reader
// as the source of the ipc messages, opts passed will be passed to the underlying
// ipc.Reader such as ipc.WithSchema and ipc.WithAllocator
func NewRecordReader(r DataStreamReader, opts ...ipc.Option) (*ipc.Reader, error) {
return ipc.NewReaderFromMessageReader(&dataMessageReader{rdr: r}, opts...)
func NewRecordReader(r DataStreamReader, opts ...ipc.Option) (*Reader, error) {
rdr := &Reader{dmr: &dataMessageReader{rdr: r}}
var err error
if rdr.Reader, err = ipc.NewReaderFromMessageReader(rdr.dmr, opts...); err != nil {
return nil, xerrors.Errorf("arrow/flight: could not create flight reader: %w", err)
}

return rdr, nil
}

// DeserializeSchema takes the schema bytes from FlightInfo or SchemaResult
Expand Down
23 changes: 21 additions & 2 deletions go/arrow/flight/record_batch_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"

"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/array"
"github.com/apache/arrow/go/arrow/ipc"
"github.com/apache/arrow/go/arrow/memory"
)
Expand All @@ -46,17 +47,35 @@ func (f *flightPayloadWriter) WritePayload(payload ipc.Payload) error {

payload.SerializeBody(&f.buf)
f.fd.DataBody = f.buf.Bytes()

return f.w.Send(&f.fd)
}

func (f *flightPayloadWriter) Close() error { return nil }

// Writer is an ipc.Writer which also adds a WriteWithAppMetadata function
// in order to allow adding AppMetadata to the FlightData messages which
// are written.
type Writer struct {
*ipc.Writer
pw *flightPayloadWriter
}

// WriteWithAppMetadata will write this record with the supplied application
// metadata attached in the flightData message.
func (w *Writer) WriteWithAppMetadata(rec array.Record, appMeta []byte) error {
w.pw.fd.AppMetadata = appMeta
defer func() { w.pw.fd.AppMetadata = nil }()
return w.Write(rec)
}

// NewRecordWriter can be used to construct a writer for arrow flight via
// the grpc stream handler to write flight data objects and write
// record batches to the stream. Options passed here will be passed to
// ipc.NewWriter
func NewRecordWriter(w DataStreamWriter, opts ...ipc.Option) *ipc.Writer {
return ipc.NewWriterWithPayloadWriter(&flightPayloadWriter{w: w}, opts...)
func NewRecordWriter(w DataStreamWriter, opts ...ipc.Option) *Writer {
pw := &flightPayloadWriter{w: w}
return &Writer{Writer: ipc.NewWriterWithPayloadWriter(pw, opts...), pw: pw}
}

// SerializeSchema returns the serialized schema bytes for use in Arrow Flight
Expand Down

0 comments on commit 9218fe4

Please sign in to comment.