lasdf1234 commented on PR #10517:
URL: https://github.com/apache/gravitino/pull/10517#issuecomment-4182377912
## Code Review
This PR establishes a clean versioned architecture for the Flink connector
and is a well-structured first step toward multi-version support. The
three-layer module split (`flink-common` / `flink-1.18` / `flink-runtime-1.18`)
and the `CatalogCompat` interface design are solid choices.
I found the following issues; the first four should be addressed before merge:
---
### 🔴 Issue 1 — Dead parameter in `GravitinoIcebergCatalog` first constructor
**File:** `GravitinoIcebergCatalog.java` line 43
```java
protected GravitinoIcebergCatalog(
    ...,
    Map<String, String> icebergCatalogProperties,
    CatalogFactory.Context context) { // ← never stored or read
```
`context` is accepted but never used inside the constructor — the catalog is
built solely from `catalogName` and `icebergCatalogProperties`. This dead
parameter misleads callers into thinking the context affects construction.
Either use it (e.g., pass a class loader) or remove it. The versioned path
(`GravitinoIcebergCatalogFlink118`) already bypasses this constructor by using
the second one that accepts a pre-built `AbstractCatalog`, so removing
`context` here would not break the versioned path.
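Here is a minimal sketch of the constructor pair with `context` removed. `StubAbstractCatalog` and `IcebergCatalogSketch` are hypothetical stand-ins for the real Flink and Gravitino classes. Having the factory-based constructor delegate to the pre-built one is an assumption about how the two paths could share initialization. The PR does not currently do this.

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for Flink's AbstractCatalog; just enough state to test. */
class StubAbstractCatalog {
  final String name;
  final Map<String, String> properties;

  StubAbstractCatalog(String name, Map<String, String> properties) {
    this.name = name;
    this.properties = properties;
  }
}

/** Hypothetical stand-in for GravitinoIcebergCatalog's constructor pair. */
class IcebergCatalogSketch {
  private final StubAbstractCatalog icebergCatalog;

  // Factory path: accepts only what it actually reads. There is no
  // CatalogFactory.Context parameter because nothing here would use it.
  IcebergCatalogSketch(String catalogName, Map<String, String> icebergCatalogProperties) {
    this(createIcebergCatalog(catalogName, icebergCatalogProperties));
  }

  // Pre-built path, the one the versioned Flink 1.18 subclass already uses.
  IcebergCatalogSketch(StubAbstractCatalog icebergCatalog) {
    this.icebergCatalog = Objects.requireNonNull(icebergCatalog, "icebergCatalog");
  }

  // Stand-in for `new FlinkCatalogFactory().createCatalog(...)` plus its type check.
  private static StubAbstractCatalog createIcebergCatalog(
      String name, Map<String, String> props) {
    return new StubAbstractCatalog(name, Map.copyOf(props));
  }

  StubAbstractCatalog catalog() {
    return icebergCatalog;
  }
}
```

With this shape, the versioned subclass keeps calling the pre-built constructor unchanged. The factory-based constructor no longer has a parameter that it silently ignores.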
---
### 🔴 Issue 2 — Unguarded cast in `GravitinoIcebergCatalog` and `GravitinoIcebergCatalogFlink118`
**File:** `GravitinoIcebergCatalog.java` lines 51–52;
`GravitinoIcebergCatalogFlink118.java`
```java
this.icebergCatalog =
    (AbstractCatalog)
        new FlinkCatalogFactory().createCatalog(catalogName, icebergCatalogProperties);
```
`FlinkCatalogFactory.createCatalog()` returns `Catalog`. The result is cast directly to `AbstractCatalog` without a guard. If a future Iceberg version changes the concrete type the factory returns, construction will fail with a generic `ClassCastException`. That error does not name the factory or the expected type.
Suggested fix:
```java
Catalog created =
    new FlinkCatalogFactory().createCatalog(catalogName, icebergCatalogProperties);
Preconditions.checkState(
    created instanceof AbstractCatalog,
    "Expected AbstractCatalog from FlinkCatalogFactory but got %s",
    created.getClass().getName());
this.icebergCatalog = (AbstractCatalog) created;
```
The same issue exists in
`GravitinoIcebergCatalogFlink118.createIcebergCatalog()`.
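The same guard is needed in two classes, so one option is a small shared helper. This is a sketch under that assumption: `Casts.checkedDowncast` is a hypothetical name, not an existing Gravitino or Guava API. It also differs from the inline suggestion in one respect. Java evaluates `created.getClass().getName()` before `checkState` runs, so a `null` result there would throw a `NullPointerException`. The helper instead reports `null` in its error message.

```java
/** Hypothetical helper: a downcast that fails with a descriptive message. */
final class Casts {
  private Casts() {}

  /**
   * Returns {@code value} cast to {@code expected}, or throws if it has another type.
   *
   * @param source what produced the value (e.g. "FlinkCatalogFactory"), used in the message
   */
  static <T> T checkedDowncast(Object value, Class<T> expected, String source) {
    if (!expected.isInstance(value)) {
      // Handle null explicitly so that building the message cannot itself throw.
      String actual = (value == null) ? "null" : value.getClass().getName();
      throw new IllegalStateException(
          String.format("Expected %s from %s but got %s", expected.getName(), source, actual));
    }
    return expected.cast(value);
  }
}
```

Each constructor would then reduce to `this.icebergCatalog = Casts.checkedDowncast(created, AbstractCatalog.class, "FlinkCatalogFactory");`.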
---
### 🟡 Issue 3 — `CatalogCompatFlink118` duplicates `DefaultCatalogCompat` entirely
**File:** `CatalogCompatFlink118.java`
Both classes implement the exact same behavior: `CatalogTable.of(...)` and
`CatalogPropertiesUtil.serializeCatalogTable(...)`. Since Flink 1.18 **is** the
baseline version, there is no functional difference between them. This creates
two classes to maintain for the same logic.
Consider delegating to `DefaultCatalogCompat.INSTANCE` instead:
```java
@Override
public CatalogTable createCatalogTable(
    Schema schema, String comment, List<String> partitionKeys, Map<String, String> options) {
  return DefaultCatalogCompat.INSTANCE.createCatalogTable(
      schema, comment, partitionKeys, options);
}

@Override
public Map<String, String> serializeCatalogTable(ResolvedCatalogTable resolvedTable) {
  return DefaultCatalogCompat.INSTANCE.serializeCatalogTable(resolvedTable);
}
```
`CatalogCompatFlink118` then becomes meaningful only when Flink 1.18
genuinely diverges from the default baseline.
---
### 🟡 Issue 4 — Redundant `CatalogTableBuilder` functional interface in `FlinkGenericTableUtil`
**File:** `FlinkGenericTableUtil.java` lines 108–115
`CatalogTableBuilder.create(Schema, String, List<String>, Map<String,
String>)` has the **exact same signature** as
`CatalogCompat.createCatalogTable(...)`. This introduces two parallel extension
points for the same operation.
The only consumer of `CatalogTableBuilder` is `toFlinkGenericTable(Table,
CatalogTableBuilder)`, which is called exclusively via
`catalogCompat::createCatalogTable` — making `CatalogTableBuilder` pure
indirection with no added value.
Suggested simplification: remove `CatalogTableBuilder` entirely and inline
the logic directly in the `CatalogCompat` overload. The three-overload chain
```
toFlinkGenericTable(Table)
→ toFlinkGenericTable(Table, CatalogCompat)
→ toFlinkGenericTable(Table, CatalogTableBuilder)
```
collapses to two clean overloads.
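Here is a self-contained sketch of the two remaining overloads. `GravitinoTableStub`, `CatalogCompatStub`, and `FlinkGenericTableUtilSketch` are hypothetical simplifications that use strings in place of Flink's `Schema` and `CatalogTable`. `CatalogCompatStub.DEFAULT` stands in for `DefaultCatalogCompat.INSTANCE`, on the assumption that this is what the single-argument overload delegates to today.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a Gravitino Table: only the fields the conversion reads. */
final class GravitinoTableStub {
  final String schema;
  final String comment;
  final List<String> partitionKeys;
  final Map<String, String> properties;

  GravitinoTableStub(
      String schema, String comment, List<String> partitionKeys, Map<String, String> properties) {
    this.schema = schema;
    this.comment = comment;
    this.partitionKeys = partitionKeys;
    this.properties = properties;
  }
}

/** Hypothetical stand-in for CatalogCompat; returns a String instead of a CatalogTable. */
interface CatalogCompatStub {
  String createCatalogTable(
      String schema, String comment, List<String> partitionKeys, Map<String, String> options);

  /** Baseline behavior, standing in for DefaultCatalogCompat.INSTANCE. */
  CatalogCompatStub DEFAULT =
      (schema, comment, partitionKeys, options) ->
          "CatalogTable(" + schema + ", " + comment + ", " + partitionKeys + ", " + options + ")";
}

final class FlinkGenericTableUtilSketch {
  private FlinkGenericTableUtilSketch() {}

  /** Overload 1: baseline behavior, delegating to overload 2. */
  static String toFlinkGenericTable(GravitinoTableStub table) {
    return toFlinkGenericTable(table, CatalogCompatStub.DEFAULT);
  }

  /** Overload 2: the conversion logic lives here and calls the compat layer directly. */
  static String toFlinkGenericTable(GravitinoTableStub table, CatalogCompatStub compat) {
    // The real schema/option translation is elided; the stub passes its fields through.
    return compat.createCatalogTable(
        table.schema, table.comment, table.partitionKeys, table.properties);
  }
}
```

Version-specific behavior still enters through a single extension point (`CatalogCompat`), so a separate builder interface has nothing left to add.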
---
### 🟡 Issue 5 — `toIcebergCatalogOptions` silently overwrites user-provided `catalog-type`
**File:** `GravitinoIcebergCatalogFactory.java` line 129
```java
icebergCatalogOptions.put(CommonCatalogOptions.CATALOG_TYPE.key(), "iceberg");
```
This unconditionally replaces any `catalog-type` value already present in
the map (copied from `catalogOptions` on line 122). If a user explicitly set a
different value, it is silently discarded with no log or warning.
Consider at minimum a DEBUG-level log:
```java
// Map.put returns the previous value, or null if the key was absent.
String existing =
    icebergCatalogOptions.put(CommonCatalogOptions.CATALOG_TYPE.key(), "iceberg");
if (existing != null && !"iceberg".equals(existing)) {
  LOG.debug(
      "Overriding {} from '{}' to 'iceberg'", CommonCatalogOptions.CATALOG_TYPE.key(), existing);
}
```
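The same pattern can be packaged as a small reusable helper, so that an override is visible wherever an option is forced. `OptionOverrides.putLoggingOverride` is a hypothetical name. The sketch uses `java.util.logging` at `FINE` (roughly SLF4J's DEBUG) only to stay dependency-free; the real class would use its own `LOG`.

```java
import java.util.Map;
import java.util.logging.Logger;

/** Hypothetical helper: overwrite an option, but make a real override visible. */
final class OptionOverrides {
  private static final Logger LOG = Logger.getLogger(OptionOverrides.class.getName());

  private OptionOverrides() {}

  /**
   * Sets {@code key} to {@code value} and returns the previous value, or null if there
   * was none. Map.put already returns the old value, so no extra get() is needed.
   */
  static String putLoggingOverride(Map<String, String> options, String key, String value) {
    String previous = options.put(key, value);
    if (previous != null && !previous.equals(value)) {
      // FINE is java.util.logging's closest level to SLF4J's DEBUG.
      LOG.fine(() -> String.format("Overriding %s from '%s' to '%s'", key, previous, value));
    }
    return previous;
  }
}
```

In `toIcebergCatalogOptions`, the forced assignment would become `OptionOverrides.putLoggingOverride(icebergCatalogOptions, CommonCatalogOptions.CATALOG_TYPE.key(), "iceberg")`.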
---
### 🟢 Issue 6 — `discoverFactories(Iterator, Predicate, String)` should be `protected` and needs JavaDoc
**File:** `GravitinoCatalogStore.java` line 161
This method is package-private (no modifier), which is inconsistent with
other new extension hooks introduced in this PR (e.g., `newCatalogDescriptor`
is `protected`). If the intent is to allow versioned store subclasses to
override factory discovery, it should be `protected`.
Both `newCatalogDescriptor(String, Configuration)` (line 127) and
`discoverFactories(Iterator, Predicate, String)` are new extension points with
no JavaDoc. Protected APIs require JavaDoc per the project contribution
guidelines.
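Here is a sketch of the hook as a documented `protected` method, together with a versioned subclass that overrides it. Several details are assumptions: the generic signature, the meaning of the `String` argument (used here only in an error message), and the fail-when-empty behavior. The review only names the parameter types `Iterator`, `Predicate`, and `String`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical stand-in for GravitinoCatalogStore. */
class CatalogStoreSketch {

  /**
   * Collects the factories from {@code candidates} that satisfy {@code filter}.
   *
   * <p>Extension point: versioned catalog stores may override this method to change how
   * factories are discovered, for example to prefer one built for a specific Flink version.
   *
   * @param <T> factory type
   * @param candidates factories to inspect, typically backed by a ServiceLoader
   * @param filter condition a factory must satisfy to be selected
   * @param description human-readable lookup target, used in error messages (assumed meaning)
   * @return the matching factories, in iteration order
   * @throws IllegalStateException if no candidate matches
   */
  protected <T> List<T> discoverFactories(
      Iterator<T> candidates, Predicate<? super T> filter, String description) {
    List<T> matches = new ArrayList<>();
    while (candidates.hasNext()) {
      T candidate = candidates.next();
      if (filter.test(candidate)) {
        matches.add(candidate);
      }
    }
    if (matches.isEmpty()) {
      throw new IllegalStateException("No factory found for " + description);
    }
    return matches;
  }
}

/** Hypothetical versioned store that keeps only the first matching factory. */
class CatalogStoreFlink118Sketch extends CatalogStoreSketch {
  @Override
  protected <T> List<T> discoverFactories(
      Iterator<T> candidates, Predicate<? super T> filter, String description) {
    return List.of(super.discoverFactories(candidates, filter, description).get(0));
  }
}
```

Because the method is `protected`, a versioned store in a different package (such as one in a Flink 1.18 module) can override it. A package-private method can only be overridden by subclasses in the same package.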
---
Overall the architecture is sound. Please address issues 1–4 before merge.
--
This is an automated message from the Apache Git Service.