gli-chris-hao commented on code in PR #1388:
URL: https://github.com/apache/iceberg-python/pull/1388#discussion_r1900228680


##########
pyiceberg/table/__init__.py:
##########
@@ -1594,6 +1609,29 @@ def to_ray(self) -> ray.data.dataset.Dataset:
 
         return ray.data.from_arrow(self.to_arrow())
 
+    def count(self) -> int:
+        """
+        Usage: calutates the total number of records in a Scan that haven't 
had positional deletes
+        """
+        res = 0
+        # every task is a FileScanTask
+        tasks = self.plan_files()
+
+        for task in tasks:
+            # task.residual is a Boolean Expression if the fiter condition is 
fully satisfied by the
+            # partition value and task.delete_files represents that positional 
delete haven't been merged yet
+            # hence those files have to read as a pyarrow table applying the 
filter and deletes
+            if task.residual == AlwaysTrue() and not len(task.delete_files):
+                # Every File has a metadata stat that stores the file record 
count
+                res += task.file.record_count
+            else:
+                from pyiceberg.io.pyarrow import ArrowScan
+                tbl = ArrowScan(
+                    self.table_metadata, self.io, self.projection(), 
self.row_filter, self.case_sensitive, self.limit
+                ).to_table([task])
+                res += len(tbl)
+        return res

Review Comment:
   I love this approach! My only concern is about loading too much data into 
memory at once, although this is loading just one file at a time, in the worst 
case some file could potentially be very large? Shall we define a threshold and 
check, for example, if `file size < 512MB`, load entire file, otherwise turn it 
into `pa.RecordBatchReader` and read stream of record batches for counting.
   
   ```
   target_schema = schema_to_pyarrow(self.projection())
   
   batches = ArrowScan(
       self.table_metadata, self.io, self.projection(), self.row_filter, 
self.case_sensitive, self.limit
   ).to_record_batches([task])
   
   reader = pa.RecordBatchReader.from_batches(
       target_schema,
       batches,
   )
   
   count = 0
   for batch in reader:
       count += batch.num_rows
   return count
   ```
   
https://github.com/apache/iceberg-python/blob/e6465001bd8a47718ff79da4def5800962e6b895/pyiceberg/table/__init__.py#L1541-L1564



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to