This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 7d9f3812 feat: Support setting IPC options in FlightSQL call options
(#674)
7d9f3812 is described below
commit 7d9f38123934afb7c92567f06078dbecc3d75fad
Author: William <[email protected]>
AuthorDate: Fri Feb 27 10:00:04 2026 -0800
feat: Support setting IPC options in FlightSQL call options (#674)
### Rationale for this change
I would like to specify the IPC stream compression settings for the
FlightSQL `ExecuteIngest` command. Currently, there is no way to apply
IPC options to the writer stream from the `ExecuteIngest` method.
### What changes are included in this PR?
* Introduces an `ExecuteIngestWithIPC` method, which allows passing IPC options
* Retains the existing behaviour of `ExecuteIngest`
* Consolidates the shared ingest behaviour into a private
`executeIngest` method
### Are these changes tested?
* Tested with new unit tests to validate that the record batch frames are
sent with LZ4 compression when the `ipc.WithLZ4()` option is passed in
the `ipcOpts` argument of `ExecuteIngestWithIPC`
### Are there any user-facing changes?
* Yes - introduces a new public method `ExecuteIngestWithIPC`
---
arrow/flight/flightsql/client.go | 17 +++-
arrow/flight/flightsql/client_test.go | 145 ++++++++++++++++++++++++++++++++++
2 files changed, 161 insertions(+), 1 deletion(-)
diff --git a/arrow/flight/flightsql/client.go b/arrow/flight/flightsql/client.go
index 2610da39..192cec7b 100644
--- a/arrow/flight/flightsql/client.go
+++ b/arrow/flight/flightsql/client.go
@@ -247,6 +247,18 @@ func (c *Client) ExecuteSubstraitUpdate(ctx
context.Context, plan SubstraitPlan,
// The provided RecordReader will be retained for the duration of the call,
but it is the caller's
// responsibility to release the original reference.
func (c *Client) ExecuteIngest(ctx context.Context, rdr array.RecordReader,
reqOptions *ExecuteIngestOpts, opts ...grpc.CallOption) (int64, error) {
+ return c.executeIngest(ctx, rdr, reqOptions, nil, opts...)
+}
+
+// ExecuteIngestWithIPC is like ExecuteIngest, and also allows configuring IPC
+// stream writer options such as compression.
+// The provided RecordReader will be retained for the duration of the call,
but it is the caller's
+// responsibility to release the original reference.
+func (c *Client) ExecuteIngestWithIPC(ctx context.Context, rdr
array.RecordReader, reqOptions *ExecuteIngestOpts, ipcOpts []ipc.Option, opts
...grpc.CallOption) (int64, error) {
+ return c.executeIngest(ctx, rdr, reqOptions, ipcOpts, opts...)
+}
+
+func (c *Client) executeIngest(ctx context.Context, rdr array.RecordReader,
reqOptions *ExecuteIngestOpts, ipcOpts []ipc.Option, opts ...grpc.CallOption)
(int64, error) {
var (
err error
desc *flight.FlightDescriptor
@@ -274,7 +286,10 @@ func (c *Client) ExecuteIngest(ctx context.Context, rdr
array.RecordReader, reqO
return 0, err
}
- wr = flight.NewRecordWriter(stream, ipc.WithAllocator(c.Alloc),
ipc.WithSchema(rdr.Schema()))
+ writerOpts := make([]ipc.Option, 0, 2+len(ipcOpts))
+ writerOpts = append(writerOpts, ipc.WithAllocator(c.Alloc),
ipc.WithSchema(rdr.Schema()))
+ writerOpts = append(writerOpts, ipcOpts...)
+ wr = flight.NewRecordWriter(stream, writerOpts...)
defer wr.Close()
wr.SetFlightDescriptor(desc)
diff --git a/arrow/flight/flightsql/client_test.go
b/arrow/flight/flightsql/client_test.go
index a8ba228b..858526ea 100644
--- a/arrow/flight/flightsql/client_test.go
+++ b/arrow/flight/flightsql/client_test.go
@@ -27,7 +27,10 @@ import (
"github.com/apache/arrow-go/v18/arrow/flight"
"github.com/apache/arrow-go/v18/arrow/flight/flightsql"
pb "github.com/apache/arrow-go/v18/arrow/flight/gen/flight"
+ "github.com/apache/arrow-go/v18/arrow/internal/flatbuf"
+ "github.com/apache/arrow-go/v18/arrow/ipc"
"github.com/apache/arrow-go/v18/arrow/memory"
+ flatbuffers "github.com/google/flatbuffers/go"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc"
@@ -163,6 +166,7 @@ func getAction(cmd proto.Message) *flight.Action {
func (s *FlightSqlClientSuite) SetupTest() {
s.mockClient = FlightServiceClientMock{}
s.sqlClient.Client = &s.mockClient
+ s.sqlClient.Alloc = memory.DefaultAllocator
s.callOpts = []grpc.CallOption{grpc.EmptyCallOption{}}
}
@@ -649,6 +653,147 @@ func (s *FlightSqlClientSuite) TestExecuteUpdate() {
s.EqualValues(100, num)
}
+func (s *FlightSqlClientSuite) TestExecuteIngestWithIPCOptions() {
+ schema := arrow.NewSchema([]arrow.Field{{Name: "id", Type:
arrow.PrimitiveTypes.Int64}}, nil)
+ rec, _, err := array.RecordFromJSON(memory.DefaultAllocator, schema,
strings.NewReader(`[{"id": 1}]`))
+ s.Require().NoError(err)
+ defer rec.Release()
+
+ rdr, err := array.NewRecordReader(schema, []arrow.RecordBatch{rec})
+ s.Require().NoError(err)
+ defer rdr.Release()
+
+ request := &flightsql.ExecuteIngestOpts{
+ Table: "target_table",
+ TableDefinitionOptions: &flightsql.TableDefinitionOptions{},
+ }
+
+ result := &pb.DoPutUpdateResult{RecordCount: 1}
+ resdata, _ := proto.Marshal(result)
+
+ mockedPut := &mockDoPutClient{}
+ defer mockedPut.AssertExpectations(s.T())
+
+ var sent []*flight.FlightData
+ mockedPut.On("Send",
mock.AnythingOfType("*flight.FlightData")).Run(func(args mock.Arguments) {
+ sent = append(sent,
proto.Clone(args.Get(0).(*flight.FlightData)).(*flight.FlightData))
+ }).Return(nil)
+ mockedPut.On("CloseSend").Return(nil)
+ mockedPut.On("Recv").Return(&pb.PutResult{AppMetadata: resdata},
nil).Once()
+ mockedPut.On("Recv").Return(&pb.PutResult{}, io.EOF).Once()
+
+ s.mockClient.On("DoPut", s.callOpts).Return(mockedPut, nil)
+
+ count, err := s.sqlClient.ExecuteIngestWithIPC(
+ context.Background(),
+ rdr,
+ request,
+ []ipc.Option{ipc.WithLZ4()},
+ s.callOpts...,
+ )
+ s.Require().NoError(err)
+ s.EqualValues(1, count)
+
+ var rbCompression *flatbuf.BodyCompression
+ for _, fd := range sent {
+ if len(fd.DataHeader) == 0 {
+ continue
+ }
+
+ msg := flatbuf.GetRootAsMessage(fd.DataHeader, 0)
+ if msg.HeaderType() != flatbuf.MessageHeaderRecordBatch {
+ continue
+ }
+
+ var header flatbuffers.Table
+ if !msg.Header(&header) {
+ continue
+ }
+
+ var batch flatbuf.RecordBatch
+ batch.Init(header.Bytes, header.Pos)
+ rbCompression = batch.Compression(nil)
+ break
+ }
+
+ if s.NotNil(rbCompression, "record batch should include compression
metadata") {
+ s.Equal(flatbuf.CompressionTypeLZ4_FRAME, rbCompression.Codec())
+ }
+}
+
+func (s *FlightSqlClientSuite) TestExecuteIngestWithSchemaOverrideOption() {
+ dataSchema := arrow.NewSchema([]arrow.Field{{Name: "id", Type:
arrow.PrimitiveTypes.Int64}}, nil)
+ overrideSchema := arrow.NewSchema([]arrow.Field{{Name: "name", Type:
arrow.BinaryTypes.String}}, nil)
+
+ rec, _, err := array.RecordFromJSON(memory.DefaultAllocator,
dataSchema, strings.NewReader(`[{"id": 1}]`))
+ s.Require().NoError(err)
+ defer rec.Release()
+
+ rdr, err := array.NewRecordReader(dataSchema, []arrow.RecordBatch{rec})
+ s.Require().NoError(err)
+ defer rdr.Release()
+
+ request := &flightsql.ExecuteIngestOpts{
+ Table: "target_table",
+ TableDefinitionOptions: &flightsql.TableDefinitionOptions{},
+ }
+
+ mockedPut := &mockDoPutClient{}
+ defer mockedPut.AssertExpectations(s.T())
+ mockedPut.On("Send",
mock.AnythingOfType("*flight.FlightData")).Return(nil)
+
+ s.mockClient.On("DoPut", s.callOpts).Return(mockedPut, nil)
+
+ _, err = s.sqlClient.ExecuteIngestWithIPC(
+ context.Background(),
+ rdr,
+ request,
+ []ipc.Option{ipc.WithSchema(overrideSchema)},
+ s.callOpts...,
+ )
+ s.Error(err)
+ s.ErrorContains(err, "different schema")
+}
+
+func (s *FlightSqlClientSuite) TestExecuteIngestWithSliceOptions() {
+ schema := arrow.NewSchema([]arrow.Field{{Name: "id", Type:
arrow.PrimitiveTypes.Int64}}, nil)
+ rec, _, err := array.RecordFromJSON(memory.DefaultAllocator, schema,
strings.NewReader(`[{"id": 1}]`))
+ s.Require().NoError(err)
+ defer rec.Release()
+
+ rdr, err := array.NewRecordReader(schema, []arrow.RecordBatch{rec})
+ s.Require().NoError(err)
+ defer rdr.Release()
+
+ request := &flightsql.ExecuteIngestOpts{
+ Table: "target_table",
+ TableDefinitionOptions: &flightsql.TableDefinitionOptions{},
+ }
+
+ result := &pb.DoPutUpdateResult{RecordCount: 1}
+ resdata, _ := proto.Marshal(result)
+
+ mockedPut := &mockDoPutClient{}
+ defer mockedPut.AssertExpectations(s.T())
+ mockedPut.On("Send",
mock.AnythingOfType("*flight.FlightData")).Return(nil)
+ mockedPut.On("CloseSend").Return(nil)
+ mockedPut.On("Recv").Return(&pb.PutResult{AppMetadata: resdata},
nil).Once()
+ mockedPut.On("Recv").Return(&pb.PutResult{}, io.EOF).Once()
+
+ s.mockClient.On("DoPut", s.callOpts).Return(mockedPut, nil)
+
+ ipcOpts := []ipc.Option{ipc.WithLZ4()}
+ count, err := s.sqlClient.ExecuteIngestWithIPC(
+ context.Background(),
+ rdr,
+ request,
+ ipcOpts,
+ s.callOpts...,
+ )
+ s.Require().NoError(err)
+ s.EqualValues(1, count)
+}
+
func (s *FlightSqlClientSuite) TestGetSqlInfo() {
sqlInfo := []flightsql.SqlInfo{
flightsql.SqlInfoFlightSqlServerName,