adamaps commented on issue #1746:
URL:
https://github.com/apache/iceberg-python/issues/1746#issuecomment-3543232297
In case it helps anyone in the meantime, here is a custom class I am using
to scan an Iceberg table by `BRANCH` or `TAG` name. In this example, my table
is stored in the AWS Glue Catalog, and I'm only interested in scanning a named
reference or the current snapshot.
```python
from typing import Optional

from pyiceberg.catalog.glue import GlueCatalog
from pyiceberg.table import DataScan, Table
from pyiceberg.table.refs import SnapshotRefType
class IcebergTable:
    """
    Load an Iceberg table from the AWS Glue Catalog and scan it at a snapshot.

    The snapshot is resolved once, at construction time: either from a named
    reference (``ref_name``, optionally restricted to a ``ref_type``) or, when
    no reference name is given, from the table's current snapshot.

    The surrounding module must define ``AWS_PARAMS``, a mapping that provides
    the keys ``"GLUE_CATALOG"``, ``"WAREHOUSE"`` and ``"AWS_REGION"``.
    """

    def __init__(
        self,
        database_name: str,
        table_name: str,
        row_filter: Optional[str] = None,
        ref_name: Optional[str] = None,
        ref_type: Optional[str] = None,
    ):
        """
        Initialize the IcebergTable class.

        Connects to the Glue catalog, loads the table and resolves the
        snapshot ID that ``scan_table`` will use.

        :param database_name: The name of the AWS Glue Catalog Iceberg
            database.
        :param table_name: The name of the AWS Glue Catalog Iceberg table.
        :param row_filter (optional): The filter string to apply to the
            table scan.
        :param ref_name (optional): The reference name of the snapshot to
            use. When omitted, the current snapshot is used and ``ref_type``
            is ignored.
        :param ref_type (optional): The reference type (BRANCH or TAG) to
            use; matched case-insensitively against ``SnapshotRefType``
            member names.
        :raises ValueError: If the snapshot cannot be resolved (see
            ``get_snapshot_id``).
        """
        self.catalog: GlueCatalog = GlueCatalog(
            name=AWS_PARAMS["GLUE_CATALOG"],
            warehouse=AWS_PARAMS["WAREHOUSE"],
            **{"client.region": AWS_PARAMS["AWS_REGION"]},
        )
        self.database_name = database_name
        self.table_name = table_name
        # The table must be loaded before the snapshot ID can be resolved.
        self.table = self.load_table()
        self.row_filter = row_filter
        self.ref_name = ref_name
        self.ref_type = ref_type
        self.snapshot_id = self.get_snapshot_id()

    def get_current_snapshot_id(self) -> int:
        """
        Get the current snapshot ID from the Table object.

        :return: The current snapshot ID.
        :raises ValueError: If the table has no current snapshot.
        """
        current_snapshot = self.table.current_snapshot()
        if current_snapshot is not None:
            return current_snapshot.snapshot_id
        raise ValueError(
            "Current snapshot is None. Cannot get ID from an invalid snapshot."
        )

    def get_snapshot_id(self) -> int:
        """
        Retrieve the snapshot_id based on the ref_name and ref_type, or use
        get_current_snapshot_id when no ref_name was given.

        :return: The snapshot_id associated with the given ref_name and
            ref_type.
        :raises ValueError: If ``ref_name`` does not exist among the
            references of the requested type, or if there is no current
            snapshot to fall back to.
        """
        if not self.ref_name:
            return self.get_current_snapshot_id()
        ref = self.get_table_refs().get(self.ref_name)
        if ref is None:
            raise ValueError(
                f"Reference name '{self.ref_name}' does not exist for "
                f"reference type '{self.ref_type}'."
            )
        return ref["snapshot_id"]

    def get_table_refs(self) -> dict[str, dict]:
        """
        Get a dictionary of existing snapshot references.

        Only references whose type matches ``ref_type`` are returned; when
        ``ref_type`` is None, every reference is returned.

        :return: A dict mapping each reference name to a dict with the keys
            ``"snapshot_id"`` and ``"snapshot_ref_type"``.
        :raises KeyError: If ``ref_type`` is not the name of a
            ``SnapshotRefType`` member.
        """
        # Resolve the requested type once instead of once per reference.
        # Enum lookup by name is case-sensitive, so normalise the input.
        wanted_type = (
            None
            if self.ref_type is None
            else SnapshotRefType[self.ref_type.upper()]
        )
        return {
            ref_name: {
                "snapshot_id": ref.snapshot_id,
                "snapshot_ref_type": ref.snapshot_ref_type,
            }
            for ref_name, ref in self.table.refs().items()
            if wanted_type is None or ref.snapshot_ref_type == wanted_type
        }

    def load_table(self) -> Table:
        """
        Load an Iceberg table.

        :return: The Iceberg Table object for
            ``<database_name>.<table_name>``.
        """
        # The call must sit on the same line as ``return``; a bare ``return``
        # followed by the expression would silently return None.
        return self.catalog.load_table(
            f"{self.database_name}.{self.table_name}"
        )

    def scan_table(self) -> DataScan:
        """
        Scan the Iceberg table object using the row_filter and snapshot_id.

        :return: Return a Table DataScan object.
        """
        scan_kwargs = {"snapshot_id": self.snapshot_id}
        # Pass row_filter only when one was supplied, so that the scan's own
        # default filter applies otherwise.
        if self.row_filter:
            scan_kwargs["row_filter"] = self.row_filter
        return self.table.scan(**scan_kwargs)
```
### Usage
```python
# Create an IcebergTable object using input properties.
# See: https://py.iceberg.apache.org/reference/pyiceberg/table/#pyiceberg.table.Table
iceberg_table = IcebergTable(
    database_name="db_test",
    table_name="tb_test",
    row_filter="id in (1,2,3)",
    ref_name="branch_abc",
    ref_type="BRANCH",
)
```
```python
# Create a DataScan object for downstream use (.to_arrow(), .to_duckdb(), etc.).
# See: https://py.iceberg.apache.org/reference/pyiceberg/table/#pyiceberg.table.DataScan
iceberg_scan = iceberg_table.scan_table()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]