cmccabe commented on code in PR #12031:
URL: https://github.com/apache/kafka/pull/12031#discussion_r848914196
##########
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##########
@@ -351,30 +429,55 @@ void checkSection(Action action,
private static final Set<AclOperation> IMPLIES_DESCRIBE_CONFIGS =
Collections.unmodifiableSet(
EnumSet.of(DESCRIBE_CONFIGS, ALTER_CONFIGS));
+ static AuthorizationResult findResult(Action action,
+ AuthorizableRequestContext
requestContext,
+ StandardAcl acl) {
+ return findResult(
+ action,
+ matchingPrincipals(requestContext),
+ requestContext.clientAddress().getHostAddress(),
+ acl
+ );
+ }
+
+ static KafkaPrincipal baseKafkaPrincipal(AuthorizableRequestContext
context) {
+ KafkaPrincipal sessionPrincipal = context.principal();
+ return sessionPrincipal.getClass().equals(KafkaPrincipal.class)
+ ? sessionPrincipal
+ : new KafkaPrincipal(sessionPrincipal.getPrincipalType(),
sessionPrincipal.getName());
+ }
+
+ static Set<KafkaPrincipal> matchingPrincipals(AuthorizableRequestContext
context) {
+ KafkaPrincipal sessionPrincipal = context.principal();
+ KafkaPrincipal basePrincipal =
sessionPrincipal.getClass().equals(KafkaPrincipal.class)
+ ? sessionPrincipal
+ : new KafkaPrincipal(sessionPrincipal.getPrincipalType(),
sessionPrincipal.getName());
+ return Utils.mkSet(basePrincipal, WILDCARD_KAFKA_PRINCIPAL);
+ }
+
/**
* Determine what the result of applying an ACL to the given action and
request
* context should be. Note that this function assumes that the resource
name matches;
* the resource name is not checked here.
*
- * @param action The input action.
- * @param requestContext The input request context.
- * @param acl The input ACL.
- * @return null if the ACL does not match. The
authorization result
- * otherwise.
+ * @param action The input action.
+ * @param matchingPrincipals The set of input matching principals
+ * @param host The input host.
+ * @param acl The input ACL.
+ * @return null if the ACL does not match. The
authorization result
+ * otherwise.
*/
static AuthorizationResult findResult(Action action,
- AuthorizableRequestContext
requestContext,
+ Set<KafkaPrincipal>
matchingPrincipals,
+ String host,
StandardAcl acl) {
// Check if the principal matches. If it doesn't, return no result
(null).
- if (!acl.principal().equals(WILDCARD_PRINCIPAL)) {
- if
(!acl.principal().equals(requestContext.principal().toString())) return null;
+ if (!matchingPrincipals.contains(acl.kafkaPrincipal())) {
+ return null;
}
// Check if the host matches. If it doesn't, return no result (null).
- // The hostname should be cached in the InetAddress object, so calling
this more
- // than once shouldn't be too expensive.
- if (!acl.host().equals(WILDCARD)) {
- String host = requestContext.clientAddress().getHostAddress();
- if (!acl.host().equals(host)) return null;
+ if (!acl.host().equals(WILDCARD) && !acl.host().equals(host)) {
Review Comment:
I guess we never resolved the IP address to a hostname in the first place, so it's
good that you're getting rid of this comment.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]