Copilot commented on code in PR #2924:
URL: https://github.com/apache/fluss/pull/2924#discussion_r2985326468


##########
fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java:
##########
@@ -154,6 +157,81 @@ public static void validateAlterTableProperties(
         }
     }
 
+    public static void validateCreateTableDataLakeConfig(
+            TableDescriptor tableDescriptor,
+            @Nullable DataLakeFormat clusterDataLakeFormat,
+            boolean clusterDataLakeTableEnabled) {
+        TableConfig tableConfig =
+                new 
TableConfig(Configuration.fromMap(tableDescriptor.getProperties()));
+        Optional<DataLakeFormat> tableDataLakeFormat = 
tableConfig.getDataLakeFormat();
+
+        validateTableDataLakeFormatMatchesCluster(tableDataLakeFormat, 
clusterDataLakeFormat, true);
+
+        if (tableConfig.isDataLakeEnabled() && !clusterDataLakeTableEnabled) {
+            throw new InvalidTableException(
+                    String.format(
+                            "'%s' is enabled for the table, but the Fluss 
cluster doesn't enable datalake tables.",
+                            ConfigOptions.TABLE_DATALAKE_ENABLED.key()));
+        }
+    }
+
+    public static void validateAlterTableDataLakeConfig(
+            TableDescriptor currentTableDescriptor,
+            TableDescriptor newTableDescriptor,
+            @Nullable DataLakeFormat clusterDataLakeFormat,
+            boolean clusterDataLakeTableEnabled) {
+        TableConfig currentConfig =
+                new 
TableConfig(Configuration.fromMap(currentTableDescriptor.getProperties()));
+        TableConfig newConfig =
+                new 
TableConfig(Configuration.fromMap(newTableDescriptor.getProperties()));
+
+        boolean enablingDataLake =
+                !currentConfig.isDataLakeEnabled() && 
newConfig.isDataLakeEnabled();
+        if (!enablingDataLake) {
+            return;
+        }
+
+        if (!clusterDataLakeTableEnabled) {
+            throw new InvalidAlterTableException(
+                    "Cannot alter table in data lake, because the Fluss 
cluster doesn't enable datalake tables.");
+        }
+
+        Optional<DataLakeFormat> tableDataLakeFormat = 
newConfig.getDataLakeFormat();
+        if (!tableDataLakeFormat.isPresent()) {
+            throw new InvalidAlterTableException(
+                    String.format(
+                            "The option '%s' cannot be altered because the 
table has no persisted '%s'.",

Review Comment:
   This new validation for enabling lakehouse on ALTER checks for a missing 
`table.datalake.format`. However, for tables created before any format was 
persisted, the earlier `validateAlterTableProperties(...)` currently rejects 
*all* `table.datalake.*` changes first, so this more specific error path is 
unreachable (and tests expecting the new message will fail). Consider relaxing 
`validateAlterTableProperties` to allow `table.datalake.enabled` to be altered 
(while still rejecting other `table.datalake.*` keys) and relying on this 
method to provide the correct 'no persisted table.datalake.format' rejection.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java:
##########
@@ -59,37 +62,91 @@ public LakeCatalogDynamicLoader(
 
     @Override
     public void validate(Configuration newConfig) throws ConfigException {
-        final DataLakeFormat newDatalakeFormat =
-                newConfig.getOptional(DATALAKE_FORMAT).isPresent()
-                        ? newConfig.get(DATALAKE_FORMAT)
-                        : currentConfiguration.get(DATALAKE_FORMAT);
+        Optional<Boolean> explicitDataLakeEnabled = 
newConfig.getOptional(DATALAKE_ENABLED);
+        Optional<DataLakeFormat> newDataLakeFormat = 
newConfig.getOptional(DATALAKE_FORMAT);
+
+        if (explicitDataLakeEnabled.isPresent() && 
!newDataLakeFormat.isPresent()) {
+            throw new ConfigException(
+                    String.format(
+                            "'%s' must be configured when '%s' is explicitly 
set.",
+                            DATALAKE_FORMAT.key(), DATALAKE_ENABLED.key()));
+        }
+
+        Optional<Boolean> currentExplicitDataLakeEnabled =
+                currentConfiguration.getOptional(DATALAKE_ENABLED);
+        DataLakeFormat currentDataLakeFormat =
+                currentConfiguration.getOptional(DATALAKE_FORMAT).orElse(null);
+        DataLakeFormat targetDataLakeFormat = newDataLakeFormat.orElse(null);
+        if ((currentExplicitDataLakeEnabled.isPresent() || 
explicitDataLakeEnabled.isPresent())
+                && currentDataLakeFormat != null
+                && targetDataLakeFormat != null
+                && currentDataLakeFormat != targetDataLakeFormat) {
+            throw new ConfigException(
+                    String.format(
+                            "'%s' cannot be changed from '%s' to '%s' once 
'%s' is explicitly configured.",
+                            DATALAKE_FORMAT.key(),
+                            currentDataLakeFormat,
+                            targetDataLakeFormat,
+                            DATALAKE_ENABLED.key()));
+        }
+
+        if (!newDataLakeFormat.isPresent()) {
+            DataLakeFormat currentDataLakeFormatForValidation =
+                    
currentConfiguration.getOptional(DATALAKE_FORMAT).orElse(null);
+            String currentDataLakePrefix =
+                    currentDataLakeFormatForValidation == null
+                            ? null
+                            : "datalake." + currentDataLakeFormatForValidation 
+ ".";
+            newConfig
+                    .toMap()
+                    .forEach(
+                            (key, value) -> {
+                                if (!key.equals(DATALAKE_FORMAT.key())
+                                        && !key.equals(DATALAKE_ENABLED.key())
+                                        && key.startsWith("datalake.")
+                                        && (currentDataLakePrefix == null
+                                                || 
!key.startsWith(currentDataLakePrefix))) {
+                                    throw new ConfigException(
+                                            String.format(
+                                                    "Invalid configuration 
'%s' because '%s' is not configured.",
+                                                    key, 
DATALAKE_FORMAT.key()));
+                                }
+                            });
+            return;
+        }
+
+        String datalakePrefix = "datalake." + newDataLakeFormat.get() + ".";
         Map<String, String> configMap = newConfig.toMap();
-        String datalakePrefix = "datalake." + newDatalakeFormat + ".";
         configMap.forEach(
                 (key, value) -> {
                     if (!key.equals(DATALAKE_FORMAT.key())
+                            && !key.equals(DATALAKE_ENABLED.key())
                             && key.startsWith("datalake.")
                             && !key.startsWith(datalakePrefix)) {
                         throw new ConfigException(
                                 String.format(
                                         "Invalid configuration '%s' for '%s' 
datalake format",
-                                        key, newDatalakeFormat));
+                                        key, newDataLakeFormat.get()));
                     }
                 });
     }
 
     @Override
     public void reconfigure(Configuration newConfig) throws ConfigException {
         LakeCatalogContainer lastLakeCatalogContainer = lakeCatalogContainer;
-        DataLakeFormat newLakeFormat = 
newConfig.getOptional(DATALAKE_FORMAT).orElse(null);
-        if (newLakeFormat != lastLakeCatalogContainer.dataLakeFormat) {
+        LakeCatalogContainer newLakeCatalogContainer =
+                new LakeCatalogContainer(newConfig, pluginManager, 
isCoordinator);
+        if (!Objects.equals(
+                        newLakeCatalogContainer.getDataLakeFormat(),
+                        lastLakeCatalogContainer.getDataLakeFormat())
+                || newLakeCatalogContainer.isClusterDataLakeTableEnabled()
+                        != 
lastLakeCatalogContainer.isClusterDataLakeTableEnabled()) {
             IOUtils.closeQuietly(
                     lastLakeCatalogContainer.lakeCatalog,
                     "Close lake catalog because config changes");
-            this.lakeCatalogContainer =
-                    new LakeCatalogContainer(newConfig, pluginManager, 
isCoordinator);
-            this.currentConfiguration = newConfig;
         }
+        this.lakeCatalogContainer = newLakeCatalogContainer;
+        this.currentConfiguration = newConfig;

Review Comment:
   In `reconfigure`, a new `LakeCatalogContainer` (and potentially a new 
`LakeCatalog`) is constructed unconditionally. If the effective lake 
mode/format doesn't change, the old catalog is never closed even though its 
reference is replaced, which can leak resources; and on unrelated dynamic 
config changes this can repeatedly instantiate heavy lake plugins/catalogs. 
Consider short-circuiting before constructing a new container/catalog (e.g., 
compare relevant config keys first), or make catalog creation lazy, and ensure 
the previous catalog is always closed whenever a new one is created and 
swapped in.



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java:
##########
@@ -130,17 +192,36 @@ public LakeCatalogContainer(
                 Configuration configuration,
                 @Nullable PluginManager pluginManager,
                 boolean isCoordinator) {
+            this.explicitDataLakeEnabledConfigured =
+                    configuration.getOptional(DATALAKE_ENABLED).isPresent();
+            this.clusterDataLakeTableEnabled = 
isClusterDataLakeTableEnabled(configuration);
             this.dataLakeFormat = 
configuration.getOptional(DATALAKE_FORMAT).orElse(null);

Review Comment:
   `datalake.enabled` is intended to require `datalake.format` when explicitly 
configured (both true/false), but that requirement is currently only enforced 
in `validate(...)` (dynamic updates). At startup, `LakeCatalogContainer` will 
accept `datalake.enabled=false` with no `datalake.format`, which contradicts 
the documented semantics and can leave the cluster in an inconsistent 
'pre-bind' state without a bound format. Consider enforcing this in the 
container constructor (or a shared validation path) by throwing 
`ConfigException` when `datalake.enabled` is present but `datalake.format` is 
missing.
   ```suggestion
               // Enforce that when datalake.enabled is explicitly configured 
(true or false),
               // a corresponding datalake.format must also be configured. This 
mirrors the
               // validation performed for dynamic updates to avoid 
inconsistent "pre-bind" states.
               Optional<Boolean> explicitDataLakeEnabled = 
configuration.getOptional(DATALAKE_ENABLED);
               Optional<DataLakeFormat> configuredDataLakeFormat =
                       configuration.getOptional(DATALAKE_FORMAT);
               if (explicitDataLakeEnabled.isPresent() && 
!configuredDataLakeFormat.isPresent()) {
                   throw new ConfigException(
                           String.format(
                                   "Configuration invalid: when '%s' is 
explicitly configured, '%s' must also be set.",
                                   DATALAKE_ENABLED.key(), 
DATALAKE_FORMAT.key()));
               }
   
               this.explicitDataLakeEnabledConfigured = 
explicitDataLakeEnabled.isPresent();
               this.clusterDataLakeTableEnabled = 
isClusterDataLakeTableEnabled(configuration);
               this.dataLakeFormat = configuredDataLakeFormat.orElse(null);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to