sririshindra commented on code in PR #3719:
URL: https://github.com/apache/polaris/pull/3719#discussion_r2891174611


##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -612,13 +615,133 @@ public LoadTableResponse createTableStaged(
    * @return ETagged {@link LoadTableResponse} to uniquely identify the table metadata
    */
   public LoadTableResponse registerTable(Namespace namespace, RegisterTableRequest request) {
-    PolarisAuthorizableOperation op = PolarisAuthorizableOperation.REGISTER_TABLE;
-    authorizeCreateTableLikeUnderNamespaceOperationOrThrow(
-        op, TableIdentifier.of(namespace, request.name()));
+    TableIdentifier identifier = TableIdentifier.of(namespace, request.name());
+    boolean overwrite = request.overwrite();
+
+    LOGGER.debug(
+        "registerTable: identifier={}, overwrite={}, request={}", identifier, overwrite, request);
 
+    if (overwrite) {
+      LOGGER.debug("registerTable: overwrite requested for {}", identifier);
+      authorizeRegisterTableOverwriteOrCreate(identifier);
+      return registerTableWithOverwrite(identifier, request);
+    }
+
+    // Creating new table requires REGISTER_TABLE privilege
+    PolarisAuthorizableOperation op = PolarisAuthorizableOperation.REGISTER_TABLE;
+    authorizeCreateTableLikeUnderNamespaceOperationOrThrow(op, identifier);
+    LOGGER.debug("registerTable: authorized REGISTER_TABLE for {}", identifier);
     return catalogHandlerUtils().registerTable(baseCatalog, namespace, request);
   }
 
+  /**
+   * Authorize registerTable with overwrite=true.
+   *
+   * <p>If the table exists, require REGISTER_TABLE_OVERWRITE on the table; otherwise require
+   * REGISTER_TABLE on the parent namespace.
+   *
+   * <p>Resolve both the namespace and an optional passthrough table path in one pass because the
+   * standard helpers either assume the table exists or always authorize against the namespace.
+   * Also, baseCatalog.tableExists() cannot be used here since initializeCatalog() has not run.
+   */
+  private void authorizeRegisterTableOverwriteOrCreate(TableIdentifier identifier) {
+    LOGGER.debug("authorizeRegisterTableOverwriteOrCreate: start for {}", identifier);
+    // Build a resolution manifest that includes the namespace and optional table path.
+    resolutionManifest = newResolutionManifest();
+    resolutionManifest.addPath(
+        new ResolverPath(
+            Arrays.asList(identifier.namespace().levels()), PolarisEntityType.NAMESPACE),
+        identifier.namespace());
+    resolutionManifest.addPassthroughPath(
+        new ResolverPath(
+            PolarisCatalogHelpers.tableIdentifierToList(identifier),
+            PolarisEntityType.TABLE_LIKE,
+            true /* optional */),
+        identifier);
+    resolutionManifest.resolveAll();
+    PolarisResolvedPathWrapper tableTarget =
+        resolutionManifest.getResolvedPath(
+            identifier, PolarisEntityType.TABLE_LIKE, PolarisEntitySubType.ICEBERG_TABLE, true);
+
+    if (tableTarget != null) {
+      LOGGER.debug(
+          "authorizeRegisterTableOverwriteOrCreate: found existing table target for {}, requiring REGISTER_TABLE_OVERWRITE",
+          identifier);
+      // Overwrite on an existing table requires full metadata permissions.
+      authorizer()
+          .authorizeOrThrow(
+              polarisPrincipal(),
+              resolutionManifest.getAllActivatedCatalogRoleAndPrincipalRoles(),
+              PolarisAuthorizableOperation.REGISTER_TABLE_OVERWRITE,
+              tableTarget,
+              null /* secondary */);
+      initializeCatalog();
+      LOGGER.debug(
+          "registerTable: overwrite=true, authorized for REGISTER_TABLE_OVERWRITE on existing table {}",
+          identifier);
+      return;
+    }
+
+    // Table doesn't exist, fall back to standard register-table authorization.

Review Comment:
   I agree. I simplified the logic so that `overwrite=true` always requires
`REGISTER_TABLE_OVERWRITE`, and I removed the fallback path. Authorization is
now deterministic and no longer depends on whether the table exists at request time.
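
A minimal sketch of the rule described in this comment, assuming the required operation is derived from the request flag alone. The class `RegisterTableAuthSketch`, its `Operation` enum, and both helper methods are hypothetical stand-ins for illustration; they are not the Polaris authorizer API or the code in this PR.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical illustration only; not the Polaris implementation. */
public class RegisterTableAuthSketch {

  /** Stand-in for the two operations named in the review comment. */
  enum Operation {
    REGISTER_TABLE,
    REGISTER_TABLE_OVERWRITE
  }

  /** The required operation depends only on the request flag, never on catalog state. */
  static Operation requiredOperation(boolean overwrite) {
    return overwrite ? Operation.REGISTER_TABLE_OVERWRITE : Operation.REGISTER_TABLE;
  }

  /** Throws if the caller's granted operations do not include the required one. */
  static void authorizeOrThrow(Set<Operation> granted, boolean overwrite) {
    Operation required = requiredOperation(overwrite);
    if (!granted.contains(required)) {
      throw new SecurityException("Missing privilege for " + required);
    }
  }

  public static void main(String[] args) {
    Set<Operation> registerOnly = EnumSet.of(Operation.REGISTER_TABLE);

    // A plain register request is allowed with REGISTER_TABLE alone.
    authorizeOrThrow(registerOnly, false);

    // An overwrite request is rejected whether or not the table exists.
    try {
      authorizeOrThrow(registerOnly, true);
    } catch (SecurityException e) {
      System.out.println("denied: " + e.getMessage());
    }
  }
}
```

Because no existence lookup feeds the decision, the same request from the same principal always gets the same answer.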



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java:
##########
@@ -330,6 +380,155 @@ public Table registerTable(TableIdentifier identifier, String metadataFileLocati
     return new BaseTable(ops, fullTableName(name(), identifier), metricsReporter());
   }
 
+  private Table overwriteRegisteredTable(
+      TableIdentifier identifier, String metadataFileLocation, String locationDir) {
+    LOGGER.debug(
+        "overwriteRegisteredTable called with identifier={}, metadataFileLocation={}, locationDir={}",
+        identifier,
+        metadataFileLocation,
+        locationDir);
+
+    /*
+     * High-level overview:
+     * - Resolve the authorized parent for the table so we know the storage context.
+     * - Validate and read the provided metadata file using a FileIO tied to that
+     *   storage context.
+     * - Ensure the target entity exists and is the correct table-like subtype.
+     * - Replace the stored entity properties (including metadata-location) with
+     *   values derived from the parsed TableMetadata and persist the change.
+     */
+
+    // Resolve the table path to obtain storage context for overwrite validation.
+    PolarisResolvedPathWrapper resolvedPath =
+        resolvedEntityView.getPassthroughResolvedPath(
+            identifier, PolarisEntityType.TABLE_LIKE, PolarisEntitySubType.ANY_SUBTYPE);
+    if (resolvedPath == null || resolvedPath.getRawLeafEntity() == null) {
+      LOGGER.debug("No resolved path or leaf entity found for identifier={}", identifier);
+      throw new NoSuchTableException("Table does not exist: %s", identifier);
+    }
+    LOGGER.debug(
+        "Resolved path for identifier={} -> leafEntityId={}, leafEntityName={}",
+        identifier,
+        resolvedPath.getRawLeafEntity().getId(),
+        resolvedPath.getRawLeafEntity().getName());
+
+    // Validate the supplied metadata file location against the resolved storage.
+    LOGGER.debug(
+        "Validating supplied metadataFileLocation={} against resolved storage for identifier={}",
+        metadataFileLocation,
+        identifier);
+    validateLocationForTableLike(identifier, metadataFileLocation, resolvedPath);
+
+    // Configure FileIO for the resolved storage and read the metadata file.
+    LOGGER.debug(
+        "Loading FileIO for identifier={} to read metadata at {}", identifier, locationDir);
+    FileIO fileIO =
+        loadFileIOForTableLike(
+            identifier,
+            Set.of(locationDir),
+            resolvedPath,
+            new HashMap<>(tableDefaultProperties),
+            Set.of(PolarisStorageActions.READ, PolarisStorageActions.LIST));
+
+    LOGGER.debug(
+        "Reading TableMetadata from metadataFileLocation={} using resolved FileIO",
+        metadataFileLocation);
+    TableMetadata metadata = TableMetadataParser.read(fileIO, metadataFileLocation);
+    LOGGER.debug(
+        "Parsed TableMetadata for identifier={} -> uuid={}, location={}",
+        identifier,
+        metadata.uuid(),
+        metadata.location());
+
+    LOGGER.debug(
+        "Validating metadata.location()={} for identifier={}", metadata.location(), identifier);
+    validateLocationForTableLike(identifier, metadata.location(), resolvedPath);
+    validateMetadataFileInTableDir(identifier, metadata);
+    LOGGER.debug(
+        "Metadata file validated to be within table directory for identifier={}", identifier);
+
+    PolarisResolvedPathWrapper resolvedStorageEntity = resolvedPath;

Review Comment:
   Done


