flyingImer commented on code in PR #3820:
URL: https://github.com/apache/polaris/pull/3820#discussion_r2887022157


##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PaimonHelper.java:
##########
@@ -81,13 +112,48 @@ public TableCatalog loadPaimonCatalog(PolarisSparkCatalog 
polarisSparkCatalog) {
           e);
     }
 
-    // Set the Polaris Spark Catalog as the delegate catalog of Paimon Catalog.
-    // This allows Paimon to use Polaris for metadata management while handling
-    // Paimon-specific operations like manifest and snapshot management.
-    if (this.paimonCatalog instanceof DelegatingCatalogExtension) {
-      ((DelegatingCatalogExtension) 
this.paimonCatalog).setDelegateCatalog(polarisSparkCatalog);
-    }
+    // Build options for Paimon initialization
+    Map<String, String> paimonOptions = new HashMap<>();
+    paimonOptions.put("warehouse", this.paimonWarehouse);
+
+    // Initialize Paimon catalog with the catalog name and options
+    ((CatalogPlugin) catalog)
+        .initialize(catalogName + "_paimon", new 
CaseInsensitiveStringMap(paimonOptions));
+    this.paimonCatalog = catalog;
 
     return this.paimonCatalog;
   }
+
+  /**
+   * Resolve the table location based on Paimon's warehouse convention.
+   *
+   * <p>Paimon stores tables at: warehouse/database.db/table_name
+   *
+   * @param namespace the namespace (database) of the table
+   * @param tableName the name of the table
+   * @return the resolved table location path
+   */
+  public String resolveTableLocation(String[] namespace, String tableName) {
+    // Paimon convention: warehouse/database.db/table_name
+    String dbPath = String.join(".", namespace) + ".db";
+    return this.paimonWarehouse + "/" + dbPath + "/" + tableName;
+  }

Review Comment:
  It seems like we're poking around Paimon internals here — does Paimon offer an 
alternative API for this? For example, `paimonTable.properties().get(PROP_LOCATION)`?



##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java:
##########
@@ -183,33 +234,78 @@ public Table createTable(
 
   @Override
   public Table alterTable(Identifier ident, TableChange... changes) throws 
NoSuchTableException {
+    // First, try to alter as an Iceberg table
     try {
       return this.icebergsSparkCatalog.alterTable(ident, changes);
     } catch (NoSuchTableException e) {
-      Table table = this.polarisSparkCatalog.loadTable(ident);
-      String provider = 
table.properties().get(PolarisCatalogUtils.TABLE_PROVIDER_KEY);
-      if (PolarisCatalogUtils.useDelta(provider)) {
-        // For delta table, most of the alter operations is a delta log 
manipulation,
-        // we load the delta catalog to help handling the alter table 
operation.
-        // NOTE: This currently doesn't work for changing file location and 
file format
-        //     using ALTER TABLE ...SET LOCATION, and ALTER TABLE ... SET 
FILEFORMAT.
-        TableCatalog deltaCatalog = 
deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
-        return deltaCatalog.alterTable(ident, changes);
-      } else if (PolarisCatalogUtils.useHudi(provider)) {
-        TableCatalog hudiCatalog = 
hudiHelper.loadHudiCatalog(this.polarisSparkCatalog);
-        return hudiCatalog.alterTable(ident, changes);
-      } else if (PolarisCatalogUtils.usePaimon(provider)) {
-        TableCatalog paimonCatalog = 
paimonHelper.loadPaimonCatalog(this.polarisSparkCatalog);
+      // Not an Iceberg table, fall through to handle as generic table
+    }
+
+    // For generic tables, check the format/provider to decide delegation
+    // Use getTableFormat to avoid triggering Spark DataSource resolution for 
routing decisions
+    String provider = this.polarisSparkCatalog.getTableFormat(ident);
+
+    // Delegate to the appropriate catalog based on the provider
+    if (PolarisCatalogUtils.useDelta(provider)) {
+      // For delta table, most of the alter operations is a delta log 
manipulation,
+      // we load the delta catalog to help handling the alter table operation.
+      // NOTE: This currently doesn't work for changing file location and file 
format
+      //     using ALTER TABLE ...SET LOCATION, and ALTER TABLE ... SET 
FILEFORMAT.
+      TableCatalog deltaCatalog = 
deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
+      return deltaCatalog.alterTable(ident, changes);
+    } else if (PolarisCatalogUtils.useHudi(provider)) {
+      TableCatalog hudiCatalog = 
hudiHelper.loadHudiCatalog(this.polarisSparkCatalog);
+      return hudiCatalog.alterTable(ident, changes);
+    } else if (PolarisCatalogUtils.usePaimon(provider)) {
+      try {
+        TableCatalog paimonCatalog = 
paimonHelper.loadPaimonCatalog(this.catalogName);
         return paimonCatalog.alterTable(ident, changes);
-      } else {
-        return this.polarisSparkCatalog.alterTable(ident);
+      } catch (NoSuchTableException paimonException) {
+        throw paimonException;
+      } catch (Exception paimonException) {
+        throw new RuntimeException(
+            String.format(
+                "Failed to alter Paimon table %s: %s", ident, 
paimonException.getMessage()),
+            paimonException);
       }
     }
+
+    // Default: use Polaris catalog for alter operation
+    return this.polarisSparkCatalog.alterTable(ident);
   }
 
   @Override
   public boolean dropTable(Identifier ident) {
-    return this.icebergsSparkCatalog.dropTable(ident) || 
this.polarisSparkCatalog.dropTable(ident);
+    // First, try to drop as an Iceberg table
+    if (this.icebergsSparkCatalog.dropTable(ident)) {
+      return true;
+    }
+
+    // For generic tables, check the provider to delegate to the appropriate 
catalog
+    // Use getTableFormat to avoid triggering Spark DataSource resolution for 
routing decisions
+    try {
+      String provider = this.polarisSparkCatalog.getTableFormat(ident);
+
+      if (PolarisCatalogUtils.usePaimon(provider)) {
+        // For Paimon tables, drop from both Polaris and Paimon catalog
+        boolean droppedFromPolaris = this.polarisSparkCatalog.dropTable(ident);
+        try {
+          TableCatalog paimonCatalog = 
paimonHelper.loadPaimonCatalog(this.catalogName);
+          paimonCatalog.dropTable(ident);
+        } catch (Exception e) {
+          LOG.warn("Failed to drop table {} from Paimon catalog: {}", ident, 
e.getMessage());

Review Comment:
  The current impl would leave the table orphaned if Polaris successfully 
drops it but the Paimon drop fails. We should handle this failure mode better: 
1) drop from Paimon first, then from Polaris; or 2) drop from Polaris first and 
roll back the Polaris drop if the Paimon drop fails.



##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java:
##########
@@ -155,6 +185,32 @@ public Table createTable(
     String provider = properties.get(PolarisCatalogUtils.TABLE_PROVIDER_KEY);
     if (PolarisCatalogUtils.useIceberg(provider)) {
       return this.icebergsSparkCatalog.createTable(ident, schema, transforms, 
properties);
+    } else if (PolarisCatalogUtils.usePaimon(provider)) {
+      // For Paimon tables, use Paimon's SparkCatalog to create the table 
first,
+      // then register it in Polaris so it appears in the unified catalog view 
(SHOW TABLES).
+      // Paimon manages its own table paths at warehouse/database.db/table_name
+      // so we don't require a location to be specified.
+      TableCatalog paimonCatalog = 
paimonHelper.loadPaimonCatalog(this.catalogName);
+      // Ensure namespace exists in Paimon before creating table
+      paimonHelper.ensureNamespaceExists(ident.namespace());
+      Table paimonTable = paimonCatalog.createTable(ident, schema, transforms, 
properties);
+
+      // Register the table in Polaris as a generic table so it's visible via 
SHOW TABLES.
+      // Paimon table location follows the convention: 
warehouse/database.db/table_name
+      String tableLocation = 
paimonTable.properties().get(TableCatalog.PROP_LOCATION);
+      if (tableLocation == null) {
+        // Fallback: construct location from Paimon warehouse convention
+        tableLocation = paimonHelper.resolveTableLocation(ident.namespace(), 
ident.name());
+      }
+      Map<String, String> registrationProps = new 
java.util.HashMap<>(properties);
+      registrationProps.put(TableCatalog.PROP_LOCATION, tableLocation);
+      try {
+        this.polarisSparkCatalog.createTable(ident, schema, transforms, 
registrationProps);
+      } catch (Exception e) {
+        LOG.warn(
+            "Failed to register Paimon table {} in Polaris catalog: {}", 
ident, e.getMessage());
+      }

Review Comment:
  on second thought, I would highly recommend: 1) retrying a limited number of 
times if the failure is retryable; 2) rolling back the Paimon table creation and 
rethrowing if it is not automatically recoverable



##########
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java:
##########
@@ -183,33 +234,78 @@ public Table createTable(
 
   @Override
   public Table alterTable(Identifier ident, TableChange... changes) throws 
NoSuchTableException {
+    // First, try to alter as an Iceberg table
     try {
       return this.icebergsSparkCatalog.alterTable(ident, changes);
     } catch (NoSuchTableException e) {
-      Table table = this.polarisSparkCatalog.loadTable(ident);
-      String provider = 
table.properties().get(PolarisCatalogUtils.TABLE_PROVIDER_KEY);
-      if (PolarisCatalogUtils.useDelta(provider)) {
-        // For delta table, most of the alter operations is a delta log 
manipulation,
-        // we load the delta catalog to help handling the alter table 
operation.
-        // NOTE: This currently doesn't work for changing file location and 
file format
-        //     using ALTER TABLE ...SET LOCATION, and ALTER TABLE ... SET 
FILEFORMAT.
-        TableCatalog deltaCatalog = 
deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
-        return deltaCatalog.alterTable(ident, changes);
-      } else if (PolarisCatalogUtils.useHudi(provider)) {
-        TableCatalog hudiCatalog = 
hudiHelper.loadHudiCatalog(this.polarisSparkCatalog);
-        return hudiCatalog.alterTable(ident, changes);
-      } else if (PolarisCatalogUtils.usePaimon(provider)) {
-        TableCatalog paimonCatalog = 
paimonHelper.loadPaimonCatalog(this.polarisSparkCatalog);
+      // Not an Iceberg table, fall through to handle as generic table
+    }
+
+    // For generic tables, check the format/provider to decide delegation
+    // Use getTableFormat to avoid triggering Spark DataSource resolution for 
routing decisions
+    String provider = this.polarisSparkCatalog.getTableFormat(ident);

Review Comment:
  info: it is not quite clear to me why the pattern changed from `String 
provider = table.properties().get(PolarisCatalogUtils.TABLE_PROVIDER_KEY)` to 
`String provider = this.polarisSparkCatalog.getTableFormat(ident)` — would you 
mind elaborating?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to