adamaps commented on issue #1746:
URL: 
https://github.com/apache/iceberg-python/issues/1746#issuecomment-3543232297

   In case it helps anyone in the meantime, here is a custom Class I am using 
to scan an Iceberg table with `BRANCH` or `TAG` names. In this example my table 
is stored on AWS Glue Catalog, and I'm only interested in scanning a named 
reference or the current snapshot.
   
   ```python
   from pyiceberg.catalog.glue import GlueCatalog
   from pyiceberg.table import DataScan, Table
   from pyiceberg.table.refs import SnapshotRefType
   
   class IcebergTable:
       """
       This class provides methods to load and scan an Iceberg table using 
refs, filters, and snapshots.
       """
   
       def __init__(
           self,
           database_name: str,
           table_name: str,
           row_filter: Optional[str] = None,
           ref_name: Optional[str] = None,
           ref_type: Optional[str] = None,
       ):
           """
           Initialize the IcebergTable class
               :param database_name: The name of the AWS Glue Catalog Iceberg 
database.
               :param table_name: The name of the AWS Glue Catalog Iceberg 
table.
               :param row_filter (optional): The filter string to apply to the 
table scan.
               :param ref_name (optional): The reference name of the snapshot 
to use.
               :param ref_type (optional): The reference type (BRANCH or TAG) 
to use.
           """
           self.catalog: GlueCatalog = GlueCatalog(
               name=AWS_PARAMS["GLUE_CATALOG"],
               warehouse=AWS_PARAMS["WAREHOUSE"],
               **{"client.region": AWS_PARAMS["AWS_REGION"]},
           )
           self.database_name = database_name
           self.table_name = table_name
           self.table = self.load_table()
           self.row_filter = row_filter
           self.ref_name = ref_name
           self.ref_type = ref_type
           self.snapshot_id = self.get_snapshot_id()
   
       def get_current_snapshot_id(self) -> int:
           """
           Get the current snapshot ID from the Table object.
               :return: The current snapshot ID.
           """
           current_snapshot = self.table.current_snapshot()
           if current_snapshot:
               return current_snapshot.snapshot_id
           raise ValueError(
               "Current snapshot is None. Cannot get ID from an invalid 
snapshot."
           )
   
       def get_snapshot_id(self) -> int:
           """
           Retrieve the snapshot_id based on the ref_name and ref_type, or use 
get_current_snapshot_id.
               :return: The snapshot_id associated with the given ref_name and 
ref_type.
           """
           if not self.ref_name:
               return self.get_current_snapshot_id()
           table_refs = self.get_table_refs()
           if self.ref_name in table_refs:
               return table_refs[self.ref_name]["snapshot_id"]
           raise ValueError(
               f"Reference name '{self.ref_name}' does not exist for reference 
type '{self.ref_type}'."
           )
   
       def get_table_refs(self) -> dict[str, dict]:
           """
           Get a dictionary of existing snapshot references.
               :return: A dict of snapshot IDs and their corresponding 
reference types.
           """
           table_refs = self.table.refs()
           return {
               ref_name: {
                   "snapshot_id": ref.snapshot_id,
                   "snapshot_ref_type": ref.snapshot_ref_type,
               }
               for ref_name, ref in table_refs.items()
               if self.ref_type is None
               or ref.snapshot_ref_type == SnapshotRefType[self.ref_type]
           }
   
       def load_table(self) -> Table:
           """
           Load an Iceberg table
               :return: The Iceberg Table object.
           """
           return 
self.catalog.load_table(f"{self.database_name}.{self.table_name}")
   
       def scan_table(self) -> DataScan:
           """
           Scan the Iceberg table object using the row_filter and snapshot_id.
               :return: Return a Table DataScan object.
           """
           if self.row_filter:
               return self.table.scan(
                   row_filter=self.row_filter, snapshot_id=self.snapshot_id
               )
           return self.table.scan(snapshot_id=self.snapshot_id)
   ```  
           
   ### Usage
   ```python
   # Create an IcebergTable object using input properties 
   # See: 
https://py.iceberg.apache.org/reference/pyiceberg/table/#pyiceberg.table.Table
   iceberg_table = IcebergTable(
       database_name = "db_test",
       table_name = "tb_test",
       row_filter = "id in (1,2,3)",
       ref_name = "branch_abc",
       ref_type = "BRANCH",
   )
   ```
   ```python
   # Create a DataScan object for downstream use (.to_arrow(), .to_duckdb() 
etc.)
   # See: 
https://py.iceberg.apache.org/reference/pyiceberg/table/#pyiceberg.table.DataScan
   iceberg_scan = iceberg_table.scan_table()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to