Copilot commented on code in PR #2924:
URL: https://github.com/apache/fluss/pull/2924#discussion_r2985326468
##########
fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java:
##########
@@ -154,6 +157,81 @@ public static void validateAlterTableProperties(
}
}
+ public static void validateCreateTableDataLakeConfig(
+ TableDescriptor tableDescriptor,
+ @Nullable DataLakeFormat clusterDataLakeFormat,
+ boolean clusterDataLakeTableEnabled) {
+ TableConfig tableConfig =
+ new
TableConfig(Configuration.fromMap(tableDescriptor.getProperties()));
+ Optional<DataLakeFormat> tableDataLakeFormat =
tableConfig.getDataLakeFormat();
+
+ validateTableDataLakeFormatMatchesCluster(tableDataLakeFormat,
clusterDataLakeFormat, true);
+
+ if (tableConfig.isDataLakeEnabled() && !clusterDataLakeTableEnabled) {
+ throw new InvalidTableException(
+ String.format(
+ "'%s' is enabled for the table, but the Fluss
cluster doesn't enable datalake tables.",
+ ConfigOptions.TABLE_DATALAKE_ENABLED.key()));
+ }
+ }
+
+ public static void validateAlterTableDataLakeConfig(
+ TableDescriptor currentTableDescriptor,
+ TableDescriptor newTableDescriptor,
+ @Nullable DataLakeFormat clusterDataLakeFormat,
+ boolean clusterDataLakeTableEnabled) {
+ TableConfig currentConfig =
+ new
TableConfig(Configuration.fromMap(currentTableDescriptor.getProperties()));
+ TableConfig newConfig =
+ new
TableConfig(Configuration.fromMap(newTableDescriptor.getProperties()));
+
+ boolean enablingDataLake =
+ !currentConfig.isDataLakeEnabled() &&
newConfig.isDataLakeEnabled();
+ if (!enablingDataLake) {
+ return;
+ }
+
+ if (!clusterDataLakeTableEnabled) {
+ throw new InvalidAlterTableException(
+ "Cannot alter table in data lake, because the Fluss
cluster doesn't enable datalake tables.");
+ }
+
+ Optional<DataLakeFormat> tableDataLakeFormat =
newConfig.getDataLakeFormat();
+ if (!tableDataLakeFormat.isPresent()) {
+ throw new InvalidAlterTableException(
+ String.format(
+ "The option '%s' cannot be altered because the
table has no persisted '%s'.",
Review Comment:
This new validation for enabling lakehouse on ALTER checks for a missing
`table.datalake.format`. However, for tables created before any format was
persisted, the earlier `validateAlterTableProperties(...)` currently rejects
*all* `table.datalake.*` changes first, so this more specific error path is
unreachable (and tests expecting the new message will fail). Consider relaxing
`validateAlterTableProperties` to allow `table.datalake.enabled` to be altered
(while still rejecting other `table.datalake.*` keys) and relying on this
method to provide the correct 'no persisted table.datalake.format' rejection.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java:
##########
@@ -59,37 +62,91 @@ public LakeCatalogDynamicLoader(
@Override
public void validate(Configuration newConfig) throws ConfigException {
- final DataLakeFormat newDatalakeFormat =
- newConfig.getOptional(DATALAKE_FORMAT).isPresent()
- ? newConfig.get(DATALAKE_FORMAT)
- : currentConfiguration.get(DATALAKE_FORMAT);
+ Optional<Boolean> explicitDataLakeEnabled =
newConfig.getOptional(DATALAKE_ENABLED);
+ Optional<DataLakeFormat> newDataLakeFormat =
newConfig.getOptional(DATALAKE_FORMAT);
+
+ if (explicitDataLakeEnabled.isPresent() &&
!newDataLakeFormat.isPresent()) {
+ throw new ConfigException(
+ String.format(
+ "'%s' must be configured when '%s' is explicitly
set.",
+ DATALAKE_FORMAT.key(), DATALAKE_ENABLED.key()));
+ }
+
+ Optional<Boolean> currentExplicitDataLakeEnabled =
+ currentConfiguration.getOptional(DATALAKE_ENABLED);
+ DataLakeFormat currentDataLakeFormat =
+ currentConfiguration.getOptional(DATALAKE_FORMAT).orElse(null);
+ DataLakeFormat targetDataLakeFormat = newDataLakeFormat.orElse(null);
+ if ((currentExplicitDataLakeEnabled.isPresent() ||
explicitDataLakeEnabled.isPresent())
+ && currentDataLakeFormat != null
+ && targetDataLakeFormat != null
+ && currentDataLakeFormat != targetDataLakeFormat) {
+ throw new ConfigException(
+ String.format(
+ "'%s' cannot be changed from '%s' to '%s' once
'%s' is explicitly configured.",
+ DATALAKE_FORMAT.key(),
+ currentDataLakeFormat,
+ targetDataLakeFormat,
+ DATALAKE_ENABLED.key()));
+ }
+
+ if (!newDataLakeFormat.isPresent()) {
+ DataLakeFormat currentDataLakeFormatForValidation =
+
currentConfiguration.getOptional(DATALAKE_FORMAT).orElse(null);
+ String currentDataLakePrefix =
+ currentDataLakeFormatForValidation == null
+ ? null
+ : "datalake." + currentDataLakeFormatForValidation
+ ".";
+ newConfig
+ .toMap()
+ .forEach(
+ (key, value) -> {
+ if (!key.equals(DATALAKE_FORMAT.key())
+ && !key.equals(DATALAKE_ENABLED.key())
+ && key.startsWith("datalake.")
+ && (currentDataLakePrefix == null
+ ||
!key.startsWith(currentDataLakePrefix))) {
+ throw new ConfigException(
+ String.format(
+ "Invalid configuration
'%s' because '%s' is not configured.",
+ key,
DATALAKE_FORMAT.key()));
+ }
+ });
+ return;
+ }
+
+ String datalakePrefix = "datalake." + newDataLakeFormat.get() + ".";
Map<String, String> configMap = newConfig.toMap();
- String datalakePrefix = "datalake." + newDatalakeFormat + ".";
configMap.forEach(
(key, value) -> {
if (!key.equals(DATALAKE_FORMAT.key())
+ && !key.equals(DATALAKE_ENABLED.key())
&& key.startsWith("datalake.")
&& !key.startsWith(datalakePrefix)) {
throw new ConfigException(
String.format(
"Invalid configuration '%s' for '%s'
datalake format",
- key, newDatalakeFormat));
+ key, newDataLakeFormat.get()));
}
});
}
@Override
public void reconfigure(Configuration newConfig) throws ConfigException {
LakeCatalogContainer lastLakeCatalogContainer = lakeCatalogContainer;
- DataLakeFormat newLakeFormat =
newConfig.getOptional(DATALAKE_FORMAT).orElse(null);
- if (newLakeFormat != lastLakeCatalogContainer.dataLakeFormat) {
+ LakeCatalogContainer newLakeCatalogContainer =
+ new LakeCatalogContainer(newConfig, pluginManager,
isCoordinator);
+ if (!Objects.equals(
+ newLakeCatalogContainer.getDataLakeFormat(),
+ lastLakeCatalogContainer.getDataLakeFormat())
+ || newLakeCatalogContainer.isClusterDataLakeTableEnabled()
+ !=
lastLakeCatalogContainer.isClusterDataLakeTableEnabled()) {
IOUtils.closeQuietly(
lastLakeCatalogContainer.lakeCatalog,
"Close lake catalog because config changes");
- this.lakeCatalogContainer =
- new LakeCatalogContainer(newConfig, pluginManager,
isCoordinator);
- this.currentConfiguration = newConfig;
}
+ this.lakeCatalogContainer = newLakeCatalogContainer;
+ this.currentConfiguration = newConfig;
Review Comment:
In `reconfigure`, a new `LakeCatalogContainer` (and potentially a new
`LakeCatalog`) is constructed unconditionally. If the effective lake
mode/format doesn't change, the old catalog is never closed (but the reference
is replaced), which can leak resources. In addition, on unrelated dynamic
config changes this can repeatedly instantiate heavy lake plugins/catalogs.
Consider short-circuiting before constructing a new container/catalog (e.g.,
compare relevant config keys first), or make catalog creation lazy, and ensure
the previous catalog is always closed whenever a new one is created and
swapped in.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java:
##########
@@ -130,17 +192,36 @@ public LakeCatalogContainer(
Configuration configuration,
@Nullable PluginManager pluginManager,
boolean isCoordinator) {
+ this.explicitDataLakeEnabledConfigured =
+ configuration.getOptional(DATALAKE_ENABLED).isPresent();
+ this.clusterDataLakeTableEnabled =
isClusterDataLakeTableEnabled(configuration);
this.dataLakeFormat =
configuration.getOptional(DATALAKE_FORMAT).orElse(null);
Review Comment:
`datalake.enabled` is intended to require `datalake.format` when explicitly
configured (both true/false), but that requirement is currently only enforced
in `validate(...)` (dynamic updates). At startup, `LakeCatalogContainer` will
accept `datalake.enabled=false` with no `datalake.format`, which contradicts
the documented semantics and can leave the cluster in an inconsistent
'pre-bind' state without a bound format. Consider enforcing this in the
container constructor (or a shared validation path) by throwing
`ConfigException` when `datalake.enabled` is present but `datalake.format` is
missing.
```suggestion
// Enforce that when datalake.enabled is explicitly configured (true or false),
// a corresponding datalake.format must also be configured. This mirrors the
// validation performed for dynamic updates to avoid inconsistent "pre-bind"
// states.
Optional<Boolean> explicitDataLakeEnabled =
configuration.getOptional(DATALAKE_ENABLED);
Optional<DataLakeFormat> configuredDataLakeFormat =
configuration.getOptional(DATALAKE_FORMAT);
if (explicitDataLakeEnabled.isPresent() &&
!configuredDataLakeFormat.isPresent()) {
throw new ConfigException(
String.format(
"Configuration invalid: when '%s' is
explicitly configured, '%s' must also be set.",
DATALAKE_ENABLED.key(),
DATALAKE_FORMAT.key()));
}
this.explicitDataLakeEnabledConfigured =
explicitDataLakeEnabled.isPresent();
this.clusterDataLakeTableEnabled =
isClusterDataLakeTableEnabled(configuration);
this.dataLakeFormat = configuredDataLakeFormat.orElse(null);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]