Copilot commented on code in PR #10475:
URL: https://github.com/apache/gravitino/pull/10475#discussion_r3115748864


##########
flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/paimon/TestGravitinoPaimonCatalog.java:
##########
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.paimon;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.flink.connector.PartitionConverter;
+import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
+import org.apache.gravitino.flink.connector.catalog.GravitinoCatalogManager;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.TableCatalog;
+import org.apache.paimon.flink.FlinkCatalog;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.FlinkTableFactory;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedConstruction;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+/** Unit tests for {@link GravitinoPaimonCatalog}. */
+public class TestGravitinoPaimonCatalog {
+
+  private static final String CATALOG_NAME = "test-paimon-catalog";
+  private static final String TEST_DB = "testDb";
+  private static final String TEST_TABLE = "testTable";
+
+  private FlinkCatalog mockPaimonCatalog;
+  private GravitinoPaimonCatalog catalog;
+
+  @BeforeEach
+  void setUp() {
+    mockPaimonCatalog = Mockito.mock(FlinkCatalog.class);
+    CatalogFactory.Context mockContext = 
Mockito.mock(CatalogFactory.Context.class);
+    when(mockContext.getName()).thenReturn(CATALOG_NAME);
+    when(mockContext.getOptions())
+        .thenReturn(
+            ImmutableMap.of(
+                "type", GravitinoPaimonCatalogFactoryOptions.IDENTIFIER,
+                "warehouse", "file:///tmp/test_warehouse",
+                "metastore", "filesystem"));
+
+    try (MockedConstruction<FlinkCatalogFactory> ignored =
+        Mockito.mockConstruction(
+            FlinkCatalogFactory.class,
+            (mock, ctx) -> 
when(mock.createCatalog(any())).thenReturn(mockPaimonCatalog))) {
+      catalog =
+          new GravitinoPaimonCatalog(
+              mockContext,
+              "default",
+              Mockito.mock(SchemaAndTablePropertiesConverter.class),
+              Mockito.mock(PartitionConverter.class));
+    }
+  }
+
+  // Helper: wire up GravitinoCatalogManager and return the TableCatalog mock.
+  private TableCatalog setupGravitinoCatalogMock(
+      MockedStatic<GravitinoCatalogManager> mgrStatic, Catalog 
mockGravitinoCatalog) {
+    GravitinoCatalogManager mockMgr = 
Mockito.mock(GravitinoCatalogManager.class);
+    mgrStatic.when(GravitinoCatalogManager::get).thenReturn(mockMgr);
+    
when(mockMgr.getGravitinoCatalogInfo(CATALOG_NAME)).thenReturn(mockGravitinoCatalog);
+    TableCatalog mockTableCatalog = Mockito.mock(TableCatalog.class);
+    when(mockGravitinoCatalog.asTableCatalog()).thenReturn(mockTableCatalog);
+    return mockTableCatalog;
+  }
+
+  // ── realCatalog / getFactory ──────────────────────────────────────────────
+
+  @Test
+  public void testRealCatalogReturnsPaimonCatalog() {
+    Assertions.assertSame(mockPaimonCatalog, catalog.realCatalog());
+  }
+
+  @Test
+  public void testGetFactoryReturnsFlinkTableFactory() {
+    Assertions.assertTrue(catalog.getFactory().isPresent());
+    Assertions.assertInstanceOf(FlinkTableFactory.class, 
catalog.getFactory().get());
+  }
+
+  // ── dropTable ─────────────────────────────────────────────────────────────
+
+  @Test
+  public void testDropTableCallsPurgeOnGravitino() throws Exception {
+    ObjectPath tablePath = new ObjectPath(TEST_DB, TEST_TABLE);
+    NameIdentifier identifier = NameIdentifier.of(TEST_DB, TEST_TABLE);
+    Catalog mockGravitinoCatalog = Mockito.mock(Catalog.class);
+
+    try (MockedStatic<GravitinoCatalogManager> mgrStatic =
+        Mockito.mockStatic(GravitinoCatalogManager.class)) {
+      TableCatalog mockTableCatalog = setupGravitinoCatalogMock(mgrStatic, 
mockGravitinoCatalog);
+      when(mockTableCatalog.purgeTable(identifier)).thenReturn(true);
+
+      catalog.dropTable(tablePath, false);
+
+      verify(mockTableCatalog).purgeTable(identifier);
+      // Paimon is NOT touched directly; the Gravitino server handles 
metastore sync.
+      verify(mockPaimonCatalog, never()).dropTable(any(), 
Mockito.anyBoolean());
+    }
+  }
+
+  @Test
+  public void testDropTableThrowsWhenTableNotExistsAndIgnoreIsFalse() {
+    ObjectPath tablePath = new ObjectPath(TEST_DB, TEST_TABLE);
+    NameIdentifier identifier = NameIdentifier.of(TEST_DB, TEST_TABLE);
+    Catalog mockGravitinoCatalog = Mockito.mock(Catalog.class);
+
+    try (MockedStatic<GravitinoCatalogManager> mgrStatic =
+        Mockito.mockStatic(GravitinoCatalogManager.class)) {
+      TableCatalog mockTableCatalog = setupGravitinoCatalogMock(mgrStatic, 
mockGravitinoCatalog);
+      when(mockTableCatalog.purgeTable(identifier)).thenReturn(false);
+
+      Assertions.assertThrows(
+          TableNotExistException.class, () -> catalog.dropTable(tablePath, 
false));
+    }
+  }
+
+  @Test
+  public void testDropTableSucceedsWhenTableNotExistsAndIgnoreIsTrue() {
+    ObjectPath tablePath = new ObjectPath(TEST_DB, TEST_TABLE);
+    NameIdentifier identifier = NameIdentifier.of(TEST_DB, TEST_TABLE);
+    Catalog mockGravitinoCatalog = Mockito.mock(Catalog.class);
+
+    try (MockedStatic<GravitinoCatalogManager> mgrStatic =
+        Mockito.mockStatic(GravitinoCatalogManager.class)) {
+      TableCatalog mockTableCatalog = setupGravitinoCatalogMock(mgrStatic, 
mockGravitinoCatalog);
+      when(mockTableCatalog.purgeTable(identifier)).thenReturn(false);
+
+      // ignoreIfNotExists=true must suppress TableNotExistException.
+      Assertions.assertDoesNotThrow(() -> catalog.dropTable(tablePath, true));
+    }
+  }
+
+  // ── toFlinkTable (exercised via getTable) ─────────────────────────────────
+  //
+  // GravitinoPaimonCatalog overrides toFlinkTable() to return
+  // paimonCatalog.getTable(), which carries a proper CatalogEnvironment.
+  // BaseCatalog.getTable() enforces Gravitino authorization first, then

Review Comment:
   The newly introduced `GravitinoPaimonCatalog.alterTable(ObjectPath, 
CatalogBaseTable, boolean)` sync/invalidation behavior isn’t covered by unit 
tests here (e.g., verifying that `paimonCatalog.alterTable(...)` is invoked 
after the Gravitino update, and that `ignoreIfNotExists=true` results in a true 
no-op when the table does not exist in Gravitino). Adding a focused test for 
those paths would help prevent regressions, especially since the prior 
`TestPaimonPropertiesConverter` coverage was removed.



##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java:
##########
@@ -53,14 +62,97 @@ protected GravitinoPaimonCatalog(
         schemaAndTablePropertiesConverter,
         partitionConverter);
     FlinkCatalogFactory flinkCatalogFactory = new FlinkCatalogFactory();
-    this.paimonCatalog = flinkCatalogFactory.createCatalog(context);
+    this.paimonCatalog = 
flinkCatalogFactory.createCatalog(toPaimonContext(context));
+  }
+
+  /**
+   * Translates Gravitino catalog property names to their Paimon/Flink 
equivalents so that the
+   * underlying {@code FlinkCatalog} can be initialised correctly (e.g., 
{@code catalog-backend}
+   * becomes {@code metastore}).
+   */
+  private static CatalogFactory.Context toPaimonContext(CatalogFactory.Context 
context) {
+    Map<String, String> translatedOptions = new HashMap<>();
+    for (Map.Entry<String, String> entry : context.getOptions().entrySet()) {
+      String mappedKey =
+          
PaimonPropertiesConverter.INSTANCE.transformPropertyToFlinkCatalog(entry.getKey());
+      // Fall back to the original key when no mapping exists so Paimon-native 
properties are not
+      // silently dropped.
+      translatedOptions.put(mappedKey != null ? mappedKey : entry.getKey(), 
entry.getValue());
+    }
+    return new CatalogFactory.Context() {
+      @Override
+      public String getName() {
+        return context.getName();
+      }
+
+      @Override
+      public Map<String, String> getOptions() {
+        return translatedOptions;
+      }
+
+      @Override
+      public ReadableConfig getConfiguration() {
+        return context.getConfiguration();
+      }
+
+      @Override
+      public ClassLoader getClassLoader() {
+        return context.getClassLoader();
+      }
+    };
   }
 
   @Override
   protected AbstractCatalog realCatalog() {
     return paimonCatalog;
   }
 
+  @Override
+  public Optional<Factory> getFactory() {
+    return Optional.of(new FlinkTableFactory());
+  }
+
+  @Override
+  protected CatalogBaseTable toFlinkTable(Table table, ObjectPath tablePath) {
+    try {
+      return paimonCatalog.getTable(tablePath);
+    } catch (TableNotExistException e) {
+      // The table was confirmed to exist in Gravitino (auth passed in 
BaseCatalog.getTable).
+      // This branch indicates the two stores are out of sync.
+      throw new CatalogException(
+          "Table "
+              + tablePath
+              + " exists in Gravitino but not in Paimon/Hive metastore."
+              + " The two stores may be out of sync.",
+          e);
+    }
+  }
+
+  /**
+   * Overrides alterTable to also sync the change into {@code paimonCatalog}, 
which invalidates its
+   * internal table-metadata cache. Without this, {@link #toFlinkTable} 
delegates to {@code
+   * paimonCatalog.getTable()}, which may return stale cached metadata (e.g., 
the old comment) after
+   * Gravitino updates the Paimon backend through a separate catalog instance.
+   *
+   * <p>This variant is documented as comment-only, so the extra write to the 
Paimon Flink catalog
+   * is idempotent.
+   */
+  @Override
+  public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
+      throws TableNotExistException, CatalogException {

Review Comment:
   `alterTable(..., ignoreIfNotExists=true)` should be a no-op when the table 
does not exist in Gravitino, but this override always proceeds to call 
`paimonCatalog.alterTable(...)` even when `super.alterTable(...)` returns 
early. That can unintentionally modify the Paimon/Hive metastore in an 
out-of-sync state. Consider short-circuiting before the `super` call (e.g., `if 
(ignoreIfNotExists && !tableExists(tablePath)) return;`) so Paimon is only 
touched when the Gravitino alter actually happens.
   ```suggestion
         throws TableNotExistException, CatalogException {
       if (ignoreIfNotExists && !tableExists(tablePath)) {
         return;
       }
   ```



##########
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java:
##########
@@ -53,14 +62,97 @@ protected GravitinoPaimonCatalog(
         schemaAndTablePropertiesConverter,
         partitionConverter);
     FlinkCatalogFactory flinkCatalogFactory = new FlinkCatalogFactory();
-    this.paimonCatalog = flinkCatalogFactory.createCatalog(context);
+    this.paimonCatalog = 
flinkCatalogFactory.createCatalog(toPaimonContext(context));
+  }
+
+  /**
+   * Translates Gravitino catalog property names to their Paimon/Flink 
equivalents so that the
+   * underlying {@code FlinkCatalog} can be initialised correctly (e.g., 
{@code catalog-backend}
+   * becomes {@code metastore}).
+   */
+  private static CatalogFactory.Context toPaimonContext(CatalogFactory.Context 
context) {
+    Map<String, String> translatedOptions = new HashMap<>();
+    for (Map.Entry<String, String> entry : context.getOptions().entrySet()) {
+      String mappedKey =
+          
PaimonPropertiesConverter.INSTANCE.transformPropertyToFlinkCatalog(entry.getKey());
+      // Fall back to the original key when no mapping exists so Paimon-native 
properties are not
+      // silently dropped.
+      translatedOptions.put(mappedKey != null ? mappedKey : entry.getKey(), 
entry.getValue());
+    }
+    return new CatalogFactory.Context() {
+      @Override
+      public String getName() {
+        return context.getName();
+      }
+
+      @Override
+      public Map<String, String> getOptions() {
+        return translatedOptions;
+      }
+
+      @Override
+      public ReadableConfig getConfiguration() {
+        return context.getConfiguration();
+      }
+
+      @Override
+      public ClassLoader getClassLoader() {
+        return context.getClassLoader();
+      }
+    };
   }
 
   @Override
   protected AbstractCatalog realCatalog() {
     return paimonCatalog;
   }
 
+  @Override
+  public Optional<Factory> getFactory() {
+    return Optional.of(new FlinkTableFactory());
+  }
+
+  @Override
+  protected CatalogBaseTable toFlinkTable(Table table, ObjectPath tablePath) {
+    try {
+      return paimonCatalog.getTable(tablePath);
+    } catch (TableNotExistException e) {
+      // The table was confirmed to exist in Gravitino (auth passed in 
BaseCatalog.getTable).
+      // This branch indicates the two stores are out of sync.
+      throw new CatalogException(
+          "Table "
+              + tablePath
+              + " exists in Gravitino but not in Paimon/Hive metastore."
+              + " The two stores may be out of sync.",
+          e);
+    }
+  }
+
+  /**
+   * Overrides alterTable to also sync the change into {@code paimonCatalog}, 
which invalidates its
+   * internal table-metadata cache. Without this, {@link #toFlinkTable} 
delegates to {@code
+   * paimonCatalog.getTable()}, which may return stale cached metadata (e.g., 
the old comment) after
+   * Gravitino updates the Paimon backend through a separate catalog instance.
+   *
+   * <p>This variant is documented as comment-only, so the extra write to the 
Paimon Flink catalog
+   * is idempotent.
+   */
+  @Override
+  public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
+      throws TableNotExistException, CatalogException {
+    super.alterTable(tablePath, newTable, ignoreIfNotExists);
+    try {
+      paimonCatalog.alterTable(tablePath, newTable, ignoreIfNotExists);
+    } catch (TableNotExistException e) {
+      if (!ignoreIfNotExists) {
+        throw e;
+      }
+    } catch (Exception e) {
+      LOG.warn(
+          "Failed to sync paimonCatalog after alterTable for {}: {}", 
tablePath, e.getMessage());

Review Comment:
   Catching a generic `Exception` here and only logging `e.getMessage()` hides 
the root cause (no stack trace) and can silently reintroduce stale metadata if 
the sync/invalidation fails. Prefer catching the specific expected exceptions 
(e.g., `CatalogException`) and either rethrowing or at least passing the 
exception itself to the logger, so that the stack trace is recorded and 
failures are diagnosable.
   ```suggestion
       } catch (CatalogException e) {
         LOG.warn("Failed to sync paimonCatalog after alterTable for {}", 
tablePath, e);
         throw e;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to