lasdf1234 commented on PR #10517:
URL: https://github.com/apache/gravitino/pull/10517#issuecomment-4182377912

   ## Code Review
   
   This PR establishes a clean versioned architecture for the Flink connector 
and is a well-structured first step toward multi-version support. The 
three-layer module split (`flink-common` / `flink-1.18` / `flink-runtime-1.18`) 
and the `CatalogCompat` interface design are solid choices.
   
   I found several issues that should be addressed before merge:
   
   ---
   
   ### 🔴 Issue 1 — Dead parameter in `GravitinoIcebergCatalog` first constructor
   
   **File:** `GravitinoIcebergCatalog.java` line 43
   
   ```java
   protected GravitinoIcebergCatalog(
       ...,
       Map<String, String> icebergCatalogProperties,
       CatalogFactory.Context context) {   // ← never stored or read
   ```
   
   `context` is accepted but never used inside the constructor — the catalog is 
built solely from `catalogName` and `icebergCatalogProperties`. This dead 
parameter misleads callers into thinking the context affects construction.
   
   Either use it (e.g., pass a class loader) or remove it. The versioned path 
(`GravitinoIcebergCatalogFlink118`) already bypasses this constructor by using 
the second one that accepts a pre-built `AbstractCatalog`, so removing 
`context` here would not break the versioned path.
   
   ---
   
   ### 🔴 Issue 2 — Unguarded cast to `AbstractCatalog` in `GravitinoIcebergCatalog` and `GravitinoIcebergCatalogFlink118`
   
   **File:** `GravitinoIcebergCatalog.java` lines 51–52; 
`GravitinoIcebergCatalogFlink118.java`
   
   ```java
   this.icebergCatalog =
       (AbstractCatalog)
           new FlinkCatalogFactory().createCatalog(catalogName, icebergCatalogProperties);
   ```
   
   `FlinkCatalogFactory.createCatalog()` returns `Catalog`. Casting the result directly 
to `AbstractCatalog` without a guard will fail at runtime with a bare `ClassCastException` 
whose message names only the classes involved, should a future Iceberg version change 
what the factory returns.
   
   Suggested fix:
   
   ```java
   Catalog created =
       new FlinkCatalogFactory().createCatalog(catalogName, icebergCatalogProperties);
   Preconditions.checkState(
       created instanceof AbstractCatalog,
       "Expected AbstractCatalog from FlinkCatalogFactory but got %s",
       created.getClass().getName());
   this.icebergCatalog = (AbstractCatalog) created;
   ```
   
   The same issue exists in 
`GravitinoIcebergCatalogFlink118.createIcebergCatalog()`.
   
   ---
   
   ### 🟡 Issue 3 — `CatalogCompatFlink118` duplicates `DefaultCatalogCompat` entirely
   
   **File:** `CatalogCompatFlink118.java`
   
   Both classes implement the exact same behavior: `CatalogTable.of(...)` and 
`CatalogPropertiesUtil.serializeCatalogTable(...)`. Since Flink 1.18 **is** the 
baseline version, there is no functional difference between them. This creates 
two classes to maintain for the same logic.
   
   Consider delegating to `DefaultCatalogCompat.INSTANCE` instead:
   
   ```java
   @Override
   public CatalogTable createCatalogTable(
       Schema schema, String comment, List<String> partitionKeys, Map<String, String> options) {
     return DefaultCatalogCompat.INSTANCE.createCatalogTable(
         schema, comment, partitionKeys, options);
   }

   @Override
   public Map<String, String> serializeCatalogTable(ResolvedCatalogTable resolvedTable) {
     return DefaultCatalogCompat.INSTANCE.serializeCatalogTable(resolvedTable);
   }
   ```
   
   `CatalogCompatFlink118` then becomes meaningful only when Flink 1.18 
genuinely diverges from the default baseline.
   
   ---
   
   ### 🟡 Issue 4 — Redundant `CatalogTableBuilder` functional interface in `FlinkGenericTableUtil`
   
   **File:** `FlinkGenericTableUtil.java` lines 108–115
   
   `CatalogTableBuilder.create(Schema, String, List<String>, Map<String, 
String>)` has the **exact same signature** as 
`CatalogCompat.createCatalogTable(...)`. This introduces two parallel extension 
points for the same operation.
   
   The only consumer of `CatalogTableBuilder` is `toFlinkGenericTable(Table, 
CatalogTableBuilder)`, which is called exclusively via 
`catalogCompat::createCatalogTable` — making `CatalogTableBuilder` pure 
indirection with no added value.
   
   Suggested simplification: remove `CatalogTableBuilder` entirely and inline 
the logic directly in the `CatalogCompat` overload. The three-overload chain
   
   ```
   toFlinkGenericTable(Table)
     → toFlinkGenericTable(Table, CatalogCompat)
     → toFlinkGenericTable(Table, CatalogTableBuilder)
   ```
   
   collapses to two clean overloads.
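
   For illustration, a minimal self-contained sketch of the collapsed chain — the 
`Table` and `CatalogCompat` types below are simplified stand-ins for the real 
Gravitino/Flink classes, not the actual APIs:

   ```java
   import java.util.List;
   import java.util.Map;

   // Sketch of the two-overload chain after removing CatalogTableBuilder.
   // Table and CatalogCompat are simplified stand-ins for the PR's types.
   public class FlinkGenericTableUtilSketch {

     interface CatalogCompat {
       String createCatalogTable(
           String schema, String comment, List<String> partitionKeys, Map<String, String> options);
     }

     record Table(
         String schema, String comment, List<String> partitionKeys, Map<String, String> properties) {}

     // Stand-in for DefaultCatalogCompat.INSTANCE.
     static final CatalogCompat DEFAULT_COMPAT =
         (schema, comment, keys, options) -> "CatalogTable(" + schema + ", " + comment + ")";

     // Overload 1: convenience entry point using the default compat.
     static String toFlinkGenericTable(Table table) {
       return toFlinkGenericTable(table, DEFAULT_COMPAT);
     }

     // Overload 2: calls CatalogCompat directly -- no CatalogTableBuilder in between.
     static String toFlinkGenericTable(Table table, CatalogCompat compat) {
       return compat.createCatalogTable(
           table.schema(), table.comment(), table.partitionKeys(), table.properties());
     }
   }
   ```

   The second overload stays as the seam for versioned behavior, since 
`CatalogCompat` is already the extension point the PR introduces.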
   
   ---
   
   ### 🟡 Issue 5 — `toIcebergCatalogOptions` silently overwrites user-provided `catalog-type`
   
   **File:** `GravitinoIcebergCatalogFactory.java` line 129
   
   ```java
   icebergCatalogOptions.put(CommonCatalogOptions.CATALOG_TYPE.key(), "iceberg");
   ```
   
   This unconditionally replaces any `catalog-type` value already present in 
the map (copied from `catalogOptions` on line 122). If a user explicitly set a 
different value, it is silently discarded with no log or warning.
   
   Consider at minimum a DEBUG-level log:
   
   ```java
   String existing =
       icebergCatalogOptions.put(CommonCatalogOptions.CATALOG_TYPE.key(), "iceberg");
   if (existing != null && !"iceberg".equals(existing)) {
     LOG.debug("Overriding catalog-type from '{}' to 'iceberg'", existing);
   }
   ```
   
   ---
   
   ### 🟢 Issue 6 — `discoverFactories(Iterator, Predicate, String)` should be `protected` and needs JavaDoc
   
   **File:** `GravitinoCatalogStore.java` line 161
   
   This method is package-private (no modifier), which is inconsistent with 
other new extension hooks introduced in this PR (e.g., `newCatalogDescriptor` 
is `protected`). If the intent is to allow versioned store subclasses to 
override factory discovery, it should be `protected`.
   
   Both `newCatalogDescriptor(String, Configuration)` (line 127) and 
`discoverFactories(Iterator, Predicate, String)` are new extension points with 
no JavaDoc. Protected APIs require JavaDoc per the project contribution 
guidelines.
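
   For illustration, one possible shape of the documented `protected` hook. The 
`Factory` interface below is a stand-in, not Flink's 
`org.apache.flink.table.factories.Factory`, and the method body is an assumed 
sketch of the discovery logic, not the PR's implementation:

   ```java
   import java.util.Iterator;
   import java.util.function.Predicate;

   // Sketch of the protected, JavaDoc'd extension hook. Factory is a
   // simplified stand-in for the real Flink factory type.
   public class GravitinoCatalogStoreSketch {

     interface Factory {
       String factoryIdentifier();
     }

     /**
      * Discovers the first factory accepted by {@code predicate}.
      *
      * <p>Versioned store subclasses may override this to customize factory
      * discovery for their Flink version.
      *
      * @param factories candidate factories, typically loaded via ServiceLoader
      * @param predicate filter selecting the desired factory
      * @param factoryIdentifier identifier used only in the failure message
      * @return the first matching factory
      * @throws IllegalStateException if no factory matches
      */
     protected Factory discoverFactories(
         Iterator<Factory> factories, Predicate<Factory> predicate, String factoryIdentifier) {
       while (factories.hasNext()) {
         Factory candidate = factories.next();
         if (predicate.test(candidate)) {
           return candidate;
         }
       }
       throw new IllegalStateException("No factory found for identifier: " + factoryIdentifier);
     }
   }
   ```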
   
   ---
   
   Overall the architecture is sound. Please address issues 1–4 before merge.

