chia7712 commented on code in PR #15840:
URL: https://github.com/apache/kafka/pull/15840#discussion_r1606088555
##########
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##########
@@ -65,219 +74,479 @@
@ExtendWith(value = ClusterTestExtensions.class)
@Tag("integration")
public class ConfigCommandIntegrationTest {
- AdminZkClient adminZkClient;
- List<String> alterOpts;
+ private List<String> alterOpts;
+ private final String defaultBrokerId = "0";
private final ClusterInstance cluster;
+ private static Runnable run(Stream<String> command) {
+ return () -> {
+ try {
+ ConfigCommand.main(command.toArray(String[]::new));
+ } catch (RuntimeException e) {
+ // do nothing.
+ } finally {
+ Exit.resetExitProcedure();
+ }
+ };
+ }
+
public ConfigCommandIntegrationTest(ClusterInstance cluster) {
this.cluster = cluster;
}
- @ClusterTest(types = {Type.ZK, Type.KRAFT})
+ @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT})
public void testExitWithNonZeroStatusOnUpdatingUnallowedConfig() {
assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
"--entity-name", cluster.isKRaftTest() ? "0" : "1",
"--entity-type", "brokers",
"--alter",
"--add-config", "security.inter.broker.protocol=PLAINTEXT")),
- errOut ->
- assertTrue(errOut.contains("Cannot update these configs
dynamically: Set(security.inter.broker.protocol)"), errOut));
+ errOut -> assertTrue(errOut.contains("Cannot update these configs
dynamically: Set(security.inter.broker.protocol)"), errOut));
}
-
@ClusterTest(types = {Type.ZK})
public void testExitWithNonZeroStatusOnZkCommandAlterUserQuota() {
assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of(
"--entity-type", "users",
"--entity-name", "admin",
"--alter", "--add-config", "consumer_byte_rate=20000")),
- errOut ->
- assertTrue(errOut.contains("User configuration updates using
ZooKeeper are only supported for SCRAM credential updates."), errOut));
- }
-
- public static void assertNonZeroStatusExit(Stream<String> args,
Consumer<String> checkErrOut) {
- AtomicReference<Integer> exitStatus = new AtomicReference<>();
- Exit.setExitProcedure((status, __) -> {
- exitStatus.set(status);
- throw new RuntimeException();
- });
-
- String errOut = captureStandardErr(() -> {
- try {
- ConfigCommand.main(args.toArray(String[]::new));
- } catch (RuntimeException e) {
- // do nothing.
- } finally {
- Exit.resetExitProcedure();
- }
- });
-
- checkErrOut.accept(errOut);
- assertNotNull(exitStatus.get());
- assertEquals(1, exitStatus.get());
- }
-
- private Stream<String> quorumArgs() {
- return cluster.isKRaftTest()
- ? Stream.of("--bootstrap-server", cluster.bootstrapServers())
- : Stream.of("--zookeeper",
((ZkClusterInvocationContext.ZkClusterInstance)
cluster).getUnderlying().zkConnect());
- }
-
- public List<String> entityOp(Optional<String> brokerId) {
- return brokerId.map(id -> Arrays.asList("--entity-name",
id)).orElse(Collections.singletonList("--entity-default"));
- }
-
- public void alterConfigWithZk(KafkaZkClient zkClient, Map<String, String>
configs, Optional<String> brokerId) throws Exception {
- alterConfigWithZk(zkClient, configs, brokerId, Collections.emptyMap());
+ errOut -> assertTrue(errOut.contains("User configuration updates
using ZooKeeper are only supported for SCRAM credential updates."), errOut));
}
- public void alterConfigWithZk(KafkaZkClient zkClient, Map<String, String>
configs, Optional<String> brokerId, Map<String, String> encoderConfigs) {
- String configStr = Stream.of(configs.entrySet(),
encoderConfigs.entrySet())
- .flatMap(Set::stream)
- .map(e -> e.getKey() + "=" + e.getValue())
- .collect(Collectors.joining(","));
- ConfigCommand.ConfigCommandOptions addOpts = new
ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId),
Arrays.asList("--add-config", configStr)));
- ConfigCommand.alterConfigWithZk(zkClient, addOpts, adminZkClient);
- }
-
- void verifyConfig(KafkaZkClient zkClient, Map<String, String> configs,
Optional<String> brokerId) {
- Properties entityConfigs = zkClient.getEntityConfigs("brokers",
brokerId.orElse(ZooKeeperInternals.DEFAULT_STRING));
- assertEquals(configs, entityConfigs);
- }
-
- void alterAndVerifyConfig(KafkaZkClient zkClient, Map<String, String>
configs, Optional<String> brokerId) throws Exception {
- alterConfigWithZk(zkClient, configs, brokerId);
- verifyConfig(zkClient, configs, brokerId);
- }
+ @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+ public void testNullStatusOnKraftCommandAlterUserQuota() {
+ Stream<String> command = Stream.concat(quorumArgs(), Stream.of(
+ "--entity-type", "users",
+ "--entity-name", "admin",
+ "--alter", "--add-config", "consumer_byte_rate=20000"));
+ String message = captureStandardMsg(run(command));
- void deleteAndVerifyConfig(KafkaZkClient zkClient, Set<String>
configNames, Optional<String> brokerId) {
- ConfigCommand.ConfigCommandOptions deleteOpts = new
ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId),
Arrays.asList("--delete-config", String.join(",", configNames))));
- ConfigCommand.alterConfigWithZk(zkClient, deleteOpts, adminZkClient);
- verifyConfig(zkClient, Collections.emptyMap(), brokerId);
+ assertTrue(StringUtils.isBlank(message), message);
}
- @ClusterTest(types = {Type.ZK})
+ @ClusterTest(types = Type.ZK)
public void testDynamicBrokerConfigUpdateUsingZooKeeper() throws Exception
{
cluster.shutdownBroker(0);
String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance)
cluster).getUnderlying().zkConnect();
KafkaZkClient zkClient =
((ZkClusterInvocationContext.ZkClusterInstance)
cluster).getUnderlying().zkClient();
String brokerId = "1";
- adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
- alterOpts = Arrays.asList("--zookeeper", zkConnect, "--entity-type",
"brokers", "--alter");
+ AdminZkClient adminZkClient = new AdminZkClient(zkClient,
scala.None$.empty());
+ alterOpts = asList("--zookeeper", zkConnect, "--entity-type",
"brokers", "--alter");
// Add config
- alterAndVerifyConfig(zkClient,
Collections.singletonMap("message.max.size", "110000"), Optional.of(brokerId));
- alterAndVerifyConfig(zkClient,
Collections.singletonMap("message.max.size", "120000"), Optional.empty());
+ alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
+ singletonMap("message.max.size", "110000"));
+ alterAndVerifyConfig(zkClient, adminZkClient, Optional.empty(),
+ singletonMap("message.max.size", "120000"));
// Change config
- alterAndVerifyConfig(zkClient,
Collections.singletonMap("message.max.size", "130000"), Optional.of(brokerId));
- alterAndVerifyConfig(zkClient,
Collections.singletonMap("message.max.size", "140000"), Optional.empty());
+ alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
+ singletonMap("message.max.size", "130000"));
+ alterAndVerifyConfig(zkClient, adminZkClient, Optional.empty(),
+ singletonMap("message.max.size", "140000"));
// Delete config
- deleteAndVerifyConfig(zkClient,
Collections.singleton("message.max.size"), Optional.of(brokerId));
- deleteAndVerifyConfig(zkClient,
Collections.singleton("message.max.size"), Optional.empty());
+ deleteAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
+ singleton("message.max.size"));
+ deleteAndVerifyConfig(zkClient, adminZkClient, Optional.empty(),
+ singleton("message.max.size"));
// Listener configs: should work only with listener name
- alterAndVerifyConfig(zkClient,
Collections.singletonMap("listener.name.external.ssl.keystore.location",
"/tmp/test.jks"), Optional.of(brokerId));
+ alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
+ singletonMap("listener.name.external.ssl.keystore.location",
"/tmp/test.jks"));
assertThrows(ConfigException.class,
- () -> alterConfigWithZk(zkClient,
Collections.singletonMap("ssl.keystore.location", "/tmp/test.jks"),
Optional.of(brokerId)));
+ () -> alterConfigWithZk(zkClient, adminZkClient,
Optional.of(brokerId),
+ singletonMap("ssl.keystore.location",
"/tmp/test.jks")));
// Per-broker config configured at default cluster-level should fail
assertThrows(ConfigException.class,
- () -> alterConfigWithZk(zkClient,
Collections.singletonMap("listener.name.external.ssl.keystore.location",
"/tmp/test.jks"), Optional.empty()));
- deleteAndVerifyConfig(zkClient,
Collections.singleton("listener.name.external.ssl.keystore.location"),
Optional.of(brokerId));
+ () -> alterConfigWithZk(zkClient, adminZkClient,
Optional.empty(),
+
singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks")));
+ deleteAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
+ singleton("listener.name.external.ssl.keystore.location"));
// Password config update without encoder secret should fail
assertThrows(IllegalArgumentException.class,
- () -> alterConfigWithZk(zkClient,
Collections.singletonMap("listener.name.external.ssl.keystore.password",
"secret"), Optional.of(brokerId)));
+ () -> alterConfigWithZk(zkClient, adminZkClient,
Optional.of(brokerId),
+
singletonMap("listener.name.external.ssl.keystore.password", "secret")));
// Password config update with encoder secret should succeed and
encoded password must be stored in ZK
Map<String, String> configs = new HashMap<>();
configs.put("listener.name.external.ssl.keystore.password", "secret");
configs.put("log.cleaner.threads", "2");
- Map<String, String> encoderConfigs =
Collections.singletonMap(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG,
"encoder-secret");
- alterConfigWithZk(zkClient, configs, Optional.of(brokerId),
encoderConfigs);
+ Map<String, String> encoderConfigs = new HashMap<>(configs);
+ encoderConfigs.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
+ alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId),
encoderConfigs);
Properties brokerConfigs = zkClient.getEntityConfigs("brokers",
brokerId);
-
assertFalse(brokerConfigs.contains(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG),
"Encoder secret stored in ZooKeeper");
+ assertFalse(brokerConfigs.contains(PASSWORD_ENCODER_SECRET_CONFIG),
"Encoder secret stored in ZooKeeper");
assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads"));
// not encoded
String encodedPassword =
brokerConfigs.getProperty("listener.name.external.ssl.keystore.password");
PasswordEncoder passwordEncoder =
ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs));
assertEquals("secret",
passwordEncoder.decode(encodedPassword).value());
assertEquals(configs.size(), brokerConfigs.size());
// Password config update with overrides for encoder parameters
- Map<String, String> configs2 =
Collections.singletonMap("listener.name.internal.ssl.keystore.password",
"secret2");
- Map<String, String> encoderConfigs2 = new HashMap<>();
-
encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG,
"encoder-secret");
-
encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG,
"DES/CBC/PKCS5Padding");
-
encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG,
"1024");
-
encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG,
"PBKDF2WithHmacSHA1");
-
encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG,
"64");
- alterConfigWithZk(zkClient, configs2, Optional.of(brokerId),
encoderConfigs2);
+ Map<String, String> encoderConfigs2 = generateEncodeConfig();
+ alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId),
encoderConfigs2);
Properties brokerConfigs2 = zkClient.getEntityConfigs("brokers",
brokerId);
String encodedPassword2 =
brokerConfigs2.getProperty("listener.name.internal.ssl.keystore.password");
- assertEquals("secret2",
ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs)).decode(encodedPassword2).value());
- assertEquals("secret2",
ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs2)).decode(encodedPassword2).value());
+ assertEquals("secret2", ConfigCommand.createPasswordEncoder(
+
JavaConverters.mapAsScalaMap(encoderConfigs)).decode(encodedPassword2).value());
+ assertEquals("secret2", ConfigCommand.createPasswordEncoder(
+
JavaConverters.mapAsScalaMap(encoderConfigs2)).decode(encodedPassword2).value());
// Password config update at default cluster-level should fail
- assertThrows(ConfigException.class, () -> alterConfigWithZk(zkClient,
configs, Optional.empty(), encoderConfigs));
+ assertThrows(ConfigException.class,
+ () -> alterConfigWithZk(zkClient, adminZkClient,
Optional.empty(), encoderConfigs));
// Dynamic config updates using ZK should fail if broker is running.
registerBrokerInZk(zkClient, Integer.parseInt(brokerId));
- assertThrows(IllegalArgumentException.class, () ->
alterConfigWithZk(zkClient, Collections.singletonMap("message.max.size",
"210000"), Optional.of(brokerId)));
- assertThrows(IllegalArgumentException.class, () ->
alterConfigWithZk(zkClient, Collections.singletonMap("message.max.size",
"220000"), Optional.empty()));
+ assertThrows(IllegalArgumentException.class,
+ () -> alterConfigWithZk(zkClient, adminZkClient,
+ Optional.of(brokerId),
singletonMap("message.max.size", "210000")));
+ assertThrows(IllegalArgumentException.class,
+ () -> alterConfigWithZk(zkClient, adminZkClient,
+ Optional.empty(), singletonMap("message.max.size",
"220000")));
// Dynamic config updates using ZK should for a different broker that
is not running should succeed
- alterAndVerifyConfig(zkClient,
Collections.singletonMap("message.max.size", "230000"), Optional.of("2"));
+ alterAndVerifyConfig(zkClient, adminZkClient, Optional.of("2"),
singletonMap("message.max.size", "230000"));
+ }
+
+ @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+ public void testDynamicBrokerConfigUpdateUsingKraft() throws Exception {
+ alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
+
+ try (Admin client = cluster.createAdminClient()) {
+ // Add config
+ alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
singletonMap("message.max.size", "110000"));
+ alterAndVerifyConfig(client, Optional.empty(),
singletonMap("message.max.size", "120000"));
+
+ // Change config
+ alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
singletonMap("message.max.size", "130000"));
+ alterAndVerifyConfig(client, Optional.empty(),
singletonMap("message.max.size", "140000"));
+
+ // Delete config
+ deleteAndVerifyConfig(client, Optional.of(defaultBrokerId),
singleton("message.max.size"));
+
+ // Listener configs: should work only with listener name
+ alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
+
singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"));
+ alterConfigWithKraft(client, Optional.empty(),
+
singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"));
+ deleteAndVerifyConfig(client, Optional.of(defaultBrokerId),
+ singleton("listener.name.external.ssl.keystore.location"));
+ alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+
singletonMap("listener.name.external.ssl.keystore.password", "secret"));
+
+ // Password config update with encoder secret should succeed and
encoded password must be stored in ZK
+ Map<String, String> configs = new HashMap<>();
+ configs.put("listener.name.external.ssl.keystore.password",
"secret");
+ configs.put("log.cleaner.threads", "2");
+ // Password encoder configs
+ configs.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
+
+ // Password config update at default cluster-level should fail
+ assertThrows(ExecutionException.class,
+ () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId), configs));
+ }
+ }
+
+ @ClusterTest(types = {Type.ZK})
+ public void testAlterReadOnlyConfigInZookeeperThenShouldFail() {
+ cluster.shutdownBroker(0);
+ String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance)
cluster).getUnderlying().zkConnect();
+ KafkaZkClient zkClient =
((ZkClusterInvocationContext.ZkClusterInstance)
cluster).getUnderlying().zkClient();
+ AdminZkClient adminZkClient = new AdminZkClient(zkClient,
scala.None$.empty());
+ alterOpts = generateDefaultAlterOpts(zkConnect);
+
+ assertThrows(ConfigException.class,
+ () -> alterConfigWithZk(zkClient, adminZkClient,
Optional.of(defaultBrokerId),
+ singletonMap("auto.create.topics.enable", "false")));
+ assertThrows(ConfigException.class,
+ () -> alterConfigWithZk(zkClient, adminZkClient,
Optional.of(defaultBrokerId),
+ singletonMap("auto.leader.rebalance.enable",
"false")));
+ assertThrows(ConfigException.class,
+ () -> alterConfigWithZk(zkClient, adminZkClient,
Optional.of(defaultBrokerId),
+ singletonMap("broker.id", "1")));
+ }
+
+ @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+ public void testAlterReadOnlyConfigInKRaftThenShouldFail() {
+ alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
+
+ try (Admin client = cluster.createAdminClient()) {
+ assertThrows(ExecutionException.class,
+ () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ singletonMap("auto.create.topics.enable",
"false")));
+ assertThrows(ExecutionException.class,
+ () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ singletonMap("auto.leader.rebalance.enable",
"false")));
+ assertThrows(ExecutionException.class,
+ () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ singletonMap("broker.id", "1")));
+ }
+ }
+
+ @ClusterTest(types = {Type.ZK})
+ public void testUpdateClusterWideConfigInZookeeperThenShouldSuccessful() {
+ cluster.shutdownBroker(0);
+ String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance)
cluster).getUnderlying().zkConnect();
+ KafkaZkClient zkClient =
((ZkClusterInvocationContext.ZkClusterInstance)
cluster).getUnderlying().zkClient();
+ AdminZkClient adminZkClient = new AdminZkClient(zkClient,
scala.None$.empty());
+ alterOpts = generateDefaultAlterOpts(zkConnect);
+
+ Map<String, String> configs = new HashMap<>();
+ configs.put("log.flush.interval.messages", "100");
+ configs.put("log.retention.bytes", "20");
+ configs.put("log.retention.ms", "2");
+
+ alterAndVerifyConfig(zkClient, adminZkClient,
Optional.of(defaultBrokerId), configs);
+ }
+
+ @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+ public void testUpdateClusterWideConfigInKRaftThenShouldSuccessful()
throws Exception {
+ alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
+
+ try (Admin client = cluster.createAdminClient()) {
+ alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
+ singletonMap("log.flush.interval.messages", "100"));
+ alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
+ singletonMap("log.retention.bytes", "20"));
+ alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
+ singletonMap("log.retention.ms", "2"));
+ }
+ }
+
+ @ClusterTest(types = {Type.ZK})
+ public void
testUpdatePerBrokerConfigWithListenerNameInZookeeperThenShouldSuccessful() {
+ cluster.shutdownBroker(0);
+ String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance)
cluster).getUnderlying().zkConnect();
+ KafkaZkClient zkClient =
((ZkClusterInvocationContext.ZkClusterInstance)
cluster).getUnderlying().zkClient();
+ AdminZkClient adminZkClient = new AdminZkClient(zkClient,
scala.None$.empty());
+ alterOpts = generateDefaultAlterOpts(zkConnect);
+
+ String listenerName = "listener.name.internal.";
+ String sslTruststoreType = listenerName + "ssl.truststore.type";
+ String sslTruststoreLocation = listenerName +
"ssl.truststore.location";
+ String sslTruststorePassword = listenerName +
"ssl.truststore.password";
+
+ Map<String, String> configs = new HashMap<>();
+ configs.put(sslTruststoreType, "PKCS12");
+ configs.put(sslTruststoreLocation, "/temp/test.jks");
+ configs.put("password.encoder.secret", "encoder-secret");
+ configs.put(sslTruststorePassword, "password");
+
+ alterConfigWithZk(zkClient, adminZkClient,
Optional.of(defaultBrokerId), configs);
+
+ Properties properties = zkClient.getEntityConfigs("brokers",
defaultBrokerId);
+ assertTrue(properties.containsKey(sslTruststorePassword));
+ assertEquals(configs.get(sslTruststoreType),
properties.getProperty(sslTruststoreType));
+ assertEquals(configs.get(sslTruststoreLocation),
properties.getProperty(sslTruststoreLocation));
+ }
+
+ @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+ public void
testUpdatePerBrokerConfigWithListenerNameInKRaftThenShouldSuccessful() throws
Exception {
+ alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
+ String listenerName = "listener.name.internal.";
+
+ try (Admin client = cluster.createAdminClient()) {
+ alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
+ singletonMap(listenerName + "ssl.truststore.type",
"PKCS12"));
+ alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
+ singletonMap(listenerName + "ssl.truststore.location",
"/temp/test.jks"));
+ alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
+ singletonMap(listenerName + "ssl.truststore.password",
"password"));
+ }
+ }
+
+ @ClusterTest(types = {Type.ZK})
+ public void testUpdatePerBrokerConfigInZookeeperThenShouldFail() {
+ cluster.shutdownBroker(0);
+ String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance)
cluster).getUnderlying().zkConnect();
+ KafkaZkClient zkClient =
((ZkClusterInvocationContext.ZkClusterInstance)
cluster).getUnderlying().zkClient();
+ AdminZkClient adminZkClient = new AdminZkClient(zkClient,
scala.None$.empty());
+ alterOpts = generateDefaultAlterOpts(zkConnect);
+
+ assertThrows(ConfigException.class, () ->
+ alterAndVerifyConfig(zkClient, adminZkClient,
Optional.of(defaultBrokerId),
+ singletonMap("ssl.truststore.type", "PKCS12")));
+ assertThrows(ConfigException.class, () ->
+ alterAndVerifyConfig(zkClient, adminZkClient,
Optional.of(defaultBrokerId),
+ singletonMap("ssl.truststore.location",
"/temp/test.jks")));
+ assertThrows(ConfigException.class, () ->
+ alterAndVerifyConfig(zkClient, adminZkClient,
Optional.of(defaultBrokerId),
+ singletonMap("ssl.truststore.password", "password")));
+ }
+
+ @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT})
+ public void testUpdatePerBrokerConfigInKRaftThenShouldFail() {
+ alterOpts = generateDefaultAlterOpts(cluster.bootstrapServers());
+
+ try (Admin client = cluster.createAdminClient()) {
+ assertThrows(ExecutionException.class,
+ () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ singletonMap("ssl.truststore.type", "PKCS12")));
+ assertThrows(ExecutionException.class,
+ () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ singletonMap("ssl.truststore.location",
"/temp/test.jks")));
+ assertThrows(ExecutionException.class,
+ () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ singletonMap("ssl.truststore.password",
"password")));
+ }
+ }
+
+ private void assertNonZeroStatusExit(Stream<String> args, Consumer<String>
checkErrOut) {
+ AtomicReference<Integer> exitStatus = new AtomicReference<>();
+ Exit.setExitProcedure((status, __) -> {
+ exitStatus.set(status);
+ throw new RuntimeException();
+ });
+
+ String errOut = captureStandardMsg(run(args));
+
+ checkErrOut.accept(errOut);
+ assertNotNull(exitStatus.get());
+ assertEquals(1, exitStatus.get());
+ }
+
+ private Stream<String> quorumArgs() {
+ return cluster.isKRaftTest()
+ ? Stream.of("--bootstrap-server", cluster.bootstrapServers())
+ : Stream.of("--zookeeper",
((ZkClusterInvocationContext.ZkClusterInstance)
cluster).getUnderlying().zkConnect());
+ }
+
+ private void verifyConfig(KafkaZkClient zkClient, Optional<String>
brokerId, Map<String, String> config) {
+ Properties entityConfigs = zkClient.getEntityConfigs("brokers",
+ brokerId.orElse(ZooKeeperInternals.DEFAULT_STRING));
+ assertEquals(config, entityConfigs);
+ }
+
+ private void alterAndVerifyConfig(KafkaZkClient zkClient, AdminZkClient
adminZkClient,
+ Optional<String> brokerId, Map<String,
String> configs) {
+ alterConfigWithZk(zkClient, adminZkClient, brokerId, configs);
+ verifyConfig(zkClient, brokerId, configs);
+ }
+
+ private void alterConfigWithZk(KafkaZkClient zkClient, AdminZkClient
adminZkClient,
+ Optional<String> brokerId, Map<String,
String> config) {
+ String configStr = transferConfigMapToString(config);
+ ConfigCommand.ConfigCommandOptions addOpts =
+ new ConfigCommand.ConfigCommandOptions(toArray(alterOpts,
entityOp(brokerId), asList("--add-config", configStr)));
+ ConfigCommand.alterConfigWithZk(zkClient, addOpts, adminZkClient);
+ }
+
+ private List<String> entityOp(Optional<String> brokerId) {
+ return brokerId.map(id -> asList("--entity-name", id))
+ .orElse(singletonList("--entity-default"));
+ }
+
+ private void deleteAndVerifyConfig(KafkaZkClient zkClient, AdminZkClient
adminZkClient,
+ Optional<String> brokerId, Set<String>
configNames) {
+ ConfigCommand.ConfigCommandOptions deleteOpts =
+ new ConfigCommand.ConfigCommandOptions(toArray(alterOpts,
entityOp(brokerId),
+ asList("--delete-config", String.join(",",
configNames))));
+ ConfigCommand.alterConfigWithZk(zkClient, deleteOpts, adminZkClient);
+ verifyConfig(zkClient, brokerId, Collections.emptyMap());
+ }
+
+ private Map<String, String> generateEncodeConfig() {
+ Map<String, String> map = new HashMap<>();
+ map.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
+ map.put(PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG,
"DES/CBC/PKCS5Padding");
+ map.put(PASSWORD_ENCODER_ITERATIONS_CONFIG, "1024");
+ map.put(PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG,
"PBKDF2WithHmacSHA1");
+ map.put(PASSWORD_ENCODER_KEY_LENGTH_CONFIG, "64");
+ map.put("listener.name.internal.ssl.keystore.password", "secret2");
+ return map;
}
private void registerBrokerInZk(KafkaZkClient zkClient, int id) {
zkClient.createTopLevelPaths();
SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
- EndPoint endpoint = new EndPoint("localhost", 9092,
ListenerName.forSecurityProtocol(securityProtocol), securityProtocol);
- BrokerInfo brokerInfo = BrokerInfo.apply(Broker.apply(id,
seq(endpoint), scala.None$.empty()), MetadataVersion.latestTesting(), 9192);
+ EndPoint endpoint = new EndPoint("localhost", 9092,
+ ListenerName.forSecurityProtocol(securityProtocol),
securityProtocol);
+ BrokerInfo brokerInfo = BrokerInfo.apply(Broker.apply(id, endpoint,
+ scala.None$.empty()), MetadataVersion.latestTesting(), 9192);
zkClient.registerBroker(brokerInfo);
}
- @SafeVarargs
- static <T> Seq<T> seq(T...seq) {
- return seq(Arrays.asList(seq));
+ private List<String> generateDefaultAlterOpts(String bootstrapServers) {
+ return asList("--bootstrap-server", bootstrapServers,
+ "--entity-type", "brokers",
+ "--entity-name", "0", "--alter");
+ }
+
+ private void alterAndVerifyConfig(Admin client, Optional<String> brokerId,
Map<String, String> config) throws Exception {
+ alterConfigWithKraft(client, brokerId, config);
+ verifyConfig(client, brokerId, config);
+ }
+
+ private void alterConfigWithKraft(Admin client, Optional<String> brokerId,
Map<String, String> config) {
+ String configStr = transferConfigMapToString(config);
+ ConfigCommand.ConfigCommandOptions addOpts =
+ new ConfigCommand.ConfigCommandOptions(toArray(alterOpts,
entityOp(brokerId), asList("--add-config", configStr)));
+ ConfigCommand.alterConfig(client, addOpts);
}
- @SuppressWarnings({"deprecation"})
- static <T> Seq<T> seq(Collection<T> seq) {
- return
JavaConverters.asScalaIteratorConverter(seq.iterator()).asScala().toSeq();
+ private void verifyConfig(Admin client, Optional<String> brokerId,
Map<String, String> config) throws Exception {
+ ConfigResource configResource = new
ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse(defaultBrokerId));
+ client.describeConfigs(singletonList(configResource))
+ .all()
+ .get()
+ .entrySet()
+ .stream()
+ .filter(entry -> entry.getKey().type() ==
ConfigResource.Type.BROKER)
+ .map(Map.Entry::getValue)
+ .flatMap(configs -> configs.entries().stream())
+ .map(entry -> entry.name() + "=" + entry.value())
+ .filter(key -> transferConfigMapToString(config).contains(key))
+ .forEach(key -> assertTrue(true, "Config key " + key + " is
not found in the broker configs"));
+
+ }
+
+ private void deleteAndVerifyConfig(Admin client, Optional<String>
brokerId, Set<String> config) throws Exception {
+ ConfigCommand.ConfigCommandOptions deleteOpts =
+ new ConfigCommand.ConfigCommandOptions(toArray(alterOpts,
entityOp(brokerId),
+ asList("--delete-config", String.join(",", config))));
+ ConfigCommand.alterConfig(client, deleteOpts);
+ verifyConfig(client, brokerId, Collections.emptyMap());
Review Comment:
Could we add a new method to verify the "deleted" config has default value
(the config `message.max.bytes` has default value)? for example:
```java
private void verifyConfig(Admin client, Optional<String> brokerId,
Set<String> config) throws Exception {
ConfigResource configResource = new
ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse(defaultBrokerId));
TestUtils.waitForCondition(() -> {
Map<String, String> current =
client.describeConfigs(singletonList(configResource)).all().get().values()
.stream().flatMap(e -> e.entries().stream())
.collect(Collectors.toMap(ConfigEntry::name,
ConfigEntry::value));
return config.stream().allMatch(current::containsKey);
}, 5000, config + " are not updated");
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]