kevinjqliu opened a new issue, #1506: URL: https://github.com/apache/iceberg-python/issues/1506
### Apache Iceberg version None ### Please describe the bug 🐞 From slack, """ Hi team, There have been occasional reports from internal users that the number of records retrieved when loading data with PyIceberg sometimes differs. Since it is difficult to determine the specific circumstances under which this occurs, reproducing the issue has been challenging. What kind of logging should be implemented to help identify the root cause when the issue arises? """ ``` Git commit: a051584a3684392d2db6556449eb299145d47d15 (pyiceberg-0.8.1 tag) def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table: """Scan the Iceberg table and return a pa.Table. Returns a pa.Table with data from the Iceberg table by resolving the right columns that match the current table schema. Only data that matches the provided row_filter expression is returned. Args: tasks: FileScanTasks representing the data files and delete files to read from. Returns: A PyArrow table. Total number of rows will be capped if specified. Raises: ResolveError: When a required field cannot be found in the file ValueError: When a field type in the file cannot be projected to the schema type """ deletes_per_file = _read_all_delete_files(self._fs, tasks) executor = ExecutorFactory.get_or_create() def _table_from_scan_task(task: FileScanTask) -> pa.Table: batches = list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file)) if len(batches) > 0: return pa.Table.from_batches(batches) else: return None futures = [ executor.submit( _table_from_scan_task, task, ) for task in tasks ] logger.info(f"Number of tasks: {len(tasks)} Number of Futures: {len(futures)}") total_row_count = 0 # for consistent ordering, we need to maintain future order futures_index = {f: i for i, f in enumerate(futures)} completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f]) for future in concurrent.futures.as_completed(futures): completed_futures.add(future) if table_result := future.result(): total_row_count += len(table_result) # stop early if limit is satisfied if self._limit is not None and total_row_count >= self._limit: break # by now, we've either completed all tasks or satisfied the limit if self._limit is not None: _ = [f.cancel() for f in futures if not f.done()] tables = [f.result() for f in completed_futures if f.result()] if len(tables) < 1: return pa.Table.from_batches([], schema=schema_to_pyarrow(self._projected_schema, include_field_ids=False)) result = pa.concat_tables(tables, promote_options="permissive") if self._limit is not None: return result.slice(0, self._limit) logger.info(f"total_row_count: {total_row_count}, len(tables): {len(tables)} len(completed_futures): {len(completed_futures)}") logger.info([(i, t.num_rows) for i, t in enumerate(tables)]) logger.info([(i, t.file.file_path) for i, t in enumerate(tasks)]) return result Is the tasks variable in the to_table() function of the ArrowScan class non-deterministic? While debugging, I observed that applying the same row_filter to the same table sometimes results in a different number of tasks. In cases where data loss occurs, I noticed that the number of table_result objects retrieved via multiprocessing varies. total_row_count must be 100000, but at times it is between 97000 and 100000 2025-01-10 18:34:26 - pyiceberg.io.pyarrow - INFO - log init 2025-01-10 18:34:26 - pyiceberg.io - INFO - Loaded FileIO: pyiceberg.io.pyarrow.PyArrowFileIO 2025-01-10 18:36:02 - pyiceberg.io.pyarrow - INFO - total_row_count: 100000, len(completed_futures): 114 2025-01-10 18:36:02 - pyiceberg.io.pyarrow - INFO - [(0, 186), (1, 303), (2, 861), (3, 832), (4, 849), (5, 922), (6, 933), (7, 911), (8, 903), (9, 859), (10, 882), (11, 898), (12, 890), (13, 809), (14, 855), (15, 908), (16, 929), (17, 852), (18, 898), (19, 857), (20, 935), (21, 840), (22, 849), (23, 853), (24, 893), (25, 929), (26, 877), (27, 905), (28, 917), (29, 904), (30, 868), (31, 883), (32, 857), (33, 896), (34, 905), (35, 899), (36, 885), (37, 878), (38, 855), (39, 921), (40, 891), (41, 933), (42, 830), (43, 857), (44, 887), (45, 909), (46, 883), (47, 903), (48, 860), (49, 907), (50, 878), (51, 937), (52, 870), (53, 884), (54, 877), (55, 926), (56, 891), (57, 861), (58, 920), (59, 922), (60, 880), (61, 892), (62, 922), (63, 885), (64, 854), (65, 882), (66, 892), (67, 870), (68, 881), (69, 876), (70, 844), (71, 858), (72, 921), (73, 948), (74, 898), (75, 910), (76, 852), (77, 971), (78, 899), (79, 887), (80, 915), (81, 874), (82, 906), (83, 930), (84, 874), (85, 872), (86, 8 71), (87, 877), (88, 891), (89, 820), (90, 877), (91, 876), (92, 928), (93, 859), (94, 878), (95, 884), (96, 954), (97, 856), (98, 924), (99, 859), (100, 914), (101, 892), (102, 889), (103, 886), (104, 882), (105, 915), (106, 862), (107, 907), (108, 886), (109, 837), (110, 910), (111, 963), (112, 926), (113, 872)] # tables row index and count 2025-01-10 18:38:32 - pyiceberg.io.pyarrow - INFO - total_row_count: 99126, len(completed_futures): 116 2025-01-10 18:38:32 - pyiceberg.io.pyarrow - INFO - [(0, 186), (1, 303), (2, 861), (3, 832), (4, 849), (5, 922), (6, 933), (7, 911), (8, 903), (9, 859), (10, 882), (11, 898), (12, 890), (13, 809), (14, 855), (15, 908), (16, 929), (17, 852), (18, 898), (19, 857), (20, 935), (21, 840), (22, 849), (23, 853), (24, 893), (25, 929), (26, 877), (27, 905), (28, 917), (29, 904), (30, 868), (31, 883), (32, 857), (33, 896), (34, 905), (35, 899), (36, 885), (37, 878), (38, 855), (39, 921), (40, 891), (41, 933), (42, 830), (43, 857), (44, 887), (45, 909), (46, 883), (47, 903), (48, 860), (49, 907), (50, 878), (51, 937), (52, 870), (53, 884), (54, 877), (55, 926), (56, 891), (57, 861), (58, 920), (59, 922), (60, 880), (61, 892), (62, 922), (63, 885), (64, 854), (65, 882), (66, 892), (67, 870), (68, 881), (69, 876), (70, 844), (71, 858), (72, 921), (73, 948), (74, 898), (75, 910), (76, 852), (77, 971), (78, 899), (79, 887), (80, 915), (81, 906), (82, 930), (83, 874), (84, 872), (85, 871), (86, 8 77), (87, 891), (88, 820), (89, 877), (90, 876), (91, 928), (92, 859), (93, 878), (94, 884), (95, 954), (96, 856), (97, 924), (98, 859), (99, 914), (100, 892), (101, 889), (102, 886), (103, 882), (104, 915), (105, 862), (106, 907), (107, 886), (108, 837), (109, 910), (110, 963), (111, 926), (112, 872)] # tables row index and count 2025-01-10 19:12:18 - pyiceberg.io.pyarrow - INFO - log init 2025-01-10 19:12:18 - pyiceberg.io - INFO - Loaded FileIO: pyiceberg.io.pyarrow.PyArrowFileIO 2025-01-10 19:14:36 - pyiceberg.io.pyarrow - INFO - Number of tasks: 115 Number of Futures: 115 2025-01-10 19:16:26 - pyiceberg.io.pyarrow - INFO - total_row_count: 100000, len(completed_futures): 115 2025-01-10 19:16:26 - pyiceberg.io.pyarrow - INFO - [(0, 186), (1, 303), (2, 861), (3, 832), (4, 849), (5, 922), (6, 933), (7, 911), (8, 903), (9, 859), (10, 882), (11, 898), (12, 890), (13, 809), (14, 855), (15, 908), (16, 929), (17, 852), (18, 898), (19, 857), (20, 935), (21, 840), (22, 849), (23, 853), (24, 893), (25, 929), (26, 877), (27, 905), (28, 917), (29, 904), (30, 868), (31, 883), (32, 857), (33, 896), (34, 905), (35, 899), (36, 885), (37, 878), (38, 855), (39, 921), (40, 891), (41, 933), (42, 830), (43, 857), (44, 887), (45, 909), (46, 883), (47, 903), (48, 860), (49, 907), (50, 878), (51, 937), (52, 870), (53, 884), (54, 877), (55, 926), (56, 891), (57, 861), (58, 920), (59, 922), (60, 880), (61, 892), (62, 922), (63, 885), (64, 854), (65, 882), (66, 892), (67, 870), (68, 881), (69, 876), (70, 844), (71, 858), (72, 921), (73, 948), (74, 898), (75, 910), (76, 852), (77, 971), (78, 899), (79, 887), (80, 915), (81, 874), (82, 906), (83, 930), (84, 874), (85, 872), (86, 8 71), (87, 877), (88, 891), (89, 820), (90, 877), (91, 876), (92, 928), (93, 859), (94, 878), (95, 884), (96, 954), (97, 856), (98, 924), (99, 859), (100, 914), (101, 892), (102, 889), (103, 886), (104, 882), (105, 915), (106, 862), (107, 907), (108, 886), (109, 837), (110, 910), (111, 963), (112, 926), (113, 872)] # tables row index and count ``` ### Willingness to contribute - [ ] I can contribute a fix for this bug independently - [ ] I would be willing to contribute a fix for this bug with guidance from the Iceberg community - [ ] I cannot contribute a fix for this bug at this time -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org