yihua commented on code in PR #18295:
URL: https://github.com/apache/hudi/pull/18295#discussion_r3055433023


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java:
##########
@@ -362,6 +387,99 @@ public static HoodieWriteConfig createMetadataWriteConfig(
     return metadataWriteConfig;
   }
 
+  /**
+   * Build the lock config for the metadata table by extracting lock-specific 
properties from the
+   * data table's write config. This avoids copying all properties (which 
would overwrite MDT-specific
+   * settings like base path and auto-clean).
+   */
+  private static HoodieLockConfig buildMetadataLockConfig(HoodieWriteConfig 
writeConfig) {
+    TypedProperties props = writeConfig.getProps();
+    HoodieLockConfig.Builder lockConfigBuilder = HoodieLockConfig.newBuilder()
+        .withClientNumRetries(Integer.parseInt(props.getString(
+            HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.key(),
+            HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue())))
+        .withClientRetryWaitTimeInMillis(Long.parseLong(props.getString(
+            
HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.key(),
+            
HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue())))
+        .withLockWaitTimeInMillis(Long.valueOf(props.getString(
+            HoodieLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS.key(),
+            
String.valueOf(HoodieLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS.defaultValue()))))
+        .withNumRetries(Integer.parseInt(props.getString(
+            HoodieLockConfig.LOCK_ACQUIRE_NUM_RETRIES.key(),
+            HoodieLockConfig.LOCK_ACQUIRE_NUM_RETRIES.defaultValue())))
+        .withRetryWaitTimeInMillis(Long.parseLong(props.getString(
+            HoodieLockConfig.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.key(),
+            
HoodieLockConfig.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.defaultValue())))
+        .withRetryMaxWaitTimeInMillis(Long.parseLong(props.getString(
+            HoodieLockConfig.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS.key(),
+            
HoodieLockConfig.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS.defaultValue())))
+        .withHeartbeatIntervalInMillis(Long.valueOf(props.getString(
+            HoodieLockConfig.LOCK_HEARTBEAT_INTERVAL_MS.key(),
+            
String.valueOf(HoodieLockConfig.LOCK_HEARTBEAT_INTERVAL_MS.defaultValue()))));
+
+    String lockProviderClass = writeConfig.getLockProviderClass();
+    if (lockProviderClass == null) {
+      return lockConfigBuilder.build();
+    }
+
+    Properties providerProp = new Properties();
+    providerProp.setProperty(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), 
lockProviderClass);
+    lockConfigBuilder.fromProperties(providerProp);
+
+    if (ZookeeperBasedLockProvider.class.getName().equals(lockProviderClass)) {
+      if (props.containsKey(HoodieLockConfig.ZK_CONNECT_URL.key())) {
+        
lockConfigBuilder.withZkQuorum(props.getString(HoodieLockConfig.ZK_CONNECT_URL.key()));
+      }
+      if (props.containsKey(HoodieLockConfig.ZK_BASE_PATH.key())) {
+        
lockConfigBuilder.withZkBasePath(props.getString(HoodieLockConfig.ZK_BASE_PATH.key()));
+      }
+      if (props.containsKey(HoodieLockConfig.ZK_LOCK_KEY.key())) {
+        
lockConfigBuilder.withZkLockKey(props.getString(HoodieLockConfig.ZK_LOCK_KEY.key()));
+      }
+      if (props.containsKey(HoodieLockConfig.ZK_PORT.key())) {
+        
lockConfigBuilder.withZkPort(props.getString(HoodieLockConfig.ZK_PORT.key()));
+      }
+      if (props.containsKey(HoodieLockConfig.ZK_SESSION_TIMEOUT_MS.key())) {
+        lockConfigBuilder.withZkSessionTimeoutInMs(
+            
Long.valueOf(props.getString(HoodieLockConfig.ZK_SESSION_TIMEOUT_MS.key())));
+      }
+      if (props.containsKey(HoodieLockConfig.ZK_CONNECTION_TIMEOUT_MS.key())) {
+        lockConfigBuilder.withZkConnectionTimeoutInMs(
+            
Long.valueOf(props.getString(HoodieLockConfig.ZK_CONNECTION_TIMEOUT_MS.key())));
+      }
+    } else if 
(FileSystemBasedLockProvider.class.getName().equals(lockProviderClass)) {
+      if (props.containsKey(HoodieLockConfig.FILESYSTEM_LOCK_PATH.key())) {
+        
lockConfigBuilder.withFileSystemLockPath(props.getString(HoodieLockConfig.FILESYSTEM_LOCK_PATH.key()));
+      }
+      if (props.containsKey(HoodieLockConfig.FILESYSTEM_LOCK_EXPIRE.key())) {
+        lockConfigBuilder.withFileSystemLockExpire(
+            
Integer.parseInt(props.getString(HoodieLockConfig.FILESYSTEM_LOCK_EXPIRE.key())));
+      }
+    } else if (lockProviderClass.contains("HiveMetastoreBasedLockProvider")) {
+      if (props.containsKey(HoodieLockConfig.HIVE_DATABASE_NAME.key())) {
+        
lockConfigBuilder.withHiveDatabaseName(props.getString(HoodieLockConfig.HIVE_DATABASE_NAME.key()));
+      }
+      if (props.containsKey(HoodieLockConfig.HIVE_TABLE_NAME.key())) {
+        
lockConfigBuilder.withHiveTableName(props.getString(HoodieLockConfig.HIVE_TABLE_NAME.key()));
+      }
+      if (props.containsKey(HoodieLockConfig.HIVE_METASTORE_URI.key())) {
+        
lockConfigBuilder.withHiveMetastoreURIs(props.getString(HoodieLockConfig.HIVE_METASTORE_URI.key()));
+      }
+    } else {
+      // For any custom lock provider, pass through all lock-prefixed 
properties
+      // so provider-specific configs are preserved.

Review Comment:
   🤖 Good point. Beyond just the maintenance burden of enumerating providers, 
an alternative approach could be to copy *all* properties with the 
`hoodie.write.lock.` prefix from the data table config, rather than selecting 
provider-specific prefixes. That way any new lock provider (DynamoDB, 
storage-based, or future ones) would automatically get its config propagated. 
The only thing to be careful about is not overwriting MDT-specific settings 
like base path.



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -81,6 +85,31 @@ public final class HoodieMetadataConfig extends HoodieConfig 
{
           + "in streaming manner rather than two disjoint writes. By default "
           + "streaming writes to metadata table is enabled for SPARK engine 
for incremental operations and disabled for all other cases.");
 
+  public static final ConfigProperty<String> METADATA_WRITE_CONCURRENCY_MODE = 
ConfigProperty

Review Comment:
   🤖 The config key uses `hoodie.metadata.write.concurrency.mode` (with a dot 
after `metadata`), while the `METADATA_PREFIX` is typically `hoodie.metadata`. 
Could you double-check the key construction — is the `.` after 
`METADATA_PREFIX` intentional? If `METADATA_PREFIX` already ends without a dot 
and you're concatenating `.write.concurrency.mode`, this would produce 
`hoodie.metadata.write.concurrency.mode` which looks correct. Just want to make 
sure the prefix convention is consistent with other keys in this file.
   
   <sub><i>- Generated by an AI agent and may contain mistakes. Please verify 
any suggestions before applying.</i></sub>



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java:
##########
@@ -249,6 +266,65 @@ public static HoodieLockConfig.Builder newBuilder() {
     return new HoodieLockConfig.Builder();
   }
 
+  /**
+   * Build a {@link HoodieLockConfig} by copying lock-related configs from the 
given write config
+   * based on the configured lock provider. Only built-in lock providers are 
supported.
+   *
+   * @param lockProviderClass the lock provider class name
+   * @param writeConfig       the write config to copy lock properties from
+   * @return a new HoodieLockConfig with the relevant lock properties
+   * @throws HoodieException if the lock provider is not a built-in provider
+   */
+  public static HoodieLockConfig getLockConfigForBuiltInLockProvider(String 
lockProviderClass, HoodieWriteConfig writeConfig) {
+    TypedProperties dataProps = writeConfig.getProps();
+    Properties lockProps = new Properties();
+    lockProps.put(LOCK_PROVIDER_CLASS_NAME.key(), lockProviderClass);
+
+    // Common lock configs used by all providers
+    lockProps.put(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.key(),
+        
writeConfig.getStringOrDefault(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS));
+    lockProps.put(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS.key(),
+        
writeConfig.getStringOrDefault(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS));
+    lockProps.put(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.key(),
+        
writeConfig.getStringOrDefault(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS));
+    lockProps.put(LOCK_ACQUIRE_NUM_RETRIES.key(),
+        writeConfig.getStringOrDefault(LOCK_ACQUIRE_NUM_RETRIES));
+    lockProps.put(LOCK_ACQUIRE_CLIENT_NUM_RETRIES.key(),
+        writeConfig.getStringOrDefault(LOCK_ACQUIRE_CLIENT_NUM_RETRIES));
+    lockProps.put(LOCK_ACQUIRE_WAIT_TIMEOUT_MS.key(),
+        
String.valueOf(writeConfig.getIntOrDefault(LOCK_ACQUIRE_WAIT_TIMEOUT_MS)));
+    lockProps.put(LOCK_HEARTBEAT_INTERVAL_MS.key(),
+        
String.valueOf(writeConfig.getIntOrDefault(LOCK_HEARTBEAT_INTERVAL_MS)));
+
+    // Provider-specific configs
+    if 
(FileSystemBasedLockProvider.class.getCanonicalName().equals(lockProviderClass))
 {
+      copyPropsWithPrefix(dataProps, lockProps, 
LockConfiguration.FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX);
+    } else if 
(ZookeeperBasedLockProvider.class.getCanonicalName().equals(lockProviderClass)

Review Comment:
   🤖 When `ZookeeperBasedImplicitBasePathLockProvider` is used, it derives its 
ZK base path from `HoodieCommonConfig.BASE_PATH` and `HoodieTableConfig.NAME` 
at runtime (not from any `hoodie.write.lock.zookeeper.*` property). Since the 
MDT write config's base path is set to the metadata table path (e.g. 
`<data_path>/.hoodie/metadata`), the MDT compaction writer will compute a 
*different* ZK lock path than the data table writer — so the two will never 
contend for the same lock, defeating the purpose.
   
   Could you either (a) reject `ZookeeperBasedImplicitBasePathLockProvider` in 
this method (similar to the `InProcessLockProvider` check), or (b) explicitly 
inject the data table's base path/table name into the lock properties so the 
MDT writer resolves to the same ZK path?
   
   <sub><i>- Generated by an AI agent and may contain mistakes. Please verify 
any suggestions before applying.</i></sub>



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java:
##########
@@ -249,6 +266,65 @@ public static HoodieLockConfig.Builder newBuilder() {
     return new HoodieLockConfig.Builder();
   }
 
+  /**
+   * Build a {@link HoodieLockConfig} by copying lock-related configs from the 
given write config
+   * based on the configured lock provider. Only built-in lock providers are 
supported.
+   *
+   * @param lockProviderClass the lock provider class name
+   * @param writeConfig       the write config to copy lock properties from
+   * @return a new HoodieLockConfig with the relevant lock properties
+   * @throws HoodieException if the lock provider is not a built-in provider
+   */
+  public static HoodieLockConfig getLockConfigForBuiltInLockProvider(String 
lockProviderClass, HoodieWriteConfig writeConfig) {
+    TypedProperties dataProps = writeConfig.getProps();
+    Properties lockProps = new Properties();
+    lockProps.put(LOCK_PROVIDER_CLASS_NAME.key(), lockProviderClass);
+
+    // Common lock configs used by all providers
+    lockProps.put(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.key(),
+        
writeConfig.getStringOrDefault(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS));
+    lockProps.put(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS.key(),
+        
writeConfig.getStringOrDefault(LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS));
+    lockProps.put(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.key(),
+        
writeConfig.getStringOrDefault(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS));
+    lockProps.put(LOCK_ACQUIRE_NUM_RETRIES.key(),
+        writeConfig.getStringOrDefault(LOCK_ACQUIRE_NUM_RETRIES));
+    lockProps.put(LOCK_ACQUIRE_CLIENT_NUM_RETRIES.key(),
+        writeConfig.getStringOrDefault(LOCK_ACQUIRE_CLIENT_NUM_RETRIES));
+    lockProps.put(LOCK_ACQUIRE_WAIT_TIMEOUT_MS.key(),
+        
String.valueOf(writeConfig.getIntOrDefault(LOCK_ACQUIRE_WAIT_TIMEOUT_MS)));
+    lockProps.put(LOCK_HEARTBEAT_INTERVAL_MS.key(),
+        
String.valueOf(writeConfig.getIntOrDefault(LOCK_HEARTBEAT_INTERVAL_MS)));
+
+    // Provider-specific configs
+    if 
(FileSystemBasedLockProvider.class.getCanonicalName().equals(lockProviderClass))
 {
+      copyPropsWithPrefix(dataProps, lockProps, 
LockConfiguration.FILESYSTEM_BASED_LOCK_PROPERTY_PREFIX);
+    } else if 
(ZookeeperBasedLockProvider.class.getCanonicalName().equals(lockProviderClass)
+        || 
ZookeeperBasedImplicitBasePathLockProvider.class.getCanonicalName().equals(lockProviderClass))
 {
+      copyPropsWithPrefix(dataProps, lockProps, 
LockConfiguration.ZOOKEEPER_BASED_LOCK_PROPERTY_PREFIX);
+    } else if 
(HIVE_METASTORE_BASED_LOCK_PROVIDER_CLASS.equals(lockProviderClass)) {
+      copyPropsWithPrefix(dataProps, lockProps, 
LockConfiguration.HIVE_METASTORE_LOCK_PROPERTY_PREFIX);
+    } else if (DYNAMODB_BASED_LOCK_PROVIDER_CLASS.equals(lockProviderClass)
+        || 
DYNAMODB_BASED_IMPLICIT_PARTITION_KEY_LOCK_PROVIDER_CLASS.equals(lockProviderClass))
 {

Review Comment:
   🤖 Similar concern for `FileSystemBasedLockProvider` — if the user doesn't 
explicitly set `hoodie.write.lock.filesystem.path`, the provider falls back to 
`<base_path>/.hoodie/lock`. Since the MDT write config has a different base 
path, the fallback lock file location would differ from the data table's. The 
`copyPropsWithPrefix` would catch an explicitly set path, but the fallback case 
silently breaks lock sharing. It might be worth adding a validation that 
`FILESYSTEM_LOCK_PATH` is explicitly set when this provider is used for MDT 
multi-writer.
   
   <sub><i>- Generated by an AI agent and may contain mistakes. Please verify 
any suggestions before applying.</i></sub>



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java:
##########
@@ -362,6 +387,99 @@ public static HoodieWriteConfig createMetadataWriteConfig(
     return metadataWriteConfig;
   }
 
+  /**
+   * Build the lock config for the metadata table by extracting lock-specific 
properties from the
+   * data table's write config. This avoids copying all properties (which 
would overwrite MDT-specific
+   * settings like base path and auto-clean).
+   */
+  private static HoodieLockConfig buildMetadataLockConfig(HoodieWriteConfig 
writeConfig) {
+    TypedProperties props = writeConfig.getProps();
+    HoodieLockConfig.Builder lockConfigBuilder = HoodieLockConfig.newBuilder()
+        .withClientNumRetries(Integer.parseInt(props.getString(
+            HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.key(),
+            HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue())))
+        .withClientRetryWaitTimeInMillis(Long.parseLong(props.getString(
+            
HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.key(),
+            
HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue())))
+        .withLockWaitTimeInMillis(Long.valueOf(props.getString(
+            HoodieLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS.key(),
+            
String.valueOf(HoodieLockConfig.LOCK_ACQUIRE_WAIT_TIMEOUT_MS.defaultValue()))))
+        .withNumRetries(Integer.parseInt(props.getString(
+            HoodieLockConfig.LOCK_ACQUIRE_NUM_RETRIES.key(),
+            HoodieLockConfig.LOCK_ACQUIRE_NUM_RETRIES.defaultValue())))
+        .withRetryWaitTimeInMillis(Long.parseLong(props.getString(
+            HoodieLockConfig.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.key(),
+            
HoodieLockConfig.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS.defaultValue())))
+        .withRetryMaxWaitTimeInMillis(Long.parseLong(props.getString(
+            HoodieLockConfig.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS.key(),
+            
HoodieLockConfig.LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS.defaultValue())))
+        .withHeartbeatIntervalInMillis(Long.valueOf(props.getString(
+            HoodieLockConfig.LOCK_HEARTBEAT_INTERVAL_MS.key(),
+            
String.valueOf(HoodieLockConfig.LOCK_HEARTBEAT_INTERVAL_MS.defaultValue()))));
+
+    String lockProviderClass = writeConfig.getLockProviderClass();
+    if (lockProviderClass == null) {

Review Comment:
   🤖 Agreed — if multi-writer mode is explicitly configured on the MDT, the 
data table *must* have a lock provider. Silently falling back to no-lock would 
mask a misconfiguration that could lead to data corruption. Throwing here (or 
at least a `checkState`) would be the right call.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to