yihua commented on code in PR #18295:
URL: https://github.com/apache/hudi/pull/18295#discussion_r3055433023
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java:
##########
@@ -362,6 +387,99 @@ public static HoodieWriteConfig createMetadataWriteConfig(
return metadataWriteConfig;
}
+ /**
+ * Build the lock config for the metadata table by extracting lock-specific
properties from the
+ * data table's write config. This avoids copying all properties (which
would overwrite MDT-specific
+ * settings like base path and auto-clean).
+ */
+ private static HoodieLockConfig buildMetadataLockConfig(HoodieWriteConfig
writeConfig) {
+ TypedProperties props = writeConfig.getProps();
+ HoodieLockConfig.Builder lockConfigBuilder = HoodieLockConfig.newBuilder()
+ .withClientNumRetries(Integer.parseInt(props.getString(
+ HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.key(),
+ HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue())))
+ .withClientRetryWaitTimeInMillis(Long.parseLong(props.getString(
+
HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.key(),
+
HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue())))
+ .withLockWaitTimeInMillis(Long.valueOf(props.getString(
+ HoodieLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS.key(),
+
String.valueOf(HoodieLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS.defaultValue()))))
+ .withNumRetries(Integer.parseInt(props.getString(
+ HoodieLockConfig.LOCK_ACQUIRE_NUM_RETRIES.key(),
+ HoodieLockConfig.LOCK_ACQUIRE_NUM_RETRIES.defaultValue())))
+ .withRetryWaitTimeInMillis(Long.parseLong(props.getString(
+ HoodieLockConfig.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.key(),
+
HoodieLockConfig.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.defaultValue())))
+ .withRetryMaxWaitTimeInMillis(Long.parseLong(props.getString(
+ HoodieLockConfig.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS.key(),
+
HoodieLockConfig.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS.defaultValue())))
+ .withHeartbeatIntervalInMillis(Long.valueOf(props.getString(
+ HoodieLockConfig.LOCK_HEARTBEAT_INTERVAL_MS.key(),
+
String.valueOf(HoodieLockConfig.LOCK_HEARTBEAT_INTERVAL_MS.defaultValue()))));
+
+ String lockProviderClass = writeConfig.getLockProviderClass();
+ if (lockProviderClass == null) {
+ return lockConfigBuilder.build();
+ }
+
+ Properties providerProp = new Properties();
+ providerProp.setProperty(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
lockProviderClass);
+ lockConfigBuilder.fromProperties(providerProp);
+
+ if (ZookeeperBasedLockProvider.class.getName().equals(lockProviderClass)) {
+ if (props.containsKey(HoodieLockConfig.ZK_CONNECT_URL.key())) {
+
lockConfigBuilder.withZkQuorum(props.getString(HoodieLockConfig.ZK_CONNECT_URL.key()));
+ }
+ if (props.containsKey(HoodieLockConfig.ZK_BASE_PATH.key())) {
+
lockConfigBuilder.withZkBasePath(props.getString(HoodieLockConfig.ZK_BASE_PATH.key()));
+ }
+ if (props.containsKey(HoodieLockConfig.ZK_LOCK_KEY.key())) {
+
lockConfigBuilder.withZkLockKey(props.getString(HoodieLockConfig.ZK_LOCK_KEY.key()));
+ }
+ if (props.containsKey(HoodieLockConfig.ZK_PORT.key())) {
+
lockConfigBuilder.withZkPort(props.getString(HoodieLockConfig.ZK_PORT.key()));
+ }
+ if (props.containsKey(HoodieLockConfig.ZK_SESSION_TIMEOUT_MS.key())) {
+ lockConfigBuilder.withZkSessionTimeoutInMs(
+
Long.valueOf(props.getString(HoodieLockConfig.ZK_SESSION_TIMEOUT_MS.key())));
+ }
+ if (props.containsKey(HoodieLockConfig.ZK_CONNECTION_TIMEOUT_MS.key())) {
+ lockConfigBuilder.withZkConnectionTimeoutInMs(
+
Long.valueOf(props.getString(HoodieLockConfig.ZK_CONNECTION_TIMEOUT_MS.key())));
+ }
+ } else if
(FileSystemBasedLockProvider.class.getName().equals(lockProviderClass)) {
+ if (props.containsKey(HoodieLockConfig.FILESYSTEM_LOCK_PATH.key())) {
+
lockConfigBuilder.withFileSystemLockPath(props.getString(HoodieLockConfig.FILESYSTEM_LOCK_PATH.key()));
+ }
+ if (props.containsKey(HoodieLockConfig.FILESYSTEM_LOCK_EXPIRE.key())) {
+ lockConfigBuilder.withFileSystemLockExpire(
+
Integer.parseInt(props.getString(HoodieLockConfig.FILESYSTEM_LOCK_EXPIRE.key())));
+ }
+ } else if (lockProviderClass.contains("HiveMetastoreBasedLockProvider")) {
+ if (props.containsKey(HoodieLockConfig.HIVE_DATABASE_NAME.key())) {
+
lockConfigBuilder.withHiveDatabaseName(props.getString(HoodieLockConfig.HIVE_DATABASE_NAME.key()));
+ }
+ if (props.containsKey(HoodieLockConfig.HIVE_TABLE_NAME.key())) {
+
lockConfigBuilder.withHiveTableName(props.getString(HoodieLockConfig.HIVE_TABLE_NAME.key()));
+ }
+ if (props.containsKey(HoodieLockConfig.HIVE_METASTORE_URI.key())) {
+
lockConfigBuilder.withHiveMetastoreURIs(props.getString(HoodieLockConfig.HIVE_METASTORE_URI.key()));
+ }
+ } else {
+ // For any custom lock provider, pass through all lock-prefixed
properties
+ // so provider-specific configs are preserved.
Review Comment:
🤖 Good point. Beyond just the maintenance burden of enumerating providers,
an alternative approach could be to copy *all* properties with the
`hoodie.write.lock.` prefix from the data table config, rather than selecting
provider-specific prefixes. That way any new lock provider (DynamoDB,
storage-based, or future ones) would automatically get its config propagated.
The only thing to be careful about is not overwriting MDT-specific settings
like base path.
##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -81,6 +85,31 @@ public final class HoodieMetadataConfig extends HoodieConfig
{
+ "in streaming manner rather than two disjoint writes. By default "
+ "streaming writes to metadata table is enabled for SPARK engine
for incremental operations and disabled for all other cases.");
+ public static final ConfigProperty<String> METADATA_WRITE_CONCURRENCY_MODE =
ConfigProperty
Review Comment:
🤖 The config key resolves to `hoodie.metadata.write.concurrency.mode` (with a
dot after `metadata`), while `METADATA_PREFIX` is typically `hoodie.metadata`.
Could you double-check how the key is constructed — is the `.` after
`METADATA_PREFIX` intentional? If `METADATA_PREFIX` has no trailing dot and
you're concatenating `.write.concurrency.mode`, the result is
`hoodie.metadata.write.concurrency.mode`, which looks correct. I just want to
make sure the prefix convention is consistent with the other keys in this file.
<sub><i>- Generated by an AI agent and may contain mistakes. Please verify
any suggestions before applying.</i></sub>
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java:
##########
@@ -249,6 +266,65 @@ public static HoodieLockConfig.Builder newBuilder() {
return new HoodieLockConfig.Builder();
}
+ /**
+ * Build a {@link HoodieLockConfig} by copying lock-related configs from the
given write config
+ * based on the configured lock provider. Only built-in lock providers are
supported.
+ *
+ * @param lockProviderClass the lock provider class name
+ * @param writeConfig the write config to copy lock properties from
+ * @return a new HoodieLockConfig with the relevant lock properties
+ * @throws HoodieException if the lock provider is not a built-in provider
+ */
+ public static HoodieLockConfig getLockConfigForBuiltInLockProvider(String
lockProviderClass, HoodieWriteConfig writeConfig) {
+ TypedProperties dataProps = writeConfig.getProps();
+ Properties lockProps = new Properties();
+ lockProps.put(LOCK_PROVIDER_CLASS_NAME.key(), lockProviderClass);
+
+ // Common lock configs used by all providers
+ lockProps.put(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.key(),
+
writeConfig.getStringOrDefault(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS));
+ lockProps.put(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS.key(),
+
writeConfig.getStringOrDefault(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS));
+ lockProps.put(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.key(),
+
writeConfig.getStringOrDefault(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS));
+ lockProps.put(LOCK_ACQUIRE_NUM_RETRIES.key(),
+ writeConfig.getStringOrDefault(LOCK_ACQUIRE_NUM_RETRIES));
+ lockProps.put(LOCK_ACQUIRE_CLIENT_NUM_RETRIES.key(),
+ writeConfig.getStringOrDefault(LOCK_ACQUIRE_CLIENT_NUM_RETRIES));
+ lockProps.put(LOCK_ACQUIRE_WAIT_TIMEOUT_MS.key(),
+
String.valueOf(writeConfig.getIntOrDefault(LOCK_ACQUIRE_WAIT_TIMEOUT_MS)));
+ lockProps.put(LOCK_HEARTBEAT_INTERVAL_MS.key(),
+
String.valueOf(writeConfig.getIntOrDefault(LOCK_HEARTBEAT_INTERVAL_MS)));
+
+ // Provider-specific configs
+ if
(FileSystemBasedLockProvider.class.getCanonicalName().equals(lockProviderClass))
{
+ copyPropsWithPrefix(dataProps, lockProps,
LockConfiguration.FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX);
+ } else if
(ZookeeperBasedLockProvider.class.getCanonicalName().equals(lockProviderClass)
Review Comment:
🤖 When `ZookeeperBasedImplicitBasePathLockProvider` is used, it derives its
ZK base path from `HoodieCommonConfig.BASE_PATH` and `HoodieTableConfig.NAME`
at runtime (not from any `hoodie.write.lock.zookeeper.*` property). Since the
MDT write config's base path is set to the metadata table path (e.g.
`<data_path>/.hoodie/metadata`), the MDT compaction writer will compute a
*different* ZK lock path than the data table writer — so the two will never
contend for the same lock, defeating the purpose.
Could you either (a) reject `ZookeeperBasedImplicitBasePathLockProvider` in
this method (similar to the `InProcessLockProvider` check), or (b) explicitly
inject the data table's base path/table name into the lock properties so the
MDT writer resolves to the same ZK path?
<sub><i>- Generated by an AI agent and may contain mistakes. Please verify
any suggestions before applying.</i></sub>
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java:
##########
@@ -249,6 +266,65 @@ public static HoodieLockConfig.Builder newBuilder() {
return new HoodieLockConfig.Builder();
}
+ /**
+ * Build a {@link HoodieLockConfig} by copying lock-related configs from the
given write config
+ * based on the configured lock provider. Only built-in lock providers are
supported.
+ *
+ * @param lockProviderClass the lock provider class name
+ * @param writeConfig the write config to copy lock properties from
+ * @return a new HoodieLockConfig with the relevant lock properties
+ * @throws HoodieException if the lock provider is not a built-in provider
+ */
+ public static HoodieLockConfig getLockConfigForBuiltInLockProvider(String
lockProviderClass, HoodieWriteConfig writeConfig) {
+ TypedProperties dataProps = writeConfig.getProps();
+ Properties lockProps = new Properties();
+ lockProps.put(LOCK_PROVIDER_CLASS_NAME.key(), lockProviderClass);
+
+ // Common lock configs used by all providers
+ lockProps.put(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.key(),
+
writeConfig.getStringOrDefault(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS));
+ lockProps.put(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS.key(),
+
writeConfig.getStringOrDefault(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS));
+ lockProps.put(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.key(),
+
writeConfig.getStringOrDefault(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS));
+ lockProps.put(LOCK_ACQUIRE_NUM_RETRIES.key(),
+ writeConfig.getStringOrDefault(LOCK_ACQUIRE_NUM_RETRIES));
+ lockProps.put(LOCK_ACQUIRE_CLIENT_NUM_RETRIES.key(),
+ writeConfig.getStringOrDefault(LOCK_ACQUIRE_CLIENT_NUM_RETRIES));
+ lockProps.put(LOCK_ACQUIRE_WAIT_TIMEOUT_MS.key(),
+
String.valueOf(writeConfig.getIntOrDefault(LOCK_ACQUIRE_WAIT_TIMEOUT_MS)));
+ lockProps.put(LOCK_HEARTBEAT_INTERVAL_MS.key(),
+
String.valueOf(writeConfig.getIntOrDefault(LOCK_HEARTBEAT_INTERVAL_MS)));
+
+ // Provider-specific configs
+ if
(FileSystemBasedLockProvider.class.getCanonicalName().equals(lockProviderClass))
{
+ copyPropsWithPrefix(dataProps, lockProps,
LockConfiguration.FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX);
+ } else if
(ZookeeperBasedLockProvider.class.getCanonicalName().equals(lockProviderClass)
+ ||
ZookeeperBasedImplicitBasePathLockProvider.class.getCanonicalName().equals(lockProviderClass))
{
+ copyPropsWithPrefix(dataProps, lockProps,
LockConfiguration.ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX);
+ } else if
(HIVE_METASTORE_BASED_LOCK_PROVIDER_CLASS.equals(lockProviderClass)) {
+ copyPropsWithPrefix(dataProps, lockProps,
LockConfiguration.HIVE_METASTORE_LOCK_PROPERTY_PREFIX);
+ } else if (DYNAMODB_BASED_LOCK_PROVIDER_CLASS.equals(lockProviderClass)
+ ||
DYNAMODB_BASED_IMPLICIT_PARTITION_KEY_LOCK_PROVIDER_CLASS.equals(lockProviderClass))
{
Review Comment:
🤖 Similar concern for `FileSystemBasedLockProvider` — if the user doesn't
explicitly set `hoodie.write.lock.filesystem.path`, the provider falls back to
`<base_path>/.hoodie/lock`. Since the MDT write config has a different base
path, the fallback lock file location would differ from the data table's. The
`copyPropsWithPrefix` would catch an explicitly set path, but the fallback case
silently breaks lock sharing. It might be worth adding a validation that
`FILESYSTEM_LOCK_PATH` is explicitly set when this provider is used for MDT
multi-writer.
<sub><i>- Generated by an AI agent and may contain mistakes. Please verify
any suggestions before applying.</i></sub>
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java:
##########
@@ -362,6 +387,99 @@ public static HoodieWriteConfig createMetadataWriteConfig(
return metadataWriteConfig;
}
+ /**
+ * Build the lock config for the metadata table by extracting lock-specific
properties from the
+ * data table's write config. This avoids copying all properties (which
would overwrite MDT-specific
+ * settings like base path and auto-clean).
+ */
+ private static HoodieLockConfig buildMetadataLockConfig(HoodieWriteConfig
writeConfig) {
+ TypedProperties props = writeConfig.getProps();
+ HoodieLockConfig.Builder lockConfigBuilder = HoodieLockConfig.newBuilder()
+ .withClientNumRetries(Integer.parseInt(props.getString(
+ HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.key(),
+ HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue())))
+ .withClientRetryWaitTimeInMillis(Long.parseLong(props.getString(
+
HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.key(),
+
HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue())))
+ .withLockWaitTimeInMillis(Long.valueOf(props.getString(
+ HoodieLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS.key(),
+
String.valueOf(HoodieLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS.defaultValue()))))
+ .withNumRetries(Integer.parseInt(props.getString(
+ HoodieLockConfig.LOCK_ACQUIRE_NUM_RETRIES.key(),
+ HoodieLockConfig.LOCK_ACQUIRE_NUM_RETRIES.defaultValue())))
+ .withRetryWaitTimeInMillis(Long.parseLong(props.getString(
+ HoodieLockConfig.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.key(),
+
HoodieLockConfig.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.defaultValue())))
+ .withRetryMaxWaitTimeInMillis(Long.parseLong(props.getString(
+ HoodieLockConfig.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS.key(),
+
HoodieLockConfig.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS.defaultValue())))
+ .withHeartbeatIntervalInMillis(Long.valueOf(props.getString(
+ HoodieLockConfig.LOCK_HEARTBEAT_INTERVAL_MS.key(),
+
String.valueOf(HoodieLockConfig.LOCK_HEARTBEAT_INTERVAL_MS.defaultValue()))));
+
+ String lockProviderClass = writeConfig.getLockProviderClass();
+ if (lockProviderClass == null) {
Review Comment:
🤖 Agreed — if multi-writer mode is explicitly configured on the MDT, the
data table *must* have a lock provider. Silently falling back to no-lock would
mask a misconfiguration that could lead to data corruption. Throwing here (or
at least adding a `checkState`) would be the right call.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]