mimaison commented on code in PR #17424:
URL: https://github.com/apache/kafka/pull/17424#discussion_r1799399371
##########
core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala:
##########
@@ -39,7 +38,7 @@ import org.apache.kafka.common.resource.PatternType.{LITERAL,
PREFIXED}
import org.apache.kafka.common.security.auth._
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
-import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs,
ServerLogConfigs, ZkConfigs}
+import org.apache.kafka.server.config.{ServerConfigs, ReplicationConfigs,
ServerLogConfigs}
Review Comment:
Can we update the Scaladoc of this file and remove the ZooKeeper mention?
##########
core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala:
##########
@@ -107,7 +107,7 @@ class DelegationTokenEndToEndAuthorizationTest extends
EndToEndAuthorizationTest
}
@ParameterizedTest
- @ValueSource(strings = Array("kraft", "zk"))
+ @ValueSource(strings = Array("kraft"))
Review Comment:
Let's also update the `configureSecurityBeforeServersStart()` method and
remove the ZK logic.
##########
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala:
##########
@@ -710,7 +710,7 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
}
@ParameterizedTest
- @ValueSource(strings = Array("zk", "kraft"))
+ @ValueSource(strings = Array("kraft"))
Review Comment:
Can we also update the `sendRequest()` method just above and remove the
ZK/KRaft logic?
##########
docs/security.html:
##########
@@ -1277,8 +1277,6 @@ <h3 class="anchor-heading"><a id="security_authz"
class="anchor-link"></a><a hre
Configured implementations must extend
<code>org.apache.kafka.server.authorizer.Authorizer</code>.
Kafka provides default implementations which store ACLs in the cluster
metadata (either Zookeeper or the KRaft metadata log).
- For Zookeeper-based clusters, the provided implementation is configured as
follows:
Review Comment:
Let's also remove the ZooKeeper mention on the line above since Kafka does
not provide a ZooKeeper implementation anymore.
##########
core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala:
##########
@@ -89,7 +89,7 @@ class PlaintextEndToEndAuthorizationTest extends
EndToEndAuthorizationTest {
}
@ParameterizedTest
- @ValueSource(strings = Array("kraft", "zk"))
+ @ValueSource(strings = Array("kraft"))
Review Comment:
In the `setUp()` method above, we can change `ZkSasl` to `KafkaSasl`.
##########
core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala:
##########
@@ -1,210 +0,0 @@
-/**
Review Comment:
Do we have another test that validates this with KRaft?
##########
core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala:
##########
@@ -10,11 +10,10 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- **/
+ * */
Review Comment:
Is this change intended?
##########
core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala:
##########
@@ -87,11 +87,7 @@ class RequestQuotaTest extends BaseRequestTest {
properties.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
"0")
properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
classOf[RequestQuotaTest.TestPrincipalBuilder].getName)
properties.put(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true")
- if (isKRaftTest()) {
Review Comment:
A bunch of other methods (`clientActions()`, `clusterActions()`, etc.) still
have KRaft/ZooKeeper conditional code. Can we remove that as well?
##########
core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala:
##########
@@ -142,16 +161,14 @@ class AuthorizerTest extends QuorumTestHarness with
BaseAuthorizerTest {
// Authorizing the empty resource is not supported because we create a znode
with the resource name.
@ParameterizedTest
- @ValueSource(strings = Array(KRAFT, ZK))
+ @ValueSource(strings = Array(KRAFT))
Review Comment:
Also update the comment above that mentions `znode`.
##########
core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala:
##########
@@ -392,161 +409,17 @@ class AuthorizerTest extends QuorumTestHarness with
BaseAuthorizerTest {
//test remove all acls for resource
removeAcls(authorizer1, Set.empty, resource)
TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], authorizer1,
resource)
- if (quorum.equals(ZK)) {
- assertFalse(zkClient.resourceExists(resource))
- }
//test removing last acl also deletes ZooKeeper path
acls = changeAclAndVerify(Set.empty, Set(acl1), Set.empty)
changeAclAndVerify(acls, Set.empty, acls)
- if (quorum.equals(ZK)) {
Review Comment:
Can we update the comment above that still mentions ZooKeeper?
##########
core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala:
##########
@@ -1,169 +0,0 @@
-/**
Review Comment:
Why are we deleting this test? Is this covered by another test class?
##########
core/src/test/scala/unit/kafka/security/authorizer/BaseAuthorizerTest.scala:
##########
@@ -1,375 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.security.authorizer
-
-import java.net.InetAddress
-import java.util.UUID
-import kafka.server.KafkaConfig
-import kafka.zookeeper.ZooKeeperClient
-import org.apache.kafka.common.acl.AclOperation.{ALL, READ, WRITE}
-import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
-import org.apache.kafka.common.acl.{AccessControlEntry,
AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation}
-import org.apache.kafka.common.network.{ClientInformation, ListenerName}
-import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
-import org.apache.kafka.common.resource.PatternType.{LITERAL, PREFIXED}
-import org.apache.kafka.common.resource.ResourcePattern.WILDCARD_RESOURCE
-import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC,
TRANSACTIONAL_ID}
-import org.apache.kafka.common.resource.{ResourcePattern, ResourceType}
-import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
-import org.apache.kafka.security.authorizer.AclEntry
-import org.apache.kafka.security.authorizer.AclEntry.{WILDCARD_HOST,
WILDCARD_PRINCIPAL_STRING}
-import org.apache.kafka.server.authorizer.{AuthorizationResult, Authorizer}
-import org.junit.jupiter.api.Assertions.{assertFalse, assertTrue}
-import org.junit.jupiter.api.Test
-
-import scala.jdk.CollectionConverters._
-
-trait BaseAuthorizerTest {
-
- def authorizer: Authorizer
-
- val superUsers = "User:superuser1; User:superuser2"
- val username = "alice"
- val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val requestContext: RequestContext = newRequestContext(principal,
InetAddress.getByName("192.168.0.1"))
- val superUserName = "superuser1"
- var config: KafkaConfig = _
- var zooKeeperClient: ZooKeeperClient = _
- var resource: ResourcePattern = _
-
- @Test
Review Comment:
What about these tests? Are these tested somewhere else?
##########
core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala:
##########
@@ -786,291 +641,17 @@ class AuthorizerTest extends QuorumTestHarness with
BaseAuthorizerTest {
}
assertEquals(Set(acl3, acl4),
deleteResults(0).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
assertEquals(Set(acl1),
deleteResults(1).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
- if (quorum.equals(ZK)) {
- assertEquals(Set.empty,
deleteResults(2).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
- } else {
- // standard authorizer first finds the acls that match filters and then
delete them.
- // So filters[2] will match acl3 even though it is also matching
filters[0] and will be deleted by it
- assertEquals(Set(acl3),
deleteResults(2).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
- }
+ // standard authorizer first finds the acls that match filters and then
delete them.
+ // So filters[2] will match acl3 even though it is also matching
filters[0] and will be deleted by it
+ assertEquals(Set(acl3),
deleteResults(2).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
assertEquals(Set.empty,
deleteResults(3).aclBindingDeleteResults.asScala.map(_.aclBinding).toSet)
}
- @Test
- def testThrowsOnAddPrefixedAclIfInterBrokerProtocolVersionTooLow(): Unit = {
- givenAuthorizerWithProtocolVersion(Option(IBP_2_0_IV0))
- val e = assertThrows(classOf[ApiException],
- () => addAcls(authorizer1, Set(denyReadAcl), new ResourcePattern(TOPIC,
"z_other", PREFIXED)))
- assertTrue(e.getCause.isInstanceOf[UnsupportedVersionException],
s"Unexpected exception $e")
- }
-
- @Test
- def testCreateAclWithInvalidResourceName(): Unit = {
- assertThrows(classOf[ApiException],
- () => addAcls(authorizer1, Set(allowReadAcl), new ResourcePattern(TOPIC,
"test/1", LITERAL)))
- }
-
- @Test
- def testWritesExtendedAclChangeEventIfInterBrokerProtocolNotSet(): Unit = {
- givenAuthorizerWithProtocolVersion(Option.empty)
- val resource = new ResourcePattern(TOPIC, "z_other", PREFIXED)
- val expected = new String(ZkAclStore(PREFIXED).changeStore
- .createChangeNode(resource).bytes, UTF_8)
-
- addAcls(authorizer1, Set(denyReadAcl), resource)
-
- val actual = getAclChangeEventAsString(PREFIXED)
-
- assertEquals(expected, actual)
- }
-
- @Test
- def testWritesExtendedAclChangeEventWhenInterBrokerProtocolAtLeastKafkaV2():
Unit = {
- givenAuthorizerWithProtocolVersion(Option(IBP_2_0_IV1))
- val resource = new ResourcePattern(TOPIC, "z_other", PREFIXED)
- val expected = new String(ZkAclStore(PREFIXED).changeStore
- .createChangeNode(resource).bytes, UTF_8)
-
- addAcls(authorizer1, Set(denyReadAcl), resource)
-
- val actual = getAclChangeEventAsString(PREFIXED)
-
- assertEquals(expected, actual)
- }
-
- @Test
- def
testWritesLiteralWritesLiteralAclChangeEventWhenInterBrokerProtocolLessThanKafkaV2eralAclChangesForOlderProtocolVersions():
Unit = {
- givenAuthorizerWithProtocolVersion(Option(IBP_2_0_IV0))
- val resource = new ResourcePattern(TOPIC, "z_other", LITERAL)
- val expected = new String(ZkAclStore(LITERAL).changeStore
- .createChangeNode(resource).bytes, UTF_8)
-
- addAcls(authorizer1, Set(denyReadAcl), resource)
-
- val actual = getAclChangeEventAsString(LITERAL)
-
- assertEquals(expected, actual)
- }
-
- @Test
- def testWritesLiteralAclChangeEventWhenInterBrokerProtocolIsKafkaV2(): Unit
= {
- givenAuthorizerWithProtocolVersion(Option(IBP_2_0_IV1))
- val resource = new ResourcePattern(TOPIC, "z_other", LITERAL)
- val expected = new String(ZkAclStore(LITERAL).changeStore
- .createChangeNode(resource).bytes, UTF_8)
-
- addAcls(authorizer1, Set(denyReadAcl), resource)
-
- val actual = getAclChangeEventAsString(LITERAL)
-
- assertEquals(expected, actual)
- }
-
- @Test
- def testAuthorizerNoZkConfig(): Unit = {
- val noTlsProps = Kafka.getPropsFromArgs(Array(prepareDefaultConfig))
- val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
- KafkaConfig.fromProps(noTlsProps),
- noTlsProps.asInstanceOf[java.util.Map[String, Any]].asScala)
- ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach {
propName =>
- assertNull(zkClientConfig.getProperty(propName))
- }
- }
-
- @Test
- def testAuthorizerZkConfigFromKafkaConfigWithDefaults(): Unit = {
- val props = new java.util.Properties()
- val kafkaValue = "kafkaValue"
- val configs = Map("zookeeper.connect" -> "somewhere", // required,
otherwise we would omit it
- ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG -> "true",
- ZkConfigs.ZK_CLIENT_CNXN_SOCKET_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_KEY_STORE_TYPE_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_CIPHER_SUITES_CONFIG -> kafkaValue)
- configs.foreach { case (key, value) => props.put(key, value) }
-
- val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
- KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*))
- // confirm we get all the values we expect
- ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(prop
=> prop match {
- case ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG |
ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG =>
- assertEquals("true",
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
- case ZkConfigs.ZK_SSL_CRL_ENABLE_CONFIG |
ZkConfigs.ZK_SSL_OCSP_ENABLE_CONFIG =>
- assertEquals("false",
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
- case ZkConfigs.ZK_SSL_PROTOCOL_CONFIG =>
- assertEquals("TLSv1.2",
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
- case _ => assertEquals(kafkaValue,
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
- })
- }
-
- @Test
- def testAuthorizerZkConfigFromKafkaConfig(): Unit = {
- val props = new java.util.Properties()
- val kafkaValue = "kafkaValue"
- val configs = Map("zookeeper.connect" -> "somewhere", // required,
otherwise we would omit it
- ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG -> "true",
- ZkConfigs.ZK_CLIENT_CNXN_SOCKET_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_KEY_STORE_TYPE_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_PROTOCOL_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_CIPHER_SUITES_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG -> "HTTPS",
- ZkConfigs.ZK_SSL_CRL_ENABLE_CONFIG -> "false",
- ZkConfigs.ZK_SSL_OCSP_ENABLE_CONFIG -> "false")
- configs.foreach{case (key, value) => props.put(key, value) }
-
- val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
- KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*))
- // confirm we get all the values we expect
- ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(prop
=> prop match {
- case ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG |
ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG =>
- assertEquals("true",
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
- case ZkConfigs.ZK_SSL_CRL_ENABLE_CONFIG |
ZkConfigs.ZK_SSL_OCSP_ENABLE_CONFIG =>
- assertEquals("false",
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
- case _ => assertEquals(kafkaValue,
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
- })
- }
-
- @Test
- def testAuthorizerZkConfigFromPrefixOverrides(): Unit = {
- val props = new java.util.Properties()
- val kafkaValue = "kafkaValue"
- val prefixedValue = "prefixedValue"
- val prefix = "authorizer."
- val configs = Map("zookeeper.connect" -> "somewhere", // required,
otherwise we would omit it
- ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG -> "false",
- ZkConfigs.ZK_CLIENT_CNXN_SOCKET_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_KEY_STORE_TYPE_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_PROTOCOL_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_CIPHER_SUITES_CONFIG -> kafkaValue,
- ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG -> "HTTPS",
- ZkConfigs.ZK_SSL_CRL_ENABLE_CONFIG -> "false",
- ZkConfigs.ZK_SSL_OCSP_ENABLE_CONFIG -> "false",
- prefix + ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG -> "true",
- prefix + ZkConfigs.ZK_CLIENT_CNXN_SOCKET_CONFIG -> prefixedValue,
- prefix + ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_CONFIG -> prefixedValue,
- prefix + ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_CONFIG -> prefixedValue,
- prefix + ZkConfigs.ZK_SSL_KEY_STORE_TYPE_CONFIG -> prefixedValue,
- prefix + ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_CONFIG -> prefixedValue,
- prefix + ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_CONFIG -> prefixedValue,
- prefix + ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_CONFIG -> prefixedValue,
- prefix + ZkConfigs.ZK_SSL_PROTOCOL_CONFIG -> prefixedValue,
- prefix + ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_CONFIG -> prefixedValue,
- prefix + ZkConfigs.ZK_SSL_CIPHER_SUITES_CONFIG -> prefixedValue,
- prefix + ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG -> "",
- prefix + ZkConfigs.ZK_SSL_CRL_ENABLE_CONFIG -> "true",
- prefix + ZkConfigs.ZK_SSL_OCSP_ENABLE_CONFIG -> "true")
- configs.foreach{case (key, value) => props.put(key, value.toString) }
-
- val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(
- KafkaConfig.fromProps(props), mutable.Map(configs.toSeq: _*))
- // confirm we get all the values we expect
- ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.asScala.keys.foreach(prop
=> prop match {
- case ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG |
ZkConfigs.ZK_SSL_CRL_ENABLE_CONFIG | ZkConfigs.ZK_SSL_OCSP_ENABLE_CONFIG =>
- assertEquals("true",
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
- case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG =>
- assertEquals("false",
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
- case _ => assertEquals(prefixedValue,
KafkaConfig.zooKeeperClientProperty(zkClientConfig, prop).getOrElse("<None>"))
- })
- }
-
- @Test
Review Comment:
Are these tests not relevant in KRaft mode?
##########
core/src/test/scala/unit/kafka/security/authorizer/AuthorizerTest.scala:
##########
@@ -392,161 +409,17 @@ class AuthorizerTest extends QuorumTestHarness with
BaseAuthorizerTest {
//test remove all acls for resource
removeAcls(authorizer1, Set.empty, resource)
TestUtils.waitAndVerifyAcls(Set.empty[AccessControlEntry], authorizer1,
resource)
- if (quorum.equals(ZK)) {
- assertFalse(zkClient.resourceExists(resource))
- }
//test removing last acl also deletes ZooKeeper path
acls = changeAclAndVerify(Set.empty, Set(acl1), Set.empty)
changeAclAndVerify(acls, Set.empty, acls)
- if (quorum.equals(ZK)) {
- assertFalse(zkClient.resourceExists(resource))
- }
- }
-
- @Test
- def testLoadCache(): Unit = {
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val acl1 = new AccessControlEntry(user1.toString, "host-1", READ, ALLOW)
- val acls = Set(acl1)
- addAcls(authorizer1, acls, resource)
-
- val user2 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
- val resource1 = new ResourcePattern(TOPIC, "test-2", LITERAL)
- val acl2 = new AccessControlEntry(user2.toString, "host3", READ, DENY)
- val acls1 = Set(acl2)
- addAcls(authorizer1, acls1, resource1)
-
- zkClient.deleteAclChangeNotifications()
- val authorizer = new AclAuthorizer
- try {
- configureAclAuthorizer(authorizer, config.originals)
-
- assertEquals(acls, getAcls(authorizer, resource))
- assertEquals(acls1, getAcls(authorizer, resource1))
- } finally {
- authorizer.close()
- }
}
/**
- * Verify that there is no timing window between loading ACL cache and
setting
- * up ZK change listener. Cache must be loaded before creating change
listener
- * in the authorizer to avoid the timing window.
+ * Test ACL inheritance, as described in
#{org.apache.kafka.common.acl.AclOperation}
*/
- @Test
- def testChangeListenerTiming(): Unit = {
- val configureSemaphore = new Semaphore(0)
- val listenerSemaphore = new Semaphore(0)
- val executor = Executors.newSingleThreadExecutor
- val aclAuthorizer3 = new AclAuthorizer {
- override private[authorizer] def startZkChangeListeners(): Unit = {
- configureSemaphore.release()
- listenerSemaphore.acquireUninterruptibly()
- super.startZkChangeListeners()
- }
- }
- try {
- val future = executor.submit((() =>
aclAuthorizer3.configure(config.originals)): Runnable)
- configureSemaphore.acquire()
- val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
- val acls = Set(new AccessControlEntry(user1.toString, "host-1", READ,
DENY))
- addAcls(authorizer1, acls, resource)
-
- listenerSemaphore.release()
- future.get(10, TimeUnit.SECONDS)
-
- assertEquals(acls, getAcls(aclAuthorizer3, resource))
- } finally {
- aclAuthorizer3.close()
- executor.shutdownNow()
- }
- }
-
- @Test
Review Comment:
Are these tests not relevant in KRaft mode?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]