This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 42043c8fdc GH-48612: [Ruby] Add support for reading streaming format (#48613)
42043c8fdc is described below

commit 42043c8fdc935c869a91cc850b19358910a95266
Author: Sutou Kouhei <[email protected]>
AuthorDate: Mon Dec 22 10:17:31 2025 +0900

    GH-48612: [Ruby] Add support for reading streaming format (#48613)
    
    ### Rationale for this change
    
    It's a streaming variant of IPC format.
    
    ### What changes are included in this PR?
    
    * Add `ArrowFormat::MessagePullReader`
    * Add `ArrowFormat::StreamingPullReader`
    * Add `ArrowFormat::StreamingReader`
    * Extract the schema and record batch reading features as `ArrowFormat::Readable` and reuse them in `ArrowFormat::StreamingPullReader` and `ArrowFormat::FileReader`
    
    ### Are these changes tested?
    
    Yes.
    
    We should add more error case tests later.
    
    ### Are there any user-facing changes?
    
    Yes.
    * GitHub Issue: #48612
    
    Authored-by: Sutou Kouhei <[email protected]>
    Signed-off-by: Sutou Kouhei <[email protected]>
---
 ruby/red-arrow-format/lib/arrow-format.rb          |   1 +
 ruby/red-arrow-format/lib/arrow-format/error.rb    |   3 +
 .../lib/arrow-format/file-reader.rb                | 338 ++------
 .../arrow-format/{file-reader.rb => readable.rb}   | 112 +--
 .../lib/arrow-format/streaming-pull-reader.rb      | 200 +++++
 .../streaming-reader.rb}                           |  35 +-
 ruby/red-arrow-format/test/test-file-reader.rb     | 790 -------------------
 ruby/red-arrow-format/test/test-reader.rb          | 872 +++++++++++++++++++++
 8 files changed, 1202 insertions(+), 1149 deletions(-)

diff --git a/ruby/red-arrow-format/lib/arrow-format.rb b/ruby/red-arrow-format/lib/arrow-format.rb
index aea210bfb1..2c8ecbf55c 100644
--- a/ruby/red-arrow-format/lib/arrow-format.rb
+++ b/ruby/red-arrow-format/lib/arrow-format.rb
@@ -16,4 +16,5 @@
 # under the License.
 
 require_relative "arrow-format/file-reader"
+require_relative "arrow-format/streaming-reader"
 require_relative "arrow-format/version"
diff --git a/ruby/red-arrow-format/lib/arrow-format/error.rb b/ruby/red-arrow-format/lib/arrow-format/error.rb
index 39b0b8af15..d73c4082be 100644
--- a/ruby/red-arrow-format/lib/arrow-format/error.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/error.rb
@@ -19,6 +19,9 @@ module ArrowFormat
   end
 
   class ReadError < Error
+  end
+
+  class FileReadError < ReadError
     attr_reader :buffer
     def initialize(buffer, message)
       @buffer = buffer
diff --git a/ruby/red-arrow-format/lib/arrow-format/file-reader.rb b/ruby/red-arrow-format/lib/arrow-format/file-reader.rb
index 29c7f5edd4..bf50bfd1cd 100644
--- a/ruby/red-arrow-format/lib/arrow-format/file-reader.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/file-reader.rb
@@ -15,57 +15,27 @@
 # specific language governing permissions and limitations
 # under the License.
 
-require_relative "array"
-require_relative "error"
-require_relative "field"
-require_relative "record-batch"
-require_relative "schema"
-require_relative "type"
+require_relative "streaming-reader"
 
-require_relative "org/apache/arrow/flatbuf/binary"
-require_relative "org/apache/arrow/flatbuf/bool"
-require_relative "org/apache/arrow/flatbuf/date"
-require_relative "org/apache/arrow/flatbuf/date_unit"
-require_relative "org/apache/arrow/flatbuf/duration"
-require_relative "org/apache/arrow/flatbuf/fixed_size_binary"
-require_relative "org/apache/arrow/flatbuf/floating_point"
+require_relative "org/apache/arrow/flatbuf/block"
 require_relative "org/apache/arrow/flatbuf/footer"
-require_relative "org/apache/arrow/flatbuf/int"
-require_relative "org/apache/arrow/flatbuf/interval"
-require_relative "org/apache/arrow/flatbuf/interval_unit"
-require_relative "org/apache/arrow/flatbuf/large_binary"
-require_relative "org/apache/arrow/flatbuf/large_list"
-require_relative "org/apache/arrow/flatbuf/large_utf8"
-require_relative "org/apache/arrow/flatbuf/list"
-require_relative "org/apache/arrow/flatbuf/map"
-require_relative "org/apache/arrow/flatbuf/message"
-require_relative "org/apache/arrow/flatbuf/null"
-require_relative "org/apache/arrow/flatbuf/precision"
-require_relative "org/apache/arrow/flatbuf/schema"
-require_relative "org/apache/arrow/flatbuf/struct_"
-require_relative "org/apache/arrow/flatbuf/time"
-require_relative "org/apache/arrow/flatbuf/timestamp"
-require_relative "org/apache/arrow/flatbuf/time_unit"
-require_relative "org/apache/arrow/flatbuf/union"
-require_relative "org/apache/arrow/flatbuf/union_mode"
-require_relative "org/apache/arrow/flatbuf/utf8"
 
 module ArrowFormat
   class FileReader
     include Enumerable
+    include Readable
 
-    MAGIC = "ARROW1".b
+    MAGIC = "ARROW1".b.freeze
     MAGIC_BUFFER = IO::Buffer.for(MAGIC)
     START_MARKER_SIZE = MAGIC_BUFFER.size
     END_MARKER_SIZE = MAGIC_BUFFER.size
-    CONTINUATION = "\xFF\xFF\xFF\xFF".b
-    CONTINUATION_BUFFER = IO::Buffer.for(CONTINUATION)
     # <magic number "ARROW1">
     # <empty padding bytes [to 8 byte boundary]>
     STREAMING_FORMAT_START_OFFSET = 8
-    INT32_SIZE = 4
-    FOOTER_SIZE_SIZE = INT32_SIZE
-    METADATA_SIZE_SIZE = INT32_SIZE
+    CONTINUATION_BUFFER =
+      IO::Buffer.for(MessagePullReader::CONTINUATION_STRING)
+    FOOTER_SIZE_FORMAT = :s32
+    FOOTER_SIZE_SIZE = IO::Buffer.size_of(FOOTER_SIZE_FORMAT)
 
     def initialize(input)
       case input
@@ -79,45 +49,75 @@ module ArrowFormat
 
       validate
       @footer = read_footer
+      @record_batches = @footer.record_batches
+      @schema = read_schema(@footer.schema)
     end
 
-    def each
-      offset = STREAMING_FORMAT_START_OFFSET
-      schema = nil
-      continuation_size = CONTINUATION_BUFFER.size
-      # streaming format
-      loop do
-        continuation = @buffer.slice(offset, continuation_size)
-        unless continuation == CONTINUATION_BUFFER
-          raise ReadError.new(@buffer, "No valid continuation")
-        end
-        offset += continuation_size
+    def n_record_batches
+      @record_batches.size
+    end
 
-        metadata_size = @buffer.get_value(:u32, offset)
-        offset += METADATA_SIZE_SIZE
-        break if metadata_size.zero?
+    def read(i)
+      block = @record_batches[i]
 
-        metadata_data = @buffer.slice(offset, metadata_size)
-        offset += metadata_size
-        metadata = Org::Apache::Arrow::Flatbuf::Message.new(metadata_data)
+      offset = block.offset
 
-        body = @buffer.slice(offset, metadata.body_length)
-        header = metadata.header
-        case header
-        when Org::Apache::Arrow::Flatbuf::Schema
-          schema = read_schema(header)
-        when Org::Apache::Arrow::Flatbuf::RecordBatch
-          n_rows = header.length
-          columns = []
-          nodes = header.nodes
-          buffers = header.buffers
-          schema.fields.each do |field|
-            columns << read_column(field, nodes, buffers, body)
-          end
-          yield(RecordBatch.new(schema, n_rows, columns))
-        end
+      # If we can report proper error information, we can use
+      # MessagePullReader here.
+      #
+      # message_pull_reader = MessagePullReader.new do |message, body|
+      #   return read_record_batch(message.header, @schema, body)
+      # end
+      # chunk = @buffer.slice(offset,
+      #                       MessagePullReader::CONTINUATION_SIZE +
+      #                       MessagePullReader::METADATA_LENGTH_SIZE +
+      #                       block.meta_data_length +
+      #                       block.body_length)
+      # message_pull_reader.consume(chunk)
 
-        offset += metadata.body_length
+      continuation_size = CONTINUATION_BUFFER.size
+      continuation = @buffer.slice(offset, continuation_size)
+      unless continuation == CONTINUATION_BUFFER
+        raise FileReadError.new(@buffer,
+                                "Invalid continuation: #{i}: " +
+                                continuation.inspect)
+      end
+      offset += continuation_size
+
+      metadata_length_type = MessagePullReader::METADATA_LENGTH_TYPE
+      metadata_length_size = MessagePullReader::METADATA_LENGTH_SIZE
+      metadata_length = @buffer.get_value(metadata_length_type, offset)
+      expected_metadata_length =
+        block.meta_data_length -
+        continuation_size -
+        metadata_length_size
+      unless metadata_length == expected_metadata_length
+        raise FileReadError.new(@buffer,
+                                "Invalid metadata length #{i}: " +
+                                "expected:#{expected_metadata_length} " +
+                                "actual:#{metadata_length}")
+      end
+      offset += metadata_length_size
+
+      metadata = @buffer.slice(offset, metadata_length)
+      fb_message = Org::Apache::Arrow::Flatbuf::Message.new(metadata)
+      fb_header = fb_message.header
+      unless fb_header.is_a?(Org::Apache::Arrow::Flatbuf::RecordBatch)
+        raise FileReadError.new(@buffer,
+                                "Not a record batch message: #{i}: " +
+                                fb_header.class.name)
+      end
+      offset += metadata_length
+
+      body = @buffer.slice(offset, block.body_length)
+      read_record_batch(fb_header, @schema, body)
+    end
+
+    def each
+      return to_enum(__method__) {n_record_batches} unless block_given?
+
+      @record_batches.size.times do |i|
+        yield(read(i))
       end
     end
 
@@ -127,204 +127,28 @@ module ArrowFormat
                      FOOTER_SIZE_SIZE +
                      END_MARKER_SIZE
       if @buffer.size < minimum_size
-        raise ReadError.new(@buffer,
-                            "Input must be larger than or equal to " +
-                            "#{minimum_size}: #{@buffer.size}")
+        raise FileReadError.new(@buffer,
+                                "Input must be larger than or equal to " +
+                                "#{minimum_size}: #{@buffer.size}")
       end
 
       start_marker = @buffer.slice(0, START_MARKER_SIZE)
       if start_marker != MAGIC_BUFFER
-        raise ReadError.new(@buffer, "No start marker")
+        raise FileReadError.new(@buffer, "No start marker")
       end
-      end_marker = @buffer.slice(@buffer.size - END_MARKER_SIZE, END_MARKER_SIZE)
+      end_marker = @buffer.slice(@buffer.size - END_MARKER_SIZE,
+                                 END_MARKER_SIZE)
       if end_marker != MAGIC_BUFFER
-        raise ReadError.new(@buffer, "No end marker")
+        raise FileReadError.new(@buffer, "No end marker")
       end
     end
 
     def read_footer
       footer_size_offset = @buffer.size - END_MARKER_SIZE - FOOTER_SIZE_SIZE
-      footer_size = @buffer.get_value(:u32, footer_size_offset)
-      footer_data = @buffer.slice(footer_size_offset - footer_size, footer_size)
+      footer_size = @buffer.get_value(FOOTER_SIZE_FORMAT, footer_size_offset)
+      footer_data = @buffer.slice(footer_size_offset - footer_size,
+                                  footer_size)
       Org::Apache::Arrow::Flatbuf::Footer.new(footer_data)
     end
-
-    def read_field(fb_field)
-      fb_type = fb_field.type
-      case fb_type
-      when Org::Apache::Arrow::Flatbuf::Null
-        type = NullType.singleton
-      when Org::Apache::Arrow::Flatbuf::Bool
-        type = BooleanType.singleton
-      when Org::Apache::Arrow::Flatbuf::Int
-        case fb_type.bit_width
-        when 8
-          if fb_type.signed?
-            type = Int8Type.singleton
-          else
-            type = UInt8Type.singleton
-          end
-        when 16
-          if fb_type.signed?
-            type = Int16Type.singleton
-          else
-            type = UInt16Type.singleton
-          end
-        when 32
-          if fb_type.signed?
-            type = Int32Type.singleton
-          else
-            type = UInt32Type.singleton
-          end
-        when 64
-          if fb_type.signed?
-            type = Int64Type.singleton
-          else
-            type = UInt64Type.singleton
-          end
-        end
-      when Org::Apache::Arrow::Flatbuf::FloatingPoint
-        case fb_type.precision
-        when Org::Apache::Arrow::Flatbuf::Precision::SINGLE
-          type = Float32Type.singleton
-        when Org::Apache::Arrow::Flatbuf::Precision::DOUBLE
-          type = Float64Type.singleton
-        end
-      when Org::Apache::Arrow::Flatbuf::Date
-        case fb_type.unit
-        when Org::Apache::Arrow::Flatbuf::DateUnit::DAY
-          type = Date32Type.singleton
-        when Org::Apache::Arrow::Flatbuf::DateUnit::MILLISECOND
-          type = Date64Type.singleton
-        end
-      when Org::Apache::Arrow::Flatbuf::Time
-        case fb_type.bit_width
-        when 32
-          case fb_type.unit
-          when Org::Apache::Arrow::Flatbuf::TimeUnit::SECOND
-            type = Time32Type.new(:second)
-          when Org::Apache::Arrow::Flatbuf::TimeUnit::MILLISECOND
-            type = Time32Type.new(:millisecond)
-          end
-        when 64
-          case fb_type.unit
-          when Org::Apache::Arrow::Flatbuf::TimeUnit::MICROSECOND
-            type = Time64Type.new(:microsecond)
-          when Org::Apache::Arrow::Flatbuf::TimeUnit::NANOSECOND
-            type = Time64Type.new(:nanosecond)
-          end
-        end
-      when Org::Apache::Arrow::Flatbuf::Timestamp
-        unit = fb_type.unit.name.downcase.to_sym
-        type = TimestampType.new(unit, fb_type.timezone)
-      when Org::Apache::Arrow::Flatbuf::Interval
-        case fb_type.unit
-        when Org::Apache::Arrow::Flatbuf::IntervalUnit::YEAR_MONTH
-          type = YearMonthIntervalType.new
-        when Org::Apache::Arrow::Flatbuf::IntervalUnit::DAY_TIME
-          type = DayTimeIntervalType.new
-        when Org::Apache::Arrow::Flatbuf::IntervalUnit::MONTH_DAY_NANO
-          type = MonthDayNanoIntervalType.new
-        end
-      when Org::Apache::Arrow::Flatbuf::Duration
-        unit = fb_type.unit.name.downcase.to_sym
-        type = DurationType.new(unit)
-      when Org::Apache::Arrow::Flatbuf::List
-        type = ListType.new(read_field(fb_field.children[0]))
-      when Org::Apache::Arrow::Flatbuf::LargeList
-        type = LargeListType.new(read_field(fb_field.children[0]))
-      when Org::Apache::Arrow::Flatbuf::Struct
-        children = fb_field.children.collect {|child| read_field(child)}
-        type = StructType.new(children)
-      when Org::Apache::Arrow::Flatbuf::Union
-        children = fb_field.children.collect {|child| read_field(child)}
-        type_ids = fb_type.type_ids
-        case fb_type.mode
-        when Org::Apache::Arrow::Flatbuf::UnionMode::DENSE
-          type = DenseUnionType.new(children, type_ids)
-        when Org::Apache::Arrow::Flatbuf::UnionMode::SPARSE
-          type = SparseUnionType.new(children, type_ids)
-        end
-      when Org::Apache::Arrow::Flatbuf::Map
-        type = MapType.new(read_field(fb_field.children[0]))
-      when Org::Apache::Arrow::Flatbuf::Binary
-        type = BinaryType.singleton
-      when Org::Apache::Arrow::Flatbuf::LargeBinary
-        type = LargeBinaryType.singleton
-      when Org::Apache::Arrow::Flatbuf::Utf8
-        type = UTF8Type.singleton
-      when Org::Apache::Arrow::Flatbuf::LargeUtf8
-        type = LargeUTF8Type.singleton
-      when Org::Apache::Arrow::Flatbuf::FixedSizeBinary
-        type = FixedSizeBinaryType.new(fb_type.byte_width)
-      end
-      Field.new(fb_field.name, type, fb_field.nullable?)
-    end
-
-    def read_schema(fb_schema)
-      fields = fb_schema.fields.collect do |fb_field|
-        read_field(fb_field)
-      end
-      Schema.new(fields)
-    end
-
-    def read_column(field, nodes, buffers, body)
-      node = nodes.shift
-      length = node.length
-
-      return field.type.build_array(length) if field.type.is_a?(NullType)
-
-      validity_buffer = buffers.shift
-      if validity_buffer.length.zero?
-        validity = nil
-      else
-        validity = body.slice(validity_buffer.offset, validity_buffer.length)
-      end
-
-      case field.type
-      when BooleanType,
-           NumberType,
-           TemporalType
-        values_buffer = buffers.shift
-        values = body.slice(values_buffer.offset, values_buffer.length)
-        field.type.build_array(length, validity, values)
-      when VariableSizeBinaryType
-        offsets_buffer = buffers.shift
-        values_buffer = buffers.shift
-        offsets = body.slice(offsets_buffer.offset, offsets_buffer.length)
-        values = body.slice(values_buffer.offset, values_buffer.length)
-        field.type.build_array(length, validity, offsets, values)
-      when FixedSizeBinaryType
-        values_buffer = buffers.shift
-        values = body.slice(values_buffer.offset, values_buffer.length)
-        field.type.build_array(length, validity, values)
-      when VariableSizeListType
-        offsets_buffer = buffers.shift
-        offsets = body.slice(offsets_buffer.offset, offsets_buffer.length)
-        child = read_column(field.type.child, nodes, buffers, body)
-        field.type.build_array(length, validity, offsets, child)
-      when StructType
-        children = field.type.children.collect do |child|
-          read_column(child, nodes, buffers, body)
-        end
-        field.type.build_array(length, validity, children)
-      when DenseUnionType
-        # dense union type doesn't have validity.
-        types = validity
-        offsets_buffer = buffers.shift
-        offsets = body.slice(offsets_buffer.offset, offsets_buffer.length)
-        children = field.type.children.collect do |child|
-          read_column(child, nodes, buffers, body)
-        end
-        field.type.build_array(length, types, offsets, children)
-      when SparseUnionType
-        # sparse union type doesn't have validity.
-        types = validity
-        children = field.type.children.collect do |child|
-          read_column(child, nodes, buffers, body)
-        end
-        field.type.build_array(length, types, children)
-      end
-    end
   end
 end
diff --git a/ruby/red-arrow-format/lib/arrow-format/file-reader.rb b/ruby/red-arrow-format/lib/arrow-format/readable.rb
similarity index 74%
copy from ruby/red-arrow-format/lib/arrow-format/file-reader.rb
copy to ruby/red-arrow-format/lib/arrow-format/readable.rb
index 29c7f5edd4..2d64d5387f 100644
--- a/ruby/red-arrow-format/lib/arrow-format/file-reader.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/readable.rb
@@ -16,7 +16,6 @@
 # under the License.
 
 require_relative "array"
-require_relative "error"
 require_relative "field"
 require_relative "record-batch"
 require_relative "schema"
@@ -29,7 +28,6 @@ require_relative "org/apache/arrow/flatbuf/date_unit"
 require_relative "org/apache/arrow/flatbuf/duration"
 require_relative "org/apache/arrow/flatbuf/fixed_size_binary"
 require_relative "org/apache/arrow/flatbuf/floating_point"
-require_relative "org/apache/arrow/flatbuf/footer"
 require_relative "org/apache/arrow/flatbuf/int"
 require_relative "org/apache/arrow/flatbuf/interval"
 require_relative "org/apache/arrow/flatbuf/interval_unit"
@@ -51,102 +49,13 @@ require_relative "org/apache/arrow/flatbuf/union_mode"
 require_relative "org/apache/arrow/flatbuf/utf8"
 
 module ArrowFormat
-  class FileReader
-    include Enumerable
-
-    MAGIC = "ARROW1".b
-    MAGIC_BUFFER = IO::Buffer.for(MAGIC)
-    START_MARKER_SIZE = MAGIC_BUFFER.size
-    END_MARKER_SIZE = MAGIC_BUFFER.size
-    CONTINUATION = "\xFF\xFF\xFF\xFF".b
-    CONTINUATION_BUFFER = IO::Buffer.for(CONTINUATION)
-    # <magic number "ARROW1">
-    # <empty padding bytes [to 8 byte boundary]>
-    STREAMING_FORMAT_START_OFFSET = 8
-    INT32_SIZE = 4
-    FOOTER_SIZE_SIZE = INT32_SIZE
-    METADATA_SIZE_SIZE = INT32_SIZE
-
-    def initialize(input)
-      case input
-      when IO
-        @buffer = IO::Buffer.map(input, nil, 0, IO::Buffer::READONLY)
-      when String
-        @buffer = IO::Buffer.for(input)
-      else
-        @buffer = input
-      end
-
-      validate
-      @footer = read_footer
-    end
-
-    def each
-      offset = STREAMING_FORMAT_START_OFFSET
-      schema = nil
-      continuation_size = CONTINUATION_BUFFER.size
-      # streaming format
-      loop do
-        continuation = @buffer.slice(offset, continuation_size)
-        unless continuation == CONTINUATION_BUFFER
-          raise ReadError.new(@buffer, "No valid continuation")
-        end
-        offset += continuation_size
-
-        metadata_size = @buffer.get_value(:u32, offset)
-        offset += METADATA_SIZE_SIZE
-        break if metadata_size.zero?
-
-        metadata_data = @buffer.slice(offset, metadata_size)
-        offset += metadata_size
-        metadata = Org::Apache::Arrow::Flatbuf::Message.new(metadata_data)
-
-        body = @buffer.slice(offset, metadata.body_length)
-        header = metadata.header
-        case header
-        when Org::Apache::Arrow::Flatbuf::Schema
-          schema = read_schema(header)
-        when Org::Apache::Arrow::Flatbuf::RecordBatch
-          n_rows = header.length
-          columns = []
-          nodes = header.nodes
-          buffers = header.buffers
-          schema.fields.each do |field|
-            columns << read_column(field, nodes, buffers, body)
-          end
-          yield(RecordBatch.new(schema, n_rows, columns))
-        end
-
-        offset += metadata.body_length
-      end
-    end
-
+  module Readable
     private
-    def validate
-      minimum_size = STREAMING_FORMAT_START_OFFSET +
-                     FOOTER_SIZE_SIZE +
-                     END_MARKER_SIZE
-      if @buffer.size < minimum_size
-        raise ReadError.new(@buffer,
-                            "Input must be larger than or equal to " +
-                            "#{minimum_size}: #{@buffer.size}")
-      end
-
-      start_marker = @buffer.slice(0, START_MARKER_SIZE)
-      if start_marker != MAGIC_BUFFER
-        raise ReadError.new(@buffer, "No start marker")
-      end
-      end_marker = @buffer.slice(@buffer.size - END_MARKER_SIZE, END_MARKER_SIZE)
-      if end_marker != MAGIC_BUFFER
-        raise ReadError.new(@buffer, "No end marker")
+    def read_schema(fb_schema)
+      fields = fb_schema.fields.collect do |fb_field|
+        read_field(fb_field)
       end
-    end
-
-    def read_footer
-      footer_size_offset = @buffer.size - END_MARKER_SIZE - FOOTER_SIZE_SIZE
-      footer_size = @buffer.get_value(:u32, footer_size_offset)
-      footer_data = @buffer.slice(footer_size_offset - footer_size, footer_size)
-      Org::Apache::Arrow::Flatbuf::Footer.new(footer_data)
+      Schema.new(fields)
     end
 
     def read_field(fb_field)
@@ -261,11 +170,16 @@ module ArrowFormat
       Field.new(fb_field.name, type, fb_field.nullable?)
     end
 
-    def read_schema(fb_schema)
-      fields = fb_schema.fields.collect do |fb_field|
-        read_field(fb_field)
+    # Decodes a Flatbuf RecordBatch header plus its body buffer into a
+    # RecordBatch; columns are decoded using the given schema.
+    def read_record_batch(fb_record_batch, schema, body)
+      n_rows = fb_record_batch.length
+      nodes = fb_record_batch.nodes
+      buffers = fb_record_batch.buffers
+      columns = schema.fields.collect do |field|
+        read_column(field, nodes, buffers, body)
       end
-      Schema.new(fields)
+      RecordBatch.new(schema, n_rows, columns)
     end
 
     def read_column(field, nodes, buffers, body)
diff --git a/ruby/red-arrow-format/lib/arrow-format/streaming-pull-reader.rb b/ruby/red-arrow-format/lib/arrow-format/streaming-pull-reader.rb
new file mode 100644
index 0000000000..ae231fccbc
--- /dev/null
+++ b/ruby/red-arrow-format/lib/arrow-format/streaming-pull-reader.rb
@@ -0,0 +1,232 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+require_relative "array"
+require_relative "error"
+require_relative "field"
+require_relative "readable"
+require_relative "record-batch"
+require_relative "schema"
+require_relative "type"
+
+module ArrowFormat
+  # Incremental (push-style) parser for encapsulated IPC messages:
+  #
+  #   <continuation: 0xFFFFFFFF> <metadata length: int32>
+  #   <metadata: Flatbuf Message> <body: body_length bytes>
+  #
+  # A metadata length of 0 marks the end of the stream. Bytes are fed
+  # with #consume; the block given to .new is called with
+  # (message, body) for each complete message.
+  class MessagePullReader
+    CONTINUATION_TYPE = :s32
+    CONTINUATION_SIZE = IO::Buffer.size_of(CONTINUATION_TYPE)
+    CONTINUATION_STRING = "\xFF\xFF\xFF\xFF".b.freeze
+    CONTINUATION_INT32 = -1
+    METADATA_LENGTH_TYPE = :s32
+    METADATA_LENGTH_SIZE = IO::Buffer.size_of(METADATA_LENGTH_TYPE)
+
+    def initialize(&on_read)
+      @on_read = on_read
+      # Bytes of a message part that has not been fully received yet.
+      @buffer = IO::Buffer.new(0)
+      @metadata_length = nil
+      @body_length = nil
+      @message = nil
+      @state = :initial
+    end
+
+    # Number of bytes needed to complete the current message part.
+    # 0 after the end-of-stream marker has been read.
+    def next_required_size
+      case @state
+      when :initial
+        CONTINUATION_SIZE
+      when :metadata_length
+        METADATA_LENGTH_SIZE
+      when :metadata
+        @metadata_length
+      when :body
+        @body_length
+      when :eos
+        0
+      end
+    end
+
+    def eos?
+      @state == :eos
+    end
+
+    # Consumes an IO::Buffer chunk of any size. Complete parts are
+    # parsed immediately; a trailing incomplete part is kept until
+    # later chunks complete it. Data after end-of-stream is ignored.
+    def consume(chunk)
+      return if eos?
+      # An empty chunk can never complete a part.
+      return if chunk.size.zero?
+
+      if @buffer.size.zero?
+        target = chunk
+      else
+        pending_size = @buffer.size
+        @buffer.resize(pending_size + chunk.size)
+        # Append after the pending bytes, not over them.
+        @buffer.copy(chunk, pending_size)
+        # Detach the accumulated bytes: emitted bodies may keep slices
+        # of target, so it must not be resized or overwritten later.
+        target, @buffer = @buffer, IO::Buffer.new(0)
+      end
+
+      loop do
+        next_size = next_required_size
+        break if next_size.zero?
+
+        if target.size < next_size
+          # @buffer is empty here; keep the incomplete part in it.
+          @buffer.resize(target.size)
+          @buffer.copy(target)
+          return
+        end
+
+        case @state
+        when :initial
+          consume_initial(target)
+        when :metadata_length
+          consume_metadata_length(target)
+        when :metadata
+          consume_metadata(target)
+        when :body
+          consume_body(target)
+        end
+        break if target.size == next_size
+
+        target = target.slice(next_size)
+      end
+    end
+
+    private
+    def consume_initial(target)
+      continuation = target.get_value(CONTINUATION_TYPE, 0)
+      unless continuation == CONTINUATION_INT32
+        raise ReadError.new("Invalid continuation token: " +
+                            continuation.inspect)
+      end
+      @state = :metadata_length
+    end
+
+    def consume_metadata_length(target)
+      length = target.get_value(METADATA_LENGTH_TYPE, 0)
+      if length < 0
+        raise ReadError.new("Negative metadata length: " +
+                            length.inspect)
+      end
+      if length == 0
+        @state = :eos
+      else
+        @metadata_length = length
+        @state = :metadata
+      end
+    end
+
+    def consume_metadata(target)
+      metadata_buffer = target.slice(0, @metadata_length)
+      @message = Org::Apache::Arrow::Flatbuf::Message.new(metadata_buffer)
+      @body_length = @message.body_length
+      if @body_length < 0
+        raise ReadError.new("Negative body length: " +
+                            @body_length.inspect)
+      end
+      @state = :body
+      # A message with an empty body is complete once its metadata is read.
+      consume_body if @body_length.zero?
+    end
+
+    # Emits the current message. For an empty body an empty buffer is
+    # passed instead of nil so that zero-length column buffers can
+    # still be sliced from it (see Readable#read_column).
+    def consume_body(target=nil)
+      if target
+        body = target.slice(0, @body_length)
+      else
+        body = IO::Buffer.new(0)
+      end
+      @on_read.call(@message, body)
+      @state = :initial
+    end
+  end
+
+  # Push-style reader for the IPC streaming format: one schema message
+  # followed by record batch messages. The block given to .new is
+  # called with each decoded RecordBatch.
+  class StreamingPullReader
+    include Readable
+
+    attr_reader :schema
+    def initialize(&on_read)
+      @on_read = on_read
+      @message_pull_reader = MessagePullReader.new do |message, body|
+        process_message(message, body)
+      end
+      @state = :schema
+      @schema = nil
+    end
+
+    def next_required_size
+      @message_pull_reader.next_required_size
+    end
+
+    def eos?
+      @message_pull_reader.eos?
+    end
+
+    def consume(chunk)
+      @message_pull_reader.consume(chunk)
+    end
+
+    private
+    def process_message(message, body)
+      case @state
+      when :schema
+        process_schema_message(message, body)
+      when :record_batch
+        process_record_batch_message(message, body)
+      end
+    end
+
+    def process_schema_message(message, body)
+      header = message.header
+      unless header.is_a?(Org::Apache::Arrow::Flatbuf::Schema)
+        raise ReadError.new("Not a schema message: " +
+                            header.inspect)
+      end
+
+      @schema = read_schema(header)
+      # TODO: initial dictionaries support
+      @state = :record_batch
+    end
+
+    def process_record_batch_message(message, body)
+      header = message.header
+      unless header.is_a?(Org::Apache::Arrow::Flatbuf::RecordBatch)
+        raise ReadError.new("Not a record batch message: " +
+                            header.inspect)
+      end
+
+      @on_read.call(read_record_batch(header, @schema, body))
+    end
+  end
+end
diff --git a/ruby/red-arrow-format/lib/arrow-format.rb b/ruby/red-arrow-format/lib/arrow-format/streaming-reader.rb
similarity index 53%
copy from ruby/red-arrow-format/lib/arrow-format.rb
copy to ruby/red-arrow-format/lib/arrow-format/streaming-reader.rb
index aea210bfb1..f11972c67a 100644
--- a/ruby/red-arrow-format/lib/arrow-format.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/streaming-reader.rb
@@ -15,5 +15,43 @@
 # specific language governing permissions and limitations
 # under the License.
 
-require_relative "arrow-format/file-reader"
-require_relative "arrow-format/version"
+require_relative "streaming-pull-reader"
+
+module ArrowFormat
+  # Reads the IPC streaming format from an input that responds to
+  # #read(length, buffer) (e.g. an IO).
+  class StreamingReader
+    include Enumerable
+
+    # Schema of the stream. It is nil until #each sets it.
+    attr_reader :schema
+    def initialize(input)
+      @input = input
+      @schema = nil
+    end
+
+    # Yields each RecordBatch in the stream. Reading stops at the
+    # end-of-stream marker or when the input returns nil (EOF).
+    def each
+      return to_enum(__method__) unless block_given?
+
+      reader = StreamingPullReader.new do |record_batch|
+        @schema ||= reader.schema
+        yield(record_batch)
+      end
+
+      buffer = "".b
+      loop do
+        next_size = reader.next_required_size
+        break if next_size.zero?
+
+        next_chunk = @input.read(next_size, buffer)
+        break if next_chunk.nil?
+
+        reader.consume(IO::Buffer.for(next_chunk))
+      end
+      # Also set the schema for a stream without record batches.
+      @schema ||= reader.schema
+    end
+  end
+end
diff --git a/ruby/red-arrow-format/test/test-file-reader.rb b/ruby/red-arrow-format/test/test-file-reader.rb
deleted file mode 100644
index 6198f0cb96..0000000000
--- a/ruby/red-arrow-format/test/test-file-reader.rb
+++ /dev/null
@@ -1,790 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-class TestFileReader < Test::Unit::TestCase
-  def setup
-    Dir.mktmpdir do |tmp_dir|
-      table = Arrow::Table.new(value: build_array)
-      @path = File.join(tmp_dir, "data.arrow")
-      table.save(@path)
-      File.open(@path, "rb") do |input|
-        @reader = ArrowFormat::FileReader.new(input)
-        yield
-        @reader = nil
-      end
-      GC.start
-    end
-  end
-
-  def read
-    @reader.to_a.collect do |record_batch|
-      record_batch.to_h.tap do |hash|
-        hash.each do |key, value|
-          hash[key] = value.to_a
-        end
-      end
-    end
-  end
-
-  def type
-    @type ||= @reader.first.schema.fields[0].type
-  end
-
-  sub_test_case("Null") do
-    def build_array
-      Arrow::NullArray.new(3)
-    end
-
-    def test_read
-      assert_equal([{"value" => [nil, nil, nil]}],
-                   read)
-    end
-  end
-
-  sub_test_case("Boolean") do
-    def build_array
-      Arrow::BooleanArray.new([true, nil, false])
-    end
-
-    def test_read
-      assert_equal([{"value" => [true, nil, false]}],
-                   read)
-    end
-  end
-
-  sub_test_case("Int8") do
-    def build_array
-      Arrow::Int8Array.new([-128, nil, 127])
-    end
-
-    def test_read
-      assert_equal([{"value" => [-128, nil, 127]}],
-                   read)
-    end
-  end
-
-  sub_test_case("UInt8") do
-    def build_array
-      Arrow::UInt8Array.new([0, nil, 255])
-    end
-
-    def test_read
-      assert_equal([{"value" => [0, nil, 255]}],
-                   read)
-    end
-  end
-
-  sub_test_case("Int16") do
-    def build_array
-      Arrow::Int16Array.new([-32768, nil, 32767])
-    end
-
-    def test_read
-      assert_equal([{"value" => [-32768, nil, 32767]}],
-                   read)
-    end
-  end
-
-  sub_test_case("UInt16") do
-    def build_array
-      Arrow::UInt16Array.new([0, nil, 65535])
-    end
-
-    def test_read
-      assert_equal([{"value" => [0, nil, 65535]}],
-                   read)
-    end
-  end
-
-  sub_test_case("Int32") do
-    def build_array
-      Arrow::Int32Array.new([-2147483648, nil, 2147483647])
-    end
-
-    def test_read
-      assert_equal([{"value" => [-2147483648, nil, 2147483647]}],
-                   read)
-    end
-  end
-
-  sub_test_case("UInt32") do
-    def build_array
-      Arrow::UInt32Array.new([0, nil, 4294967295])
-    end
-
-    def test_read
-      assert_equal([{"value" => [0, nil, 4294967295]}],
-                   read)
-    end
-  end
-
-  sub_test_case("Int64") do
-    def build_array
-      Arrow::Int64Array.new([
-                              -9223372036854775808,
-                              nil,
-                              9223372036854775807
-                            ])
-    end
-
-    def test_read
-      assert_equal([
-                     {
-                       "value" => [
-                         -9223372036854775808,
-                         nil,
-                         9223372036854775807
-                       ]
-                     }
-                   ],
-                   read)
-    end
-  end
-
-  sub_test_case("UInt64") do
-    def build_array
-      Arrow::UInt64Array.new([0, nil, 18446744073709551615])
-    end
-
-    def test_read
-      assert_equal([{"value" => [0, nil, 18446744073709551615]}],
-                   read)
-    end
-  end
-
-  sub_test_case("Float32") do
-    def build_array
-      Arrow::FloatArray.new([-0.5, nil, 0.5])
-    end
-
-    def test_read
-      assert_equal([{"value" => [-0.5, nil, 0.5]}],
-                   read)
-    end
-  end
-
-  sub_test_case("Float64") do
-    def build_array
-      Arrow::DoubleArray.new([-0.5, nil, 0.5])
-    end
-
-    def test_read
-      assert_equal([{"value" => [-0.5, nil, 0.5]}],
-                   read)
-    end
-  end
-
-  sub_test_case("Date32") do
-    def setup(&block)
-      @date_2017_08_28 = 17406
-      @date_2025_12_09 = 20431
-      super(&block)
-    end
-
-    def build_array
-      Arrow::Date32Array.new([@date_2017_08_28, nil, @date_2025_12_09])
-    end
-
-    def test_read
-      assert_equal([{"value" => [@date_2017_08_28, nil, @date_2025_12_09]}],
-                   read)
-    end
-  end
-
-  sub_test_case("Date64") do
-    def setup(&block)
-      @date_2017_08_28_00_00_00 = 1503878400000
-      @date_2025_12_09_00_00_00 = 1765324800000
-      super(&block)
-    end
-
-    def build_array
-      Arrow::Date64Array.new([
-                               @date_2017_08_28_00_00_00,
-                               nil,
-                               @date_2025_12_09_00_00_00,
-                             ])
-    end
-
-    def test_read
-      assert_equal([
-                     {
-                       "value" => [
-                         @date_2017_08_28_00_00_00,
-                         nil,
-                         @date_2025_12_09_00_00_00,
-                       ],
-                     },
-                   ],
-                   read)
-    end
-  end
-
-  sub_test_case("Time32(:second)") do
-    def setup(&block)
-      @time_00_00_10 = 10
-      @time_00_01_10 = 60 + 10
-      super(&block)
-    end
-
-    def build_array
-      Arrow::Time32Array.new(:second, [@time_00_00_10, nil, @time_00_01_10])
-    end
-
-    def test_read
-      assert_equal([{"value" => [@time_00_00_10, nil, @time_00_01_10]}],
-                   read)
-    end
-
-    def test_type
-      assert_equal(:second, type.unit)
-    end
-  end
-
-  sub_test_case("Time32(:millisecond)") do
-    def setup(&block)
-      @time_00_00_10_000 = 10 * 1000
-      @time_00_01_10_000 = (60 + 10) * 1000
-      super(&block)
-    end
-
-    def build_array
-      Arrow::Time32Array.new(:milli,
-                             [@time_00_00_10_000, nil, @time_00_01_10_000])
-    end
-
-    def test_read
-      assert_equal([{"value" => [@time_00_00_10_000, nil, @time_00_01_10_000]}],
-                   read)
-    end
-
-    def test_type
-      assert_equal(:millisecond, type.unit)
-    end
-  end
-
-  sub_test_case("Time64(:microsecond)") do
-    def setup(&block)
-      @time_00_00_10_000_000 = 10 * 1_000_000
-      @time_00_01_10_000_000 = (60 + 10) * 1_000_000
-      super(&block)
-    end
-
-    def build_array
-      Arrow::Time64Array.new(:micro,
-                             [
-                               @time_00_00_10_000_000,
-                               nil,
-                               @time_00_01_10_000_000,
-                             ])
-    end
-
-    def test_read
-      assert_equal([
-                     {
-                       "value" => [
-                         @time_00_00_10_000_000,
-                         nil,
-                         @time_00_01_10_000_000,
-                       ],
-                     },
-                   ],
-                   read)
-    end
-
-    def test_type
-      assert_equal(:microsecond, type.unit)
-    end
-  end
-
-  sub_test_case("Time64(:nanosecond)") do
-    def setup(&block)
-      @time_00_00_10_000_000_000 = 10 * 1_000_000_000
-      @time_00_01_10_000_000_000 = (60 + 10) * 1_000_000_000
-      super(&block)
-    end
-
-    def build_array
-      Arrow::Time64Array.new(:nano,
-                             [
-                               @time_00_00_10_000_000_000,
-                               nil,
-                               @time_00_01_10_000_000_000,
-                             ])
-    end
-
-    def test_read
-      assert_equal([
-                     {
-                       "value" => [
-                         @time_00_00_10_000_000_000,
-                         nil,
-                         @time_00_01_10_000_000_000,
-                       ],
-                     },
-                   ],
-                   read)
-    end
-
-    def test_type
-      assert_equal(:nanosecond, type.unit)
-    end
-  end
-
-  sub_test_case("Timestamp(:second)") do
-    def setup(&block)
-      @timestamp_2019_11_18_00_09_11 = 1574003351
-      @timestamp_2025_12_16_05_33_58 = 1765863238
-      super(&block)
-    end
-
-    def build_array
-      Arrow::TimestampArray.new(:second,
-                                [
-                                  @timestamp_2019_11_18_00_09_11,
-                                  nil,
-                                  @timestamp_2025_12_16_05_33_58,
-                                ])
-    end
-
-    def test_read
-      assert_equal([
-                     {
-                       "value" => [
-                         @timestamp_2019_11_18_00_09_11,
-                         nil,
-                         @timestamp_2025_12_16_05_33_58,
-                       ],
-                     },
-                   ],
-                   read)
-    end
-  end
-
-  sub_test_case("Timestamp(:millisecond)") do
-    def setup(&block)
-      @timestamp_2019_11_18_00_09_11 = 1574003351 * 1_000
-      @timestamp_2025_12_16_05_33_58 = 1765863238 * 1_000
-      super(&block)
-    end
-
-    def build_array
-      Arrow::TimestampArray.new(:milli,
-                                [
-                                  @timestamp_2019_11_18_00_09_11,
-                                  nil,
-                                  @timestamp_2025_12_16_05_33_58,
-                                ])
-    end
-
-    def test_read
-      assert_equal([
-                     {
-                       "value" => [
-                         @timestamp_2019_11_18_00_09_11,
-                         nil,
-                         @timestamp_2025_12_16_05_33_58,
-                       ],
-                     },
-                   ],
-                   read)
-    end
-  end
-
-  sub_test_case("Timestamp(:microsecond)") do
-    def setup(&block)
-      @timestamp_2019_11_18_00_09_11 = 1574003351 * 1_000_000
-      @timestamp_2025_12_16_05_33_58 = 1765863238 * 1_000_000
-      super(&block)
-    end
-
-    def build_array
-      Arrow::TimestampArray.new(:micro,
-                                [
-                                  @timestamp_2019_11_18_00_09_11,
-                                  nil,
-                                  @timestamp_2025_12_16_05_33_58,
-                                ])
-    end
-
-    def test_read
-      assert_equal([
-                     {
-                       "value" => [
-                         @timestamp_2019_11_18_00_09_11,
-                         nil,
-                         @timestamp_2025_12_16_05_33_58,
-                       ],
-                     },
-                   ],
-                   read)
-    end
-  end
-
-  sub_test_case("Timestamp(:nanosecond)") do
-    def setup(&block)
-      @timestamp_2019_11_18_00_09_11 = 1574003351 * 1_000_000_000
-      @timestamp_2025_12_16_05_33_58 = 1765863238 * 1_000_000_000
-      super(&block)
-    end
-
-    def build_array
-      Arrow::TimestampArray.new(:nano,
-                                [
-                                  @timestamp_2019_11_18_00_09_11,
-                                  nil,
-                                  @timestamp_2025_12_16_05_33_58,
-                                ])
-    end
-
-    def test_read
-      assert_equal([
-                     {
-                       "value" => [
-                         @timestamp_2019_11_18_00_09_11,
-                         nil,
-                         @timestamp_2025_12_16_05_33_58,
-                       ],
-                     },
-                   ],
-                   read)
-    end
-  end
-
-  sub_test_case("Timestamp(timezone)") do
-    def setup(&block)
-      @timezone = "UTC"
-      @timestamp_2019_11_18_00_09_11 = 1574003351
-      @timestamp_2025_12_16_05_33_58 = 1765863238
-      super(&block)
-    end
-
-    def build_array
-      data_type = Arrow::TimestampDataType.new(:second, @timezone)
-      Arrow::TimestampArray.new(data_type,
-                                [
-                                  @timestamp_2019_11_18_00_09_11,
-                                  nil,
-                                  @timestamp_2025_12_16_05_33_58,
-                                ])
-    end
-
-    def test_type
-      assert_equal([:second, @timezone],
-                   [type.unit, type.timezone])
-    end
-  end
-
-  sub_test_case("YearMonthInterval") do
-    def build_array
-      Arrow::MonthIntervalArray.new([0, nil, 100])
-    end
-
-    def test_read
-      assert_equal([{"value" => [0, nil, 100]}],
-                   read)
-    end
-  end
-
-  sub_test_case("DayTimeInterval") do
-    def build_array
-      Arrow::DayTimeIntervalArray.new([
-                                        {day: 1, millisecond: 100},
-                                        nil,
-                                        {day: 3, millisecond: 300},
-                                      ])
-    end
-
-    def test_read
-      assert_equal([
-                     {
-                       "value" => [
-                         [1, 100],
-                         nil,
-                         [3, 300],
-                       ],
-                     },
-                   ],
-                   read)
-    end
-  end
-
-  sub_test_case("MonthDayNanoInterval") do
-    def build_array
-      Arrow::MonthDayNanoIntervalArray.new([
-                                             {
-                                               month: 1,
-                                               day: 1,
-                                               nanosecond: 100,
-                                             },
-                                             nil,
-                                             {
-                                               month: 3,
-                                               day: 3,
-                                               nanosecond: 300,
-                                             },
-                                           ])
-    end
-
-    def test_read
-      assert_equal([
-                     {
-                       "value" => [
-                         [1, 1, 100],
-                         nil,
-                         [3, 3, 300],
-                       ],
-                     },
-                   ],
-                   read)
-    end
-  end
-
-  sub_test_case("Duration(:second)") do
-    def build_array
-      Arrow::DurationArray.new(:second, [0, nil, 100])
-    end
-
-    def test_read
-      assert_equal([{"value" => [0, nil, 100]}],
-                   read)
-    end
-
-    def test_type
-      assert_equal(:second, type.unit)
-    end
-  end
-
-  sub_test_case("Duration(:millisecond)") do
-    def build_array
-      Arrow::DurationArray.new(:milli, [0, nil, 100_000])
-    end
-
-    def test_read
-      assert_equal([{"value" => [0, nil, 100_000]}],
-                   read)
-    end
-
-    def test_type
-      assert_equal(:millisecond, type.unit)
-    end
-  end
-
-  sub_test_case("Duration(:microsecond)") do
-    def build_array
-      Arrow::DurationArray.new(:micro, [0, nil, 100_000_000])
-    end
-
-    def test_read
-      assert_equal([{"value" => [0, nil, 100_000_000]}],
-                   read)
-    end
-
-    def test_type
-      assert_equal(:microsecond, type.unit)
-    end
-  end
-
-  sub_test_case("Duration(:nanosecond)") do
-    def build_array
-      Arrow::DurationArray.new(:nano, [0, nil, 100_000_000_000])
-    end
-
-    def test_read
-      assert_equal([{"value" => [0, nil, 100_000_000_000]}],
-                   read)
-    end
-
-    def test_type
-      assert_equal(:nanosecond, type.unit)
-    end
-  end
-
-  sub_test_case("Binary") do
-    def build_array
-      Arrow::BinaryArray.new(["Hello".b, nil, "World".b])
-    end
-
-    def test_read
-      assert_equal([{"value" => ["Hello".b, nil, "World".b]}],
-                   read)
-    end
-  end
-
-  sub_test_case("LargeBinary") do
-    def build_array
-      Arrow::LargeBinaryArray.new(["Hello".b, nil, "World".b])
-    end
-
-    def test_read
-      assert_equal([{"value" => ["Hello".b, nil, "World".b]}],
-                   read)
-    end
-  end
-
-  sub_test_case("UTF8") do
-    def build_array
-      Arrow::StringArray.new(["Hello", nil, "World"])
-    end
-
-    def test_read
-      assert_equal([{"value" => ["Hello", nil, "World"]}],
-                   read)
-    end
-  end
-
-  sub_test_case("LargeUTF8") do
-    def build_array
-      Arrow::LargeStringArray.new(["Hello", nil, "World"])
-    end
-
-    def test_read
-      assert_equal([{"value" => ["Hello", nil, "World"]}],
-                   read)
-    end
-  end
-
-  sub_test_case("FixedSizeBinary") do
-    def build_array
-      data_type = Arrow::FixedSizeBinaryDataType.new(4)
-      Arrow::FixedSizeBinaryArray.new(data_type, ["0124".b, nil, "abcd".b])
-    end
-
-    def test_read
-      assert_equal([{"value" => ["0124".b, nil, "abcd".b]}],
-                   read)
-    end
-  end
-
-  sub_test_case("List") do
-    def build_array
-      data_type = Arrow::ListDataType.new(name: "count", type: :int8)
-      Arrow::ListArray.new(data_type, [[-128, 127], nil, [-1, 0, 1]])
-    end
-
-    def test_read
-      assert_equal([{"value" => [[-128, 127], nil, [-1, 0, 1]]}],
-                   read)
-    end
-  end
-
-  sub_test_case("LargeList") do
-    def build_array
-      data_type = Arrow::LargeListDataType.new(name: "count", type: :int8)
-      Arrow::LargeListArray.new(data_type, [[-128, 127], nil, [-1, 0, 1]])
-    end
-
-    def test_read
-      assert_equal([{"value" => [[-128, 127], nil, [-1, 0, 1]]}],
-                   read)
-    end
-  end
-
-  sub_test_case("Struct") do
-    def build_array
-      data_type = Arrow::StructDataType.new(count: :int8,
-                                            visible: :boolean)
-      Arrow::StructArray.new(data_type, [[-128, nil], nil, [nil, true]])
-    end
-
-    def test_read
-      assert_equal([
-                     {
-                       "value" => [
-                         [-128, nil],
-                         nil,
-                         [nil, true],
-                       ],
-                     },
-                   ],
-                   read)
-    end
-  end
-
-  sub_test_case("DenseUnion") do
-    def build_array
-      fields = [
-        Arrow::Field.new("number", :int8),
-        Arrow::Field.new("text", :string),
-      ]
-      type_ids = [11, 13]
-      data_type = Arrow::DenseUnionDataType.new(fields, type_ids)
-      types = Arrow::Int8Array.new([11, 13, 11, 13, 13])
-      value_offsets = Arrow::Int32Array.new([0, 0, 1, 1, 2])
-      children = [
-        Arrow::Int8Array.new([1, nil]),
-        Arrow::StringArray.new(["a", "b", "c"])
-      ]
-      Arrow::DenseUnionArray.new(data_type,
-                                 types,
-                                 value_offsets,
-                                 children)
-    end
-
-    def test_read
-      assert_equal([{"value" => [1, "a", nil, "b", "c"]}],
-                   read)
-    end
-  end
-
-  sub_test_case("SparseUnion") do
-    def build_array
-      fields = [
-        Arrow::Field.new("number", :int8),
-        Arrow::Field.new("text", :string),
-      ]
-      type_ids = [11, 13]
-      data_type = Arrow::SparseUnionDataType.new(fields, type_ids)
-      types = Arrow::Int8Array.new([11, 13, 11, 13, 11])
-      children = [
-        Arrow::Int8Array.new([1, nil, nil, nil, 5]),
-        Arrow::StringArray.new([nil, "b", nil, "d", nil])
-      ]
-      Arrow::SparseUnionArray.new(data_type, types, children)
-    end
-
-    def test_read
-      assert_equal([{"value" => [1, "b", nil, "d", 5]}],
-                   read)
-    end
-  end
-
-  sub_test_case("Map") do
-    def build_array
-      data_type = Arrow::MapDataType.new(:string, :int8)
-      Arrow::MapArray.new(data_type,
-                          [
-                            {"a" => -128, "b" => 127},
-                            nil,
-                            {"c" => nil},
-                          ])
-    end
-
-    def test_read
-      assert_equal([
-                     {
-                       "value" => [
-                         {"a" => -128, "b" => 127},
-                         nil,
-                         {"c" => nil},
-                       ],
-                     },
-                   ],
-                   read)
-    end
-  end
-end
diff --git a/ruby/red-arrow-format/test/test-reader.rb b/ruby/red-arrow-format/test/test-reader.rb
new file mode 100644
index 0000000000..8095adfd50
--- /dev/null
+++ b/ruby/red-arrow-format/test/test-reader.rb
@@ -0,0 +1,872 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+module ReaderTests
+  class << self
+    def included(base)
+      base.class_eval do
+        sub_test_case("Null") do
+          def build_array
+            Arrow::NullArray.new(3)
+          end
+
+          def test_read
+            assert_equal([{"value" => [nil, nil, nil]}],
+                         read)
+          end
+        end
+
+        sub_test_case("Boolean") do
+          def build_array
+            Arrow::BooleanArray.new([true, nil, false])
+          end
+
+          def test_read
+            assert_equal([{"value" => [true, nil, false]}],
+                         read)
+          end
+        end
+
+        sub_test_case("Int8") do
+          def build_array
+            Arrow::Int8Array.new([-128, nil, 127])
+          end
+
+          def test_read
+            assert_equal([{"value" => [-128, nil, 127]}],
+                         read)
+          end
+        end
+
+        sub_test_case("UInt8") do
+          def build_array
+            Arrow::UInt8Array.new([0, nil, 255])
+          end
+
+          def test_read
+            assert_equal([{"value" => [0, nil, 255]}],
+                         read)
+          end
+        end
+
+        sub_test_case("Int16") do
+          def build_array
+            Arrow::Int16Array.new([-32768, nil, 32767])
+          end
+
+          def test_read
+            assert_equal([{"value" => [-32768, nil, 32767]}],
+                         read)
+          end
+        end
+
+        sub_test_case("UInt16") do
+          def build_array
+            Arrow::UInt16Array.new([0, nil, 65535])
+          end
+
+          def test_read
+            assert_equal([{"value" => [0, nil, 65535]}],
+                         read)
+          end
+        end
+
+        sub_test_case("Int32") do
+          def build_array
+            Arrow::Int32Array.new([-2147483648, nil, 2147483647])
+          end
+
+          def test_read
+            assert_equal([{"value" => [-2147483648, nil, 2147483647]}],
+                         read)
+          end
+        end
+
+        sub_test_case("UInt32") do
+          def build_array
+            Arrow::UInt32Array.new([0, nil, 4294967295])
+          end
+
+          def test_read
+            assert_equal([{"value" => [0, nil, 4294967295]}],
+                         read)
+          end
+        end
+
+        sub_test_case("Int64") do
+          def build_array
+            Arrow::Int64Array.new([
+                                    -9223372036854775808,
+                                    nil,
+                                    9223372036854775807
+                                  ])
+          end
+
+          def test_read
+            assert_equal([
+                           {
+                             "value" => [
+                               -9223372036854775808,
+                               nil,
+                               9223372036854775807
+                             ]
+                           }
+                         ],
+                         read)
+          end
+        end
+
+        sub_test_case("UInt64") do
+          def build_array
+            Arrow::UInt64Array.new([0, nil, 18446744073709551615])
+          end
+
+          def test_read
+            assert_equal([{"value" => [0, nil, 18446744073709551615]}],
+                         read)
+          end
+        end
+
+        sub_test_case("Float32") do
+          def build_array
+            Arrow::FloatArray.new([-0.5, nil, 0.5])
+          end
+
+          def test_read
+            assert_equal([{"value" => [-0.5, nil, 0.5]}],
+                         read)
+          end
+        end
+
+        sub_test_case("Float64") do
+          def build_array
+            Arrow::DoubleArray.new([-0.5, nil, 0.5])
+          end
+
+          def test_read
+            assert_equal([{"value" => [-0.5, nil, 0.5]}],
+                         read)
+          end
+        end
+
+        sub_test_case("Date32") do
+          def setup(&block)
+            @date_2017_08_28 = 17406
+            @date_2025_12_09 = 20431
+            super(&block)
+          end
+
+          def build_array
+            Arrow::Date32Array.new([@date_2017_08_28, nil, @date_2025_12_09])
+          end
+
+          def test_read
+            assert_equal([
+                           {
+                             "value" => [
+                               @date_2017_08_28,
+                               nil,
+                               @date_2025_12_09,
+                             ],
+                           },
+                         ],
+                         read)
+          end
+        end
+
+        sub_test_case("Date64") do
+          def setup(&block)
+            @date_2017_08_28_00_00_00 = 1503878400000
+            @date_2025_12_09_00_00_00 = 1765324800000
+            super(&block)
+          end
+
+          def build_array
+            Arrow::Date64Array.new([
+                                     @date_2017_08_28_00_00_00,
+                                     nil,
+                                     @date_2025_12_09_00_00_00,
+                                   ])
+          end
+
+          def test_read
+            assert_equal([
+                           {
+                             "value" => [
+                               @date_2017_08_28_00_00_00,
+                               nil,
+                               @date_2025_12_09_00_00_00,
+                             ],
+                           },
+                         ],
+                         read)
+          end
+        end
+
+        sub_test_case("Time32(:second)") do
+          def setup(&block)
+            @time_00_00_10 = 10
+            @time_00_01_10 = 60 + 10
+            super(&block)
+          end
+
+          def build_array
+            Arrow::Time32Array.new(:second, [@time_00_00_10, nil, 
@time_00_01_10])
+          end
+
+          def test_read
+            assert_equal([
+                           {
+                             "value" => [
+                               @time_00_00_10,
+                               nil,
+                               @time_00_01_10,
+                             ],
+                           },
+                         ],
+                         read)
+          end
+
+          def test_type
+            assert_equal(:second, type.unit)
+          end
+        end
+
+        sub_test_case("Time32(:millisecond)") do
+          def setup(&block)
+            @time_00_00_10_000 = 10 * 1000
+            @time_00_01_10_000 = (60 + 10) * 1000
+            super(&block)
+          end
+
+          def build_array
+            Arrow::Time32Array.new(:milli,
+                                   [
+                                     @time_00_00_10_000,
+                                     nil,
+                                     @time_00_01_10_000,
+                                   ])
+          end
+
+          def test_read
+            assert_equal([
+                           {
+                             "value" => [
+                               @time_00_00_10_000,
+                               nil,
+                               @time_00_01_10_000,
+                             ],
+                           },
+                         ],
+                         read)
+          end
+
+          def test_type
+            assert_equal(:millisecond, type.unit)
+          end
+        end
+
+        sub_test_case("Time64(:microsecond)") do
+          def setup(&block)
+            @time_00_00_10_000_000 = 10 * 1_000_000
+            @time_00_01_10_000_000 = (60 + 10) * 1_000_000
+            super(&block)
+          end
+
+          def build_array
+            Arrow::Time64Array.new(:micro,
+                                   [
+                                     @time_00_00_10_000_000,
+                                     nil,
+                                     @time_00_01_10_000_000,
+                                   ])
+          end
+
+          def test_read
+            assert_equal([
+                           {
+                             "value" => [
+                               @time_00_00_10_000_000,
+                               nil,
+                               @time_00_01_10_000_000,
+                             ],
+                           },
+                         ],
+                         read)
+          end
+
+          def test_type
+            assert_equal(:microsecond, type.unit)
+          end
+        end
+
+        sub_test_case("Time64(:nanosecond)") do
+          def setup(&block)
+            @time_00_00_10_000_000_000 = 10 * 1_000_000_000
+            @time_00_01_10_000_000_000 = (60 + 10) * 1_000_000_000
+            super(&block)
+          end
+
+          def build_array
+            Arrow::Time64Array.new(:nano,
+                                   [
+                                     @time_00_00_10_000_000_000,
+                                     nil,
+                                     @time_00_01_10_000_000_000,
+                                   ])
+          end
+
+          def test_read
+            assert_equal([
+                           {
+                             "value" => [
+                               @time_00_00_10_000_000_000,
+                               nil,
+                               @time_00_01_10_000_000_000,
+                             ],
+                           },
+                         ],
+                         read)
+          end
+
+          def test_type
+            assert_equal(:nanosecond, type.unit)
+          end
+        end
+
+        sub_test_case("Timestamp(:second)") do
+          def setup(&block)
+            @timestamp_2019_11_18_00_09_11 = 1574003351
+            @timestamp_2025_12_16_05_33_58 = 1765863238
+            super(&block)
+          end
+
+          def build_array
+            Arrow::TimestampArray.new(:second,
+                                      [
+                                        @timestamp_2019_11_18_00_09_11,
+                                        nil,
+                                        @timestamp_2025_12_16_05_33_58,
+                                      ])
+          end
+
+          def test_read
+            assert_equal([
+                           {
+                             "value" => [
+                               @timestamp_2019_11_18_00_09_11,
+                               nil,
+                               @timestamp_2025_12_16_05_33_58,
+                             ],
+                           },
+                         ],
+                         read)
+          end
+        end
+
+        sub_test_case("Timestamp(:millisecond)") do
+          def setup(&block)
+            @timestamp_2019_11_18_00_09_11 = 1574003351 * 1_000
+            @timestamp_2025_12_16_05_33_58 = 1765863238 * 1_000
+            super(&block)
+          end
+
+          def build_array
+            Arrow::TimestampArray.new(:milli,
+                                      [
+                                        @timestamp_2019_11_18_00_09_11,
+                                        nil,
+                                        @timestamp_2025_12_16_05_33_58,
+                                      ])
+          end
+
+          def test_read
+            assert_equal([
+                           {
+                             "value" => [
+                               @timestamp_2019_11_18_00_09_11,
+                               nil,
+                               @timestamp_2025_12_16_05_33_58,
+                             ],
+                           },
+                         ],
+                         read)
+          end
+        end
+
+        sub_test_case("Timestamp(:microsecond)") do
+          def setup(&block)
+            @timestamp_2019_11_18_00_09_11 = 1574003351 * 1_000_000
+            @timestamp_2025_12_16_05_33_58 = 1765863238 * 1_000_000
+            super(&block)
+          end
+
+          def build_array
+            Arrow::TimestampArray.new(:micro,
+                                      [
+                                        @timestamp_2019_11_18_00_09_11,
+                                        nil,
+                                        @timestamp_2025_12_16_05_33_58,
+                                      ])
+          end
+
+          def test_read
+            assert_equal([
+                           {
+                             "value" => [
+                               @timestamp_2019_11_18_00_09_11,
+                               nil,
+                               @timestamp_2025_12_16_05_33_58,
+                             ],
+                           },
+                         ],
+                         read)
+          end
+        end
+
+        sub_test_case("Timestamp(:nanosecond)") do
+          def setup(&block)
+            @timestamp_2019_11_18_00_09_11 = 1574003351 * 1_000_000_000
+            @timestamp_2025_12_16_05_33_58 = 1765863238 * 1_000_000_000
+            super(&block)
+          end
+
+          def build_array
+            Arrow::TimestampArray.new(:nano,
+                                      [
+                                        @timestamp_2019_11_18_00_09_11,
+                                        nil,
+                                        @timestamp_2025_12_16_05_33_58,
+                                      ])
+          end
+
+          def test_read
+            assert_equal([
+                           {
+                             "value" => [
+                               @timestamp_2019_11_18_00_09_11,
+                               nil,
+                               @timestamp_2025_12_16_05_33_58,
+                             ],
+                           },
+                         ],
+                         read)
+          end
+        end
+
+        sub_test_case("Timestamp(timezone)") do
+          def setup(&block)
+            @timezone = "UTC"
+            @timestamp_2019_11_18_00_09_11 = 1574003351
+            @timestamp_2025_12_16_05_33_58 = 1765863238
+            super(&block)
+          end
+
+          def build_array
+            data_type = Arrow::TimestampDataType.new(:second, @timezone)
+            Arrow::TimestampArray.new(data_type,
+                                      [
+                                        @timestamp_2019_11_18_00_09_11,
+                                        nil,
+                                        @timestamp_2025_12_16_05_33_58,
+                                      ])
+          end
+
+          def test_type
+            assert_equal([:second, @timezone],
+                         [type.unit, type.timezone])
+          end
+        end
+
+        sub_test_case("YearMonthInterval") do
+          def build_array
+            Arrow::MonthIntervalArray.new([0, nil, 100])
+          end
+
+          def test_read
+            assert_equal([{"value" => [0, nil, 100]}],
+                         read)
+          end
+        end
+
+        sub_test_case("DayTimeInterval") do
+          def build_array
+            Arrow::DayTimeIntervalArray.new([
+                                              {day: 1, millisecond: 100},
+                                              nil,
+                                              {day: 3, millisecond: 300},
+                                            ])
+          end
+
+          def test_read
+            assert_equal([
+                           {
+                             "value" => [
+                               [1, 100],
+                               nil,
+                               [3, 300],
+                             ],
+                           },
+                         ],
+                         read)
+          end
+        end
+
+        sub_test_case("MonthDayNanoInterval") do
+          def build_array
+            Arrow::MonthDayNanoIntervalArray.new([
+                                                   {
+                                                     month: 1,
+                                                     day: 1,
+                                                     nanosecond: 100,
+                                                   },
+                                                   nil,
+                                                   {
+                                                     month: 3,
+                                                     day: 3,
+                                                     nanosecond: 300,
+                                                   },
+                                                 ])
+          end
+
+          def test_read
+            assert_equal([
+                           {
+                             "value" => [
+                               [1, 1, 100],
+                               nil,
+                               [3, 3, 300],
+                             ],
+                           },
+                         ],
+                         read)
+          end
+        end
+
+        sub_test_case("Duration(:second)") do
+          def build_array
+            Arrow::DurationArray.new(:second, [0, nil, 100])
+          end
+
+          def test_read
+            assert_equal([{"value" => [0, nil, 100]}],
+                         read)
+          end
+
+          def test_type
+            assert_equal(:second, type.unit)
+          end
+        end
+
+        sub_test_case("Duration(:millisecond)") do
+          def build_array
+            Arrow::DurationArray.new(:milli, [0, nil, 100_000])
+          end
+
+          def test_read
+            assert_equal([{"value" => [0, nil, 100_000]}],
+                         read)
+          end
+
+          def test_type
+            assert_equal(:millisecond, type.unit)
+          end
+        end
+
+        sub_test_case("Duration(:microsecond)") do
+          def build_array
+            Arrow::DurationArray.new(:micro, [0, nil, 100_000_000])
+          end
+
+          def test_read
+            assert_equal([{"value" => [0, nil, 100_000_000]}],
+                         read)
+          end
+
+          def test_type
+            assert_equal(:microsecond, type.unit)
+          end
+        end
+
+        sub_test_case("Duration(:nanosecond)") do
+          def build_array
+            Arrow::DurationArray.new(:nano, [0, nil, 100_000_000_000])
+          end
+
+          def test_read
+            assert_equal([{"value" => [0, nil, 100_000_000_000]}],
+                         read)
+          end
+
+          def test_type
+            assert_equal(:nanosecond, type.unit)
+          end
+        end
+
+        sub_test_case("Binary") do
+          def build_array
+            Arrow::BinaryArray.new(["Hello".b, nil, "World".b])
+          end
+
+          def test_read
+            assert_equal([{"value" => ["Hello".b, nil, "World".b]}],
+                         read)
+          end
+        end
+
+        sub_test_case("LargeBinary") do
+          def build_array
+            Arrow::LargeBinaryArray.new(["Hello".b, nil, "World".b])
+          end
+
+          def test_read
+            assert_equal([{"value" => ["Hello".b, nil, "World".b]}],
+                         read)
+          end
+        end
+
+        sub_test_case("UTF8") do
+          def build_array
+            Arrow::StringArray.new(["Hello", nil, "World"])
+          end
+
+          def test_read
+            assert_equal([{"value" => ["Hello", nil, "World"]}],
+                         read)
+          end
+        end
+
+        sub_test_case("LargeUTF8") do
+          def build_array
+            Arrow::LargeStringArray.new(["Hello", nil, "World"])
+          end
+
+          def test_read
+            assert_equal([{"value" => ["Hello", nil, "World"]}],
+                         read)
+          end
+        end
+
+        sub_test_case("FixedSizeBinary") do
+          def build_array
+            data_type = Arrow::FixedSizeBinaryDataType.new(4)
+            Arrow::FixedSizeBinaryArray.new(data_type,
+                                            ["0124".b, nil, "abcd".b])
+          end
+
+          def test_read
+            assert_equal([{"value" => ["0124".b, nil, "abcd".b]}],
+                         read)
+          end
+        end
+
+        sub_test_case("List") do
+          def build_array
+            data_type = Arrow::ListDataType.new(name: "count", type: :int8)
+            Arrow::ListArray.new(data_type, [[-128, 127], nil, [-1, 0, 1]])
+          end
+
+          def test_read
+            assert_equal([{"value" => [[-128, 127], nil, [-1, 0, 1]]}],
+                         read)
+          end
+        end
+
+        sub_test_case("LargeList") do
+          def build_array
+            data_type = Arrow::LargeListDataType.new(name: "count",
+                                                     type: :int8)
+            Arrow::LargeListArray.new(data_type,
+                                      [[-128, 127], nil, [-1, 0, 1]])
+          end
+
+          def test_read
+            assert_equal([
+                           {
+                             "value" => [
+                               [-128, 127],
+                               nil,
+                               [-1, 0, 1],
+                             ],
+                           },
+                         ],
+                         read)
+          end
+        end
+
+        sub_test_case("Struct") do
+          def build_array
+            data_type = Arrow::StructDataType.new(count: :int8,
+                                                  visible: :boolean)
+            Arrow::StructArray.new(data_type,
+                                   [[-128, nil], nil, [nil, true]])
+          end
+
+          def test_read
+            assert_equal([
+                           {
+                             "value" => [
+                               [-128, nil],
+                               nil,
+                               [nil, true],
+                             ],
+                           },
+                         ],
+                         read)
+          end
+        end
+
+        sub_test_case("DenseUnion") do
+          def build_array
+            fields = [
+              Arrow::Field.new("number", :int8),
+              Arrow::Field.new("text", :string),
+            ]
+            type_ids = [11, 13]
+            data_type = Arrow::DenseUnionDataType.new(fields, type_ids)
+            types = Arrow::Int8Array.new([11, 13, 11, 13, 13])
+            value_offsets = Arrow::Int32Array.new([0, 0, 1, 1, 2])
+            children = [
+              Arrow::Int8Array.new([1, nil]),
+              Arrow::StringArray.new(["a", "b", "c"])
+            ]
+            Arrow::DenseUnionArray.new(data_type,
+                                       types,
+                                       value_offsets,
+                                       children)
+          end
+
+          def test_read
+            assert_equal([{"value" => [1, "a", nil, "b", "c"]}],
+                         read)
+          end
+        end
+
+        sub_test_case("SparseUnion") do
+          def build_array
+            fields = [
+              Arrow::Field.new("number", :int8),
+              Arrow::Field.new("text", :string),
+            ]
+            type_ids = [11, 13]
+            data_type = Arrow::SparseUnionDataType.new(fields, type_ids)
+            types = Arrow::Int8Array.new([11, 13, 11, 13, 11])
+            children = [
+              Arrow::Int8Array.new([1, nil, nil, nil, 5]),
+              Arrow::StringArray.new([nil, "b", nil, "d", nil])
+            ]
+            Arrow::SparseUnionArray.new(data_type, types, children)
+          end
+
+          def test_read
+            assert_equal([{"value" => [1, "b", nil, "d", 5]}],
+                         read)
+          end
+        end
+
+        sub_test_case("Map") do
+          def build_array
+            data_type = Arrow::MapDataType.new(:string, :int8)
+            Arrow::MapArray.new(data_type,
+                                [
+                                  {"a" => -128, "b" => 127},
+                                  nil,
+                                  {"c" => nil},
+                                ])
+          end
+
+          def test_read
+            assert_equal([
+                           {
+                             "value" => [
+                               {"a" => -128, "b" => 127},
+                               nil,
+                               {"c" => nil},
+                             ],
+                           },
+                         ],
+                         read)
+          end
+        end
+      end
+    end
+  end
+end
+
+class TestFileReader < Test::Unit::TestCase
+  include ReaderTests
+
+  def setup
+    Dir.mktmpdir do |tmp_dir|
+      table = Arrow::Table.new(value: build_array)
+      @path = File.join(tmp_dir, "data.arrow")
+      table.save(@path)
+      File.open(@path, "rb") do |input|
+        @reader = ArrowFormat::FileReader.new(input)
+        yield
+        @reader = nil
+      end
+      GC.start
+    end
+  end
+
+  def read
+    @reader.to_a.collect do |record_batch|
+      record_batch.to_h.tap do |hash|
+        hash.each do |key, value|
+          hash[key] = value.to_a
+        end
+      end
+    end
+  end
+
+  def type
+    @type ||= @reader.first.schema.fields[0].type
+  end
+end
+
+class TestStreamingReader < Test::Unit::TestCase
+  include ReaderTests
+
+  def setup
+    Dir.mktmpdir do |tmp_dir|
+      table = Arrow::Table.new(value: build_array)
+      @path = File.join(tmp_dir, "data.arrows")
+      table.save(@path)
+      File.open(@path, "rb") do |input|
+        @reader = ArrowFormat::StreamingReader.new(input)
+        yield
+        @reader = nil
+      end
+      GC.start
+    end
+  end
+
+  def read
+    @reader.to_a.collect do |record_batch|
+      record_batch.to_h.tap do |hash|
+        hash.each do |key, value|
+          hash[key] = value.to_a
+        end
+      end
+    end
+  end
+
+  def type
+    @type ||= @reader.first.schema.fields[0].type
+  end
+end

Reply via email to