manuzhang commented on code in PR #15240:
URL: https://github.com/apache/iceberg/pull/15240#discussion_r2869227473
##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java:
##########
@@ -99,96 +95,74 @@ public boolean supportsExternalMetadata() {
@Override
public Table getTable(StructType schema, Transform[] partitioning,
Map<String, String> options) {
- Spark3Util.CatalogAndIdentifier catalogIdentifier =
- catalogAndIdentifier(new CaseInsensitiveStringMap(options));
- CatalogPlugin catalog = catalogIdentifier.catalog();
- Identifier ident = catalogIdentifier.identifier();
+ return loadTable(new CaseInsensitiveStringMap(options));
+ }
+ private Table loadTable(CaseInsensitiveStringMap options) {
+ CatalogAndIdentifier catalogAndIdent = catalogAndIdentifier(options);
+ TableCatalog catalog = catalogAndIdent.tableCatalog();
+ Identifier ident = catalogAndIdent.identifier();
try {
- if (catalog instanceof TableCatalog) {
- return ((TableCatalog) catalog).loadTable(ident);
- }
+ return catalog.loadTable(ident);
} catch (NoSuchTableException e) {
-      // throwing an iceberg NoSuchTableException because the Spark one is typed and can't be thrown
-      // from this interface
+ // TableProvider doesn't permit typed exception while loading tables,
+ // so throw Iceberg NoSuchTableException because the Spark one is typed
throw new org.apache.iceberg.exceptions.NoSuchTableException(
- e, "Cannot find table for %s.", ident);
+ e,
+ "Cannot find table %s in catalog %s (%s)",
+ ident,
+ catalog.name(),
+ catalog.getClass().getName());
}
-
-    // throwing an iceberg NoSuchTableException because the Spark one is typed and can't be thrown
-    // from this interface
- throw new org.apache.iceberg.exceptions.NoSuchTableException(
- "Cannot find table for %s.", ident);
}
-  private Spark3Util.CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStringMap options) {
+  private CatalogAndIdentifier catalogAndIdentifier(CaseInsensitiveStringMap options) {
Preconditions.checkArgument(
         options.containsKey(SparkReadOptions.PATH), "Cannot open table: path is not set");
+ Spark3Util.validateNoLegacyTimeTravel(options);
+
SparkSession spark = SparkSession.active();
+ CatalogManager catalogManager = spark.sessionState().catalogManager();
+
setupDefaultSparkCatalogs(spark);
- String path = options.get(SparkReadOptions.PATH);
- Long snapshotId = propertyAsLong(options, SparkReadOptions.SNAPSHOT_ID);
-    Long asOfTimestamp = propertyAsLong(options, SparkReadOptions.AS_OF_TIMESTAMP);
+ String path = options.get(SparkReadOptions.PATH);
String branch = options.get(SparkReadOptions.BRANCH);
- String tag = options.get(SparkReadOptions.TAG);
- Preconditions.checkArgument(
-        Stream.of(snapshotId, asOfTimestamp, branch, tag).filter(Objects::nonNull).count() <= 1,
-        "Can specify only one of snapshot-id (%s), as-of-timestamp (%s), branch (%s), tag (%s)",
- snapshotId,
- asOfTimestamp,
- branch,
- tag);
-
- String selector = null;
-
- if (snapshotId != null) {
- selector = SNAPSHOT_ID + snapshotId;
- }
-
- if (asOfTimestamp != null) {
- selector = AT_TIMESTAMP + asOfTimestamp;
- }
-
- if (branch != null) {
- selector = BRANCH_PREFIX + branch;
- }
-
- if (tag != null) {
- selector = TAG_PREFIX + tag;
- }
-
- CatalogManager catalogManager = spark.sessionState().catalogManager();
+ String branchSelector = branch != null ? BRANCH_PREFIX + branch : null;
     // return rewrite catalog with path as group ID if table is staged for rewrite
if (TABLE_CACHE.contains(path)) {
- return new Spark3Util.CatalogAndIdentifier(
+ return new CatalogAndIdentifier(
catalogManager.catalog(REWRITE_CATALOG_NAME),
Identifier.of(EMPTY_NAMESPACE, path));
}
+    // return default catalog and PathIdentifier with branch selector for a path
if (path.contains("/")) {
- // contains a path. Return iceberg default catalog and a PathIdentifier
- return new Spark3Util.CatalogAndIdentifier(
+ return new CatalogAndIdentifier(
catalogManager.catalog(DEFAULT_CATALOG_NAME),
- new PathIdentifier(pathWithSelector(path, selector)));
+ new PathIdentifier(pathWithSelector(path, branchSelector)));
}
- final Spark3Util.CatalogAndIdentifier catalogAndIdentifier =
- Spark3Util.catalogAndIdentifier("path or identifier", spark, path);
+ // treat path as an identifier and resolve it against the session config
+    // if the catalog resolves to an unknown session catalog, use the default Iceberg catalog
+ CatalogAndIdentifier catalogAndIdent = resolveIdentifier(spark, path);
+ CatalogPlugin catalog = catalogAndIdent.catalog();
+ Identifier ident = catalogAndIdent.identifier();
+ return new CatalogAndIdentifier(
+        isUnknownSessionCatalog(catalog) ? catalogManager.catalog(DEFAULT_CATALOG_NAME) : catalog,
+ identifierWithSelector(ident, branchSelector));
+ }
-    Identifier ident = identifierWithSelector(catalogAndIdentifier.identifier(), selector);
- if (catalogAndIdentifier.catalog().name().equals("spark_catalog")
- && !(catalogAndIdentifier.catalog() instanceof SparkSessionCatalog)) {
-      // catalog is a session catalog but does not support Iceberg. Use Iceberg instead.
- return new Spark3Util.CatalogAndIdentifier(
- catalogManager.catalog(DEFAULT_CATALOG_NAME), ident);
- } else {
-      return new Spark3Util.CatalogAndIdentifier(catalogAndIdentifier.catalog(), ident);
- }
+  private static CatalogAndIdentifier resolveIdentifier(SparkSession spark, String ident) {
+ return Spark3Util.catalogAndIdentifier("identifier", spark, ident);
+ }
+
+ private static boolean isUnknownSessionCatalog(CatalogPlugin catalog) {
+    return catalog.name().equals("spark_catalog") && !(catalog instanceof SparkSessionCatalog);
Review Comment:
Can we use `CatalogManager.SESSION_CATALOG_NAME()`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]