This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 64e07d94 fix(arrow/cdata): fix leaks identified by leak-sanitizer
(#603)
64e07d94 is described below
commit 64e07d94c8b11a35cec99a7b3325f83952d9fbea
Author: Matt Topol <[email protected]>
AuthorDate: Tue Dec 9 11:08:18 2025 -0500
fix(arrow/cdata): fix leaks identified by leak-sanitizer (#603)
### Rationale for this change
fixes #597
### What changes are included in this PR?
Adds a series of calls to release, runtime.GC, free, and so on to address all
of the leaks that were identified by the Go 1.25 leak sanitizer.
### Are these changes tested?
Go 1.25 is added to the workflows so that the tests run with ASAN, which will
catch these kinds of leaks in the future.
### Are there any user-facing changes?
Only the leak fixes themselves.
---
.github/workflows/test.yml | 13 +++++++++++++
arrow/cdata/cdata.go | 1 +
arrow/cdata/cdata_exports.go | 10 +++++-----
arrow/cdata/cdata_fulltest.c | 5 +++++
arrow/cdata/cdata_test.go | 22 +++++++++++++++++++++-
arrow/cdata/cdata_test_framework.go | 9 +++++++++
arrow/cdata/exports.go | 28 +++++++++++++++++-----------
arrow/memory/mallocator/mallocator.go | 9 ++++++++-
8 files changed, 79 insertions(+), 18 deletions(-)
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 5f6529fa..f896fc2a 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -46,6 +46,14 @@ jobs:
arch: arm64v8
go: 1.24
runs-on: ubuntu-24.04-arm
+ - arch-label: AMD64
+ arch: amd64
+ go: 1.25
+ runs-on: ubuntu-latest
+ - arch-label: ARM64
+ arch: arm64v8
+ go: 1.25
+ runs-on: ubuntu-24.04-arm
env:
ARCH: ${{ matrix.arch }}
GO: ${{ matrix.go }}
@@ -81,6 +89,7 @@ jobs:
matrix:
go:
- '1.24'
+ - '1.25'
env:
GO: ${{ matrix.go }}
steps:
@@ -120,6 +129,7 @@ jobs:
matrix:
go:
- '1.24'
+ - '1.25'
env:
GO: ${{ matrix.go }}
steps:
@@ -159,6 +169,7 @@ jobs:
matrix:
go:
- '1.24'
+ - '1.25'
steps:
- name: Checkout
uses: actions/checkout@c2d88d3ecc89a9ef08eebf45d9637801dcee7eb5 #
v5.0.1
@@ -185,6 +196,7 @@ jobs:
matrix:
go:
- '1.24'
+ - '1.25'
env:
ARROW_GO_TESTCGO: "1"
steps:
@@ -218,6 +230,7 @@ jobs:
matrix:
go:
- '1.24'
+ - '1.25'
steps:
- name: Checkout
uses: actions/checkout@c2d88d3ecc89a9ef08eebf45d9637801dcee7eb5 #
v5.0.1
diff --git a/arrow/cdata/cdata.go b/arrow/cdata/cdata.go
index f006e494..4085ed3d 100644
--- a/arrow/cdata/cdata.go
+++ b/arrow/cdata/cdata.go
@@ -937,6 +937,7 @@ func initReader(rdr *nativeCRecordBatchReader, stream
*CArrowArrayStream) error
return rdr.getError(int(errno))
}
defer C.ArrowSchemaRelease(&sc)
+
s, err := ImportCArrowSchema((*CArrowSchema)(&sc))
if err != nil {
return err
diff --git a/arrow/cdata/cdata_exports.go b/arrow/cdata/cdata_exports.go
index 9020ddf1..1e8e9bf9 100644
--- a/arrow/cdata/cdata_exports.go
+++ b/arrow/cdata/cdata_exports.go
@@ -459,7 +459,7 @@ type cRecordReader struct {
err *C.char
}
-func (rr cRecordReader) getSchema(out *CArrowSchema) int {
+func (rr *cRecordReader) getSchema(out *CArrowSchema) int {
schema := rr.rdr.Schema()
if schema == nil {
return rr.maybeError()
@@ -468,7 +468,7 @@ func (rr cRecordReader) getSchema(out *CArrowSchema) int {
return 0
}
-func (rr cRecordReader) next(out *CArrowArray) int {
+func (rr *cRecordReader) next(out *CArrowArray) int {
if rr.rdr.Next() {
ExportArrowRecordBatch(rr.rdr.RecordBatch(), out, nil)
return 0
@@ -477,7 +477,7 @@ func (rr cRecordReader) next(out *CArrowArray) int {
return rr.maybeError()
}
-func (rr cRecordReader) maybeError() int {
+func (rr *cRecordReader) maybeError() int {
err := rr.rdr.Err()
if err != nil {
return C.EIO
@@ -485,7 +485,7 @@ func (rr cRecordReader) maybeError() int {
return 0
}
-func (rr cRecordReader) getLastError() *C.char {
+func (rr *cRecordReader) getLastError() *C.char {
err := rr.rdr.Err()
if err != nil {
if rr.err != nil {
@@ -496,7 +496,7 @@ func (rr cRecordReader) getLastError() *C.char {
return rr.err
}
-func (rr cRecordReader) release() {
+func (rr *cRecordReader) release() {
if rr.err != nil {
C.free(unsafe.Pointer(rr.err))
}
diff --git a/arrow/cdata/cdata_fulltest.c b/arrow/cdata/cdata_fulltest.c
index b4d94132..d9e67e93 100644
--- a/arrow/cdata/cdata_fulltest.c
+++ b/arrow/cdata/cdata_fulltest.c
@@ -118,6 +118,8 @@ static void release_nested_internal(struct ArrowSchema*
schema,
if (is_dynamic) {
free((void*)schema->format);
free((void*)schema->name);
+ } else {
+ free(schema->children);
}
ArrowSchemaMarkReleased(schema);
}
@@ -161,6 +163,7 @@ void test_primitive(struct ArrowSchema* schema, const char*
fmt) {
// Since test_lists et al. allocate an entirely array of ArrowSchema pointers,
// need to expose a function to free it.
void free_malloced_schemas(struct ArrowSchema** schemas) {
+ free(schemas[0]);
free(schemas);
}
@@ -325,6 +328,7 @@ static void release_stream(struct ArrowArrayStream* st) {
static void release_the_array(struct ArrowArray* out) {
for (int i = 0; i < out->n_children; ++i) {
ArrowArrayRelease(out->children[i]);
+ free(out->children[i]);
}
free((void*)out->children);
free(out->buffers);
@@ -454,6 +458,7 @@ int test_exported_stream(struct ArrowArrayStream* stream) {
stream->release(stream);
break;
}
+ array.release(&array);
}
return 0;
}
diff --git a/arrow/cdata/cdata_test.go b/arrow/cdata/cdata_test.go
index 169ed848..170a5151 100644
--- a/arrow/cdata/cdata_test.go
+++ b/arrow/cdata/cdata_test.go
@@ -50,6 +50,7 @@ func TestSchemaExport(t *testing.T) {
sc := exportInt32TypeSchema()
f, err := importSchema(&sc)
assert.NoError(t, err)
+ defer ReleaseCArrowSchema(&sc)
keys, _ := getMetadataKeys()
vals, _ := getMetadataValues()
@@ -66,6 +67,7 @@ func TestSimpleArrayExport(t *testing.T) {
assert.False(t, test1IsReleased())
testarr := exportInt32Array()
+ defer freeCArray(testarr)
arr, err := ImportCArrayWithType(testarr, arrow.PrimitiveTypes.Int32)
assert.NoError(t, err)
@@ -79,7 +81,9 @@ func TestSimpleArrayExport(t *testing.T) {
func TestSimpleArrayAndSchema(t *testing.T) {
sc := exportInt32TypeSchema()
+ defer ReleaseCArrowSchema(&sc)
testarr := exportInt32Array()
+ defer freeCArray(testarr)
// grab address of the buffer we stuck into the ArrowArray object
buflist := (*[2]unsafe.Pointer)(unsafe.Pointer(testarr.buffers))
@@ -89,6 +93,7 @@ func TestSimpleArrayAndSchema(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, arrow.PrimitiveTypes.Int32, fld.Type)
assert.EqualValues(t, 10, arr.Len())
+ defer arr.Release()
// verify that the address is the same of the first integer for the
// slice that is being used by the arrow.Array and the original buffer
@@ -782,6 +787,7 @@ func TestRecordBatch(t *testing.T) {
func TestRecordReaderStream(t *testing.T) {
stream := arrayStreamTest()
+ defer releaseStreamTest(stream)
defer ReleaseCArrowArrayStream(stream)
rdr := ImportCArrayStream(stream, nil)
@@ -806,6 +812,7 @@ func TestRecordReaderStream(t *testing.T) {
assert.Equal(t, "bar", rec.Column(1).(*array.String).Value(1))
assert.Equal(t, "baz", rec.Column(1).(*array.String).Value(2))
}
+ runtime.GC()
}
func TestExportRecordReaderStream(t *testing.T) {
@@ -813,6 +820,7 @@ func TestExportRecordReaderStream(t *testing.T) {
rdr, _ := array.NewRecordReader(reclist[0].Schema(), reclist)
out := createTestStreamObj()
+ defer releaseStreamTest(out)
ExportRecordReader(rdr, out)
assert.NotNil(t, out.get_schema)
@@ -822,7 +830,7 @@ func TestExportRecordReaderStream(t *testing.T) {
assert.NotNil(t, out.private_data)
h := *(*cgo.Handle)(out.private_data)
- assert.Same(t, rdr, h.Value().(cRecordReader).rdr)
+ assert.Same(t, rdr, h.Value().(*cRecordReader).rdr)
importedRdr := ImportCArrayStream(out, nil)
i := 0
@@ -862,6 +870,7 @@ func TestExportRecordReaderStreamLifetime(t *testing.T) {
defer rdr.Release()
out := createTestStreamObj()
+ defer releaseStreamTest(out)
ExportRecordReader(rdr, out)
// C Stream is holding on to memory
@@ -878,6 +887,7 @@ func TestEmptyListExport(t *testing.T) {
var out CArrowArray
ExportArrowArray(arr, &out, nil)
+ defer ReleaseCArrowArray(&out)
assert.Zero(t, out.length)
assert.Zero(t, out.null_count)
@@ -898,6 +908,8 @@ func TestEmptyDictExport(t *testing.T) {
var out CArrowArray
var sc CArrowSchema
ExportArrowArray(arr, &out, &sc)
+ defer ReleaseCArrowArray(&out)
+ defer ReleaseCArrowSchema(&sc)
assert.EqualValues(t, 'c', *sc.format)
assert.NotZero(t, sc.flags&1)
@@ -933,6 +945,8 @@ func TestEmptyStringExport(t *testing.T) {
var out CArrowArray
var sc CArrowSchema
ExportArrowArray(arr, &out, &sc)
+ defer ReleaseCArrowArray(&out)
+ defer ReleaseCArrowSchema(&sc)
assert.EqualValues(t, 'u', *sc.format)
assert.Zero(t, sc.n_children)
@@ -958,6 +972,8 @@ func TestEmptyUnionExport(t *testing.T) {
var out CArrowArray
var sc CArrowSchema
ExportArrowArray(arr, &out, &sc)
+ defer ReleaseCArrowArray(&out)
+ defer ReleaseCArrowSchema(&sc)
assert.EqualValues(t, 1, sc.n_children)
assert.Nil(t, sc.dictionary)
@@ -1015,18 +1031,21 @@ func TestRecordReaderError(t *testing.T) {
t.Fatalf("Expected error but got none")
}
assert.Contains(t, err.Error(), "Expected error message")
+ runtime.GC()
err = roundTripStreamTest(&failingReader{opCount: 2})
if err == nil {
t.Fatalf("Expected error but got none")
}
assert.Contains(t, err.Error(), "Expected error message")
+ runtime.GC()
err = roundTripStreamTest(&failingReader{opCount: 3})
if err == nil {
t.Fatalf("Expected error but got none")
}
assert.Contains(t, err.Error(), "Expected error message")
+ runtime.GC()
}
func TestRecordReaderImportError(t *testing.T) {
@@ -1061,6 +1080,7 @@ func TestConfuseGoGc(t *testing.T) {
assert.NoError(t, err)
runtime.GC()
assert.NoError(t, confuseGoGc(rdr))
+ rdr.Release()
runtime.GC()
}
wg.Done()
diff --git a/arrow/cdata/cdata_test_framework.go
b/arrow/cdata/cdata_test_framework.go
index fef18e5d..3e2c5bc8 100644
--- a/arrow/cdata/cdata_test_framework.go
+++ b/arrow/cdata/cdata_test_framework.go
@@ -123,6 +123,10 @@ func exportInt32Array() *CArrowArray {
return arr
}
+func freeCArray(arr *CArrowArray) {
+ C.free(unsafe.Pointer(arr))
+}
+
func isReleased(arr *CArrowArray) bool {
return C.ArrowArrayIsReleased(arr) == 1
}
@@ -408,6 +412,10 @@ func arrayStreamTest() *CArrowArrayStream {
return st
}
+func releaseStreamTest(st *CArrowArrayStream) {
+ C.free(unsafe.Pointer(st))
+}
+
func exportedStreamTest(reader array.RecordReader) error {
out := C.get_test_stream()
ExportRecordReader(reader, out)
@@ -421,6 +429,7 @@ func exportedStreamTest(reader array.RecordReader) error {
func roundTripStreamTest(reader array.RecordReader) error {
out := C.get_test_stream()
+ defer C.free(unsafe.Pointer(out))
ExportRecordReader(reader, out)
rdr, err := ImportCRecordReader(out, nil)
diff --git a/arrow/cdata/exports.go b/arrow/cdata/exports.go
index c83b6f82..6367a5b3 100644
--- a/arrow/cdata/exports.go
+++ b/arrow/cdata/exports.go
@@ -84,15 +84,15 @@ func releaseExportedSchema(schema *CArrowSchema) {
C.free(unsafe.Pointer(schema.format))
C.free(unsafe.Pointer(schema.metadata))
- if schema.n_children == 0 {
- return
- }
-
if schema.dictionary != nil {
C.ArrowSchemaRelease(schema.dictionary)
C.free(unsafe.Pointer(schema.dictionary))
}
+ if schema.n_children == 0 {
+ return
+ }
+
children := unsafe.Slice(schema.children, schema.n_children)
for _, c := range children {
C.ArrowSchemaRelease(c)
@@ -153,28 +153,28 @@ func releaseExportedArray(arr *CArrowArray) {
//export streamGetSchema
func streamGetSchema(handle *CArrowArrayStream, out *CArrowSchema) C.int {
h := getHandle(handle.private_data)
- rdr := h.Value().(cRecordReader)
+ rdr := h.Value().(*cRecordReader)
return C.int(rdr.getSchema(out))
}
//export streamGetNext
func streamGetNext(handle *CArrowArrayStream, out *CArrowArray) C.int {
h := getHandle(handle.private_data)
- rdr := h.Value().(cRecordReader)
+ rdr := h.Value().(*cRecordReader)
return C.int(rdr.next(out))
}
//export streamGetError
func streamGetError(handle *CArrowArrayStream) *C.cchar_t {
h := getHandle(handle.private_data)
- rdr := h.Value().(cRecordReader)
+ rdr := h.Value().(*cRecordReader)
return rdr.getLastError()
}
//export streamRelease
func streamRelease(handle *CArrowArrayStream) {
h := getHandle(handle.private_data)
- h.Value().(cRecordReader).release()
+ h.Value().(*cRecordReader).release()
h.Delete()
C.free(unsafe.Pointer(handle.private_data))
handle.release = nil
@@ -187,7 +187,7 @@ func exportStream(rdr array.RecordReader, out
*CArrowArrayStream) {
out.get_last_error = (*[0]byte)(C.streamGetError)
out.release = (*[0]byte)(C.streamRelease)
rdr.Retain()
- h := cgo.NewHandle(cRecordReader{rdr: rdr, err: nil})
+ h := cgo.NewHandle(&cRecordReader{rdr: rdr, err: nil})
out.private_data = createHandle(h)
}
@@ -206,6 +206,9 @@ type taskState struct {
//export asyncStreamOnSchema
func asyncStreamOnSchema(self *CArrowAsyncDeviceStreamHandler, schema
*CArrowSchema) C.int {
h := getHandle(self.private_data)
+ defer C.free(self.private_data)
+ defer h.Delete()
+
handler := h.Value().(cAsyncState)
defer close(handler.ch)
@@ -235,7 +238,6 @@ func asyncStreamOnSchema(self
*CArrowAsyncDeviceStreamHandler, schema *CArrowSch
ctx: handler.ctx,
taskQueue: taskQueue,
}))
- defer h.Delete()
C.goCallRequest(self.producer, C.int64_t(handler.queueSize))
go asyncTaskQueue(handler.ctx, sc, recordStream, taskQueue,
self.producer)
@@ -347,7 +349,10 @@ func exportAsyncProducer(schema *arrow.Schema, stream
<-chan RecordMessage, hand
}()
producer := C.get_producer()
- defer C.free(unsafe.Pointer(producer))
+ defer func() {
+ C.free(producer.private_data)
+ C.free(unsafe.Pointer(producer))
+ }()
producer.device_type = C.ARROW_DEVICE_CPU
producer.request = (*[0]byte)(C.asyncProducerRequest)
@@ -410,6 +415,7 @@ func exportAsyncProducer(schema *arrow.Schema, stream
<-chan RecordMessage, hand
if status != C.int(0) {
msg.Record.Release()
getHandle(task.private_data).Delete()
+ C.free(task.private_data)
return fmt.Errorf("on_next_task failed
with status %d", status)
}
default:
diff --git a/arrow/memory/mallocator/mallocator.go
b/arrow/memory/mallocator/mallocator.go
index 994cfdb2..45959ee8 100644
--- a/arrow/memory/mallocator/mallocator.go
+++ b/arrow/memory/mallocator/mallocator.go
@@ -88,7 +88,14 @@ func (alloc *Mallocator) Allocate(size int) []byte {
buf := unsafe.Slice((*byte)(ptr), paddedSize)
aligned := roundToPowerOf2(uintptr(ptr), uintptr(alloc.alignment))
- alloc.realAllocations.Store(aligned, uintptr(ptr))
+ if size == 0 {
+ // if we return a zero-sized slice, we need to store the actual
ptr
+ // as the key in the map, since the returned slice will have
the actual
+ // pointer value instead of the aligned pointer.
+ alloc.realAllocations.Store(uintptr(ptr), uintptr(ptr))
+ } else {
+ alloc.realAllocations.Store(aligned, uintptr(ptr))
+ }
atomic.AddUint64(&alloc.allocatedBytes, uint64(size))
if uintptr(ptr) != aligned {