This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8a1c19804b2 KAFKA-20366: Add syntax highlighting in security docs (#21965)
8a1c19804b2 is described below
commit 8a1c19804b2735a05ddb9086cd28882b746b9ba4
Author: gomudayya <[email protected]>
AuthorDate: Thu Apr 30 00:01:55 2026 +0900
KAFKA-20366: Add syntax highlighting in security docs (#21965)
Reviewers: Mickael Maison <[email protected]>
---
docs/security/authentication-using-sasl.md | 502 ++++++++++++---------
docs/security/authorization-and-acls.md | 140 +++---
.../encryption-and-authentication-using-ssl.md | 325 +++++++------
...ating-security-features-in-a-running-cluster.md | 72 +--
docs/security/listener-configuration.md | 62 +--
5 files changed, 642 insertions(+), 459 deletions(-)
diff --git a/docs/security/authentication-using-sasl.md b/docs/security/authentication-using-sasl.md
index f4a049a24a6..2698e4f7a21 100644
--- a/docs/security/authentication-using-sasl.md
+++ b/docs/security/authentication-using-sasl.md
@@ -35,15 +35,17 @@ Kafka uses the Java Authentication and Authorization
Service ([JAAS](https://doc
`KafkaServer` is the section name in the JAAS file used by each
KafkaServer/Broker. This section provides SASL configuration options for the
broker including any SASL client connections made by the broker for
inter-broker communication. If multiple listeners are configured to use SASL,
the section name may be prefixed with the listener name in lower-case followed
by a period, e.g. `sasl_ssl.KafkaServer`.
Brokers may also configure JAAS using the broker configuration property
`sasl.jaas.config`. The property name must be prefixed with the listener prefix
including the SASL mechanism, i.e.
`listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config`. Only one login
module may be specified in the config value. If multiple mechanisms are
configured on a listener, configs must be provided for each mechanism using the
listener and mechanism prefix. For example,
-
- listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
- username="admin" \
- password="admin-secret";
- listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
- username="admin" \
- password="admin-secret" \
- user_admin="admin-secret" \
- user_alice="alice-secret";
+
+```java-properties
+listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
+ username="admin" \
+ password="admin-secret";
+listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
+ username="admin" \
+ password="admin-secret" \
+ user_admin="admin-secret" \
+ user_alice="alice-secret";
+```
If JAAS configuration is defined at different levels, the order of precedence
used is:
@@ -69,7 +71,7 @@ To configure SASL authentication on the clients using static
JAAS config file:
1. Add a JAAS config file with a client login section named `KafkaClient`.
Configure a login module in `KafkaClient` for the selected mechanism as
described in the examples for setting up GSSAPI (Kerberos), PLAIN, SCRAM, or
non-production/production OAUTHBEARER. For example, GSSAPI credentials may be
configured as:
- ```
+ ```text
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
@@ -81,7 +83,7 @@ To configure SASL authentication on the clients using static
JAAS config file:
2. Pass the JAAS config file location as JVM parameter to each client JVM. For
example:
- ```
+ ```text
-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
```
@@ -102,13 +104,13 @@ Kafka supports the following SASL mechanisms:
1. Configure a SASL port in server.properties, by adding at least one of
SASL_PLAINTEXT or SASL_SSL to the _listeners_ parameter, which contains one or
more comma-separated values:
- ```
+ ```java-properties
listeners=SASL_PLAINTEXT://host.name:port
```
If you are only configuring a SASL port (or if you want the Kafka brokers
to authenticate each other using SASL) then make sure you set the same SASL
protocol for inter-broker communication:
- ```
+ ```java-properties
security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
```
@@ -133,7 +135,7 @@ Note: When establishing connections to brokers via SASL,
clients may perform a r
If you have installed your own Kerberos, you will need to create these
principals yourself using the following commands:
- ```
+ ```bash
$ sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/{hostname}@{REALM}'
$ sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"
```
@@ -143,7 +145,7 @@ Note: When establishing connections to brokers via SASL,
clients may perform a r
1. Add a suitably modified JAAS file similar to the one below to each Kafka
broker's config directory, let's call it kafka_server_jaas.conf for this
example (note that each broker should have its own keytab):
- ```
+ ```text
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
@@ -157,7 +159,7 @@ Note: When establishing connections to brokers via SASL,
clients may perform a r
2. Pass the JAAS and optionally the krb5 file locations as JVM parameters to
each Kafka broker (see
[here](https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html)
for more details):
- ```
+ ```text
-Djava.security.krb5.conf=/etc/kafka/krb5.conf
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
```
@@ -166,7 +168,7 @@ Note: When establishing connections to brokers via SASL,
clients may perform a r
4. Configure SASL port and SASL mechanisms in server.properties as described
here. For example:
- ```
+ ```java-properties
listeners=SASL_PLAINTEXT://host.name:port
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
@@ -175,7 +177,7 @@ Note: When establishing connections to brokers via SASL,
clients may perform a r
We must also configure the service name in server.properties, which should
match the principal name of the kafka brokers. In the above example, principal
is "kafka/[email protected]", so:
- ```
+ ```java-properties
sasl.kerberos.service.name=kafka
```
@@ -183,29 +185,37 @@ Note: When establishing connections to brokers via SASL,
clients may perform a r
To configure SASL authentication on the clients:
1. Clients (producers, consumers, connect workers, etc) will authenticate to
the cluster with their own principal (usually with the same name as the user
running the client), so obtain or create these principals as needed. Then
configure the JAAS configuration property for each client. Different clients
within a JVM may run as different users by specifying different principals. The
property `sasl.jaas.config` in producer.properties or consumer.properties
describes how clients like produc [...]
-
- sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
- useKeyTab=true \
- storeKey=true \
- keyTab="/etc/security/keytabs/kafka_client.keytab" \
- principal="[email protected]";
-
-For command-line utilities like kafka-console-consumer or
kafka-console-producer, kinit can be used along with "useTicketCache=true" as
in:
-
- sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
- useTicketCache=true;
+
+ ```java-properties
+ sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
+ useKeyTab=true \
+ storeKey=true \
+ keyTab="/etc/security/keytabs/kafka_client.keytab" \
+ principal="[email protected]";
+ ```
+
+ For command-line utilities like kafka-console-consumer or
kafka-console-producer, kinit can be used along with "useTicketCache=true" as
in:
+
+ ```java-properties
+ sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
+ useTicketCache=true;
+ ```
JAAS configuration for clients may alternatively be specified as a JVM
parameter similar to brokers as described here. Clients use the login section
named `KafkaClient`. This option allows only one user for all client
connections from a JVM.
2. Make sure the keytabs configured in the JAAS configuration are readable by
the operating system user who is starting kafka client.
3. Optionally pass the krb5 file locations as JVM parameters to each client
JVM (see
[here](https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html)
for more details):
-
- -Djava.security.krb5.conf=/etc/kafka/krb5.conf
+
+ ```text
+ -Djava.security.krb5.conf=/etc/kafka/krb5.conf
+ ```
4. Configure the following properties in producer.properties or
consumer.properties:
-
- security.protocol=SASL_PLAINTEXT (or SASL_SSL)
- sasl.mechanism=GSSAPI
- sasl.kerberos.service.name=kafka
+
+ ```java-properties
+ security.protocol=SASL_PLAINTEXT (or SASL_SSL)
+ sasl.mechanism=GSSAPI
+ sasl.kerberos.service.name=kafka
+ ```
### Authentication using SASL/PLAIN
@@ -215,44 +225,54 @@ Under the default implementation of
`principal.builder.class`, the username is u
#### Configuring Kafka Brokers
1. Add a suitably modified JAAS file similar to the one below to each Kafka
broker's config directory, let's call it kafka_server_jaas.conf for this
example:
-
- KafkaServer {
- org.apache.kafka.common.security.plain.PlainLoginModule required
- username="admin"
- password="admin-secret"
- user_admin="admin-secret"
- user_alice="alice-secret";
- };
-
-This configuration defines two users (_admin_ and _alice_). The properties
`username` and `password` in the `KafkaServer` section are used by the broker
to initiate connections to other brokers. In this example, _admin_ is the user
for inter-broker communication. The set of properties `user__userName_` defines
the passwords for all users that connect to the broker and the broker validates
all client connections including those from other brokers using these
properties.
+
+ ```text
+ KafkaServer {
+ org.apache.kafka.common.security.plain.PlainLoginModule required
+ username="admin"
+ password="admin-secret"
+ user_admin="admin-secret"
+ user_alice="alice-secret";
+ };
+ ```
+
+ This configuration defines two users (_admin_ and _alice_). The properties
`username` and `password` in the `KafkaServer` section are used by the broker
to initiate connections to other brokers. In this example, _admin_ is the user
for inter-broker communication. The set of properties `user__userName_` defines
the passwords for all users that connect to the broker and the broker validates
all client connections including those from other brokers using these
properties.
2. Pass the JAAS config file location as JVM parameter to each Kafka broker:
-
- -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
+
+ ```text
+ -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
+ ```
3. Configure SASL port and SASL mechanisms in server.properties as described
here. For example:
-
- listeners=SASL_SSL://host.name:port
- security.inter.broker.protocol=SASL_SSL
- sasl.mechanism.inter.broker.protocol=PLAIN
- sasl.enabled.mechanisms=PLAIN
+
+ ```java-properties
+ listeners=SASL_SSL://host.name:port
+ security.inter.broker.protocol=SASL_SSL
+ sasl.mechanism.inter.broker.protocol=PLAIN
+ sasl.enabled.mechanisms=PLAIN
+ ```
#### Configuring Kafka Clients
To configure SASL authentication on the clients:
1. Configure the JAAS configuration property for each client in
producer.properties or consumer.properties. The login module describes how the
clients like producer and consumer can connect to the Kafka Broker. The
following is an example configuration for a client for the PLAIN mechanism:
-
- sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
- username="alice" \
- password="alice-secret";
-The options `username` and `password` are used by clients to configure the
user for client connections. In this example, clients connect to the broker as
user _alice_. Different clients within a JVM may connect as different users by
specifying different user names and passwords in `sasl.jaas.config`.
+ ```java-properties
+ sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
+ username="alice" \
+ password="alice-secret";
+ ```
+
+ The options `username` and `password` are used by clients to configure the
user for client connections. In this example, clients connect to the broker as
user _alice_. Different clients within a JVM may connect as different users by
specifying different user names and passwords in `sasl.jaas.config`.
JAAS configuration for clients may alternatively be specified as a JVM
parameter similar to brokers as described here. Clients use the login section
named `KafkaClient`. This option allows only one user for all client
connections from a JVM.
2. Configure the following properties in producer.properties or
consumer.properties:
-
- security.protocol=SASL_SSL
- sasl.mechanism=PLAIN
+
+ ```java-properties
+ security.protocol=SASL_SSL
+ sasl.mechanism=PLAIN
+ ```
#### Use of SASL/PLAIN in production
@@ -268,62 +288,87 @@ Salted Challenge Response Authentication Mechanism
(SCRAM) is a family of SASL m
The SCRAM implementation in Kafka uses the metadata log as credential store.
Credentials can be created in the metadata log using `kafka-storage.sh` or
`kafka-configs.sh`. For each SCRAM mechanism enabled, credentials must be
created by adding a config with the mechanism name. Credentials for
inter-broker communication must be created before Kafka brokers are started.
`kafka-storage.sh` can format storage with initial credentials. Client
credentials may be created and updated dynamically [...]
Create initial SCRAM credentials for user _admin_ with password _admin-secret_
:
-
- $ bin/kafka-storage.sh format -t $(bin/kafka-storage.sh random-uuid) -c config/server.properties --add-scram 'SCRAM-SHA-256=[name="admin",password="admin-secret"]'
+
+```bash
+$ bin/kafka-storage.sh format -t $(bin/kafka-storage.sh random-uuid) \
+ -c config/server.properties \
+ --add-scram 'SCRAM-SHA-256=[name="admin",password="admin-secret"]'
+```
Create SCRAM credentials for user _alice_ with password _alice-secret_ (refer
to Configuring Kafka Clients for client configuration):
-
- $ bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret]' --entity-type users --entity-name alice --command-config client.properties
+
+```bash
+$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
+ --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret]' \
+ --entity-type users --entity-name alice --command-config client.properties
+```
The default iteration count of 4096 is used if iterations are not specified. A
random salt is created if it's not specified. The SCRAM identity consisting of
salt, iterations, StoredKey and ServerKey are stored in the metadata log. See
[RFC 5802](https://tools.ietf.org/html/rfc5802) for details on SCRAM identity
and the individual fields.
Existing credentials may be listed using the _\--describe_ option:
-
- $ bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type users --entity-name alice --command-config client.properties
+
+```bash
+$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe \
+ --entity-type users --entity-name alice --command-config client.properties
+```
Credentials may be deleted for one or more SCRAM mechanisms using the
_\--alter --delete-config_ option:
-
- $ bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name alice --command-config client.properties
+
+```bash
+$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
+ --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name alice \
+ --command-config client.properties
+```
#### Configuring Kafka Brokers
1. Add a suitably modified JAAS file similar to the one below to each Kafka
broker's config directory, let's call it kafka_server_jaas.conf for this
example:
-
- KafkaServer {
- org.apache.kafka.common.security.scram.ScramLoginModule required
- username="admin"
- password="admin-secret";
- };
-
-The properties `username` and `password` in the `KafkaServer` section are used
by the broker to initiate connections to other brokers. In this example,
_admin_ is the user for inter-broker communication.
+
+ ```text
+ KafkaServer {
+ org.apache.kafka.common.security.scram.ScramLoginModule required
+ username="admin"
+ password="admin-secret";
+ };
+ ```
+
+ The properties `username` and `password` in the `KafkaServer` section are
used by the broker to initiate connections to other brokers. In this example,
_admin_ is the user for inter-broker communication.
2. Pass the JAAS config file location as JVM parameter to each Kafka broker:
-
- -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
+
+ ```text
+ -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
+ ```
3. Configure SASL port and SASL mechanisms in server.properties as described
here. For example:
-
- listeners=SASL_SSL://host.name:port
- security.inter.broker.protocol=SASL_SSL
- sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 (or SCRAM-SHA-512)
- sasl.enabled.mechanisms=SCRAM-SHA-256 (or SCRAM-SHA-512)
+
+ ```java-properties
+ listeners=SASL_SSL://host.name:port
+ security.inter.broker.protocol=SASL_SSL
+ sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 (or SCRAM-SHA-512)
+ sasl.enabled.mechanisms=SCRAM-SHA-256 (or SCRAM-SHA-512)
+ ```
#### Configuring Kafka Clients
To configure SASL authentication on the clients:
1. Configure the JAAS configuration property for each client in
producer.properties or consumer.properties. The login module describes how the
clients like producer and consumer can connect to the Kafka Broker. The
following is an example configuration for a client for the SCRAM mechanisms:
-
- sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
- username="alice" \
- password="alice-secret";
-The options `username` and `password` are used by clients to configure the
user for client connections. In this example, clients connect to the broker as
user _alice_. Different clients within a JVM may connect as different users by
specifying different user names and passwords in `sasl.jaas.config`.
+ ```java-properties
+ sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
+ username="alice" \
+ password="alice-secret";
+ ```
+
+ The options `username` and `password` are used by clients to configure the
user for client connections. In this example, clients connect to the broker as
user _alice_. Different clients within a JVM may connect as different users by
specifying different user names and passwords in `sasl.jaas.config`.
JAAS configuration for clients may alternatively be specified as a JVM
parameter similar to brokers as described here. Clients use the login section
named `KafkaClient`. This option allows only one user for all client
connections from a JVM.
2. Configure the following properties in producer.properties or
consumer.properties:
-
- security.protocol=SASL_SSL
- sasl.mechanism=SCRAM-SHA-256 (or SCRAM-SHA-512)
+
+ ```java-properties
+ security.protocol=SASL_SSL
+ sasl.mechanism=SCRAM-SHA-256 (or SCRAM-SHA-512)
+ ```
#### Security Considerations for SASL/SCRAM
@@ -342,13 +387,15 @@ Under the default implementation of
`principal.builder.class`, the principalName
The default implementation of SASL/OAUTHBEARER in Kafka creates and validates
[Unsecured JSON Web Tokens](https://tools.ietf.org/html/rfc7515#appendix-A.5).
While suitable only for non-production use, it does provide the flexibility to
create arbitrary tokens in a DEV or TEST environment.
1. Add a suitably modified JAAS file similar to the one below to each Kafka
broker's config directory, let's call it kafka_server_jaas.conf for this
example:
-
- KafkaServer {
- org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
- unsecuredLoginStringClaim_sub="admin";
- };
-The property `unsecuredLoginStringClaim_sub` in the `KafkaServer` section is
used by the broker when it initiates connections to other brokers. In this
example, _admin_ will appear in the subject (`sub`) claim and will be the user
for inter-broker communication.
+ ```text
+ KafkaServer {
+ org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
+ unsecuredLoginStringClaim_sub="admin";
+ };
+ ```
+
+ The property `unsecuredLoginStringClaim_sub` in the `KafkaServer` section
is used by the broker when it initiates connections to other brokers. In this
example, _admin_ will appear in the subject (`sub`) claim and will be the user
for inter-broker communication.
Here are the various supported JAAS module options on the broker side for
[Unsecured JSON Web Token](https://tools.ietf.org/html/rfc7515#appendix-A.5)
validation:
<table>
@@ -399,36 +446,46 @@ Set to a positive integer value if you wish to allow up
to some number of positi
</td> </tr> </table>
2. Pass the JAAS config file location as JVM parameter to each Kafka broker:
-
- -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
+
+ ```text
+ -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
+ ```
3. Configure SASL port and SASL mechanisms in server.properties as described
here. For example:
-
- listeners=SASL_SSL://host.name:port (or SASL_PLAINTEXT if non-production)
- security.inter.broker.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
- sasl.mechanism.inter.broker.protocol=OAUTHBEARER
- sasl.enabled.mechanisms=OAUTHBEARER
+
+ ```java-properties
+ listeners=SASL_SSL://host.name:port (or SASL_PLAINTEXT if non-production)
+ security.inter.broker.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
+ sasl.mechanism.inter.broker.protocol=OAUTHBEARER
+ sasl.enabled.mechanisms=OAUTHBEARER
+ ```
#### Configuring Production Kafka Brokers
1. Add a suitably modified JAAS file similar to the one below to each Kafka
broker's config directory, let's call it kafka_server_jaas.conf for this
example:
-
- KafkaServer {
- org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;
- };
+
+ ```text
+ KafkaServer {
+ org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;
+ };
+ ```
2. Pass the JAAS config file location as JVM parameter to each Kafka broker:
-
- -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
+
+ ```text
+ -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
+ ```
3. Configure SASL port and SASL mechanisms in server.properties as described
here. For example:
-
- listeners=SASL_SSL://host.name:port
- security.inter.broker.protocol=SASL_SSL
- sasl.mechanism.inter.broker.protocol=OAUTHBEARER
- sasl.enabled.mechanisms=OAUTHBEARER
- listener.name.<listener name>.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallbackHandler
- listener.name.<listener name>.oauthbearer.sasl.oauthbearer.jwks.endpoint.url=https://example.com/oauth2/v1/keys
+
+ ```java-properties
+ listeners=SASL_SSL://host.name:port
+ security.inter.broker.protocol=SASL_SSL
+ sasl.mechanism.inter.broker.protocol=OAUTHBEARER
+ sasl.enabled.mechanisms=OAUTHBEARER
+ listener.name.<listener name>.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallbackHandler
+ listener.name.<listener name>.oauthbearer.sasl.oauthbearer.jwks.endpoint.url=https://example.com/oauth2/v1/keys
+ ```
The OAUTHBEARER broker configuration includes:
* sasl.oauthbearer.clock.skew.seconds
@@ -444,11 +501,13 @@ The OAUTHBEARER broker configuration includes:
To configure SASL authentication on the clients:
1. Configure the JAAS configuration property for each client in
producer.properties or consumer.properties. The login module describes how the
clients like producer and consumer can connect to the Kafka Broker. The
following is an example configuration for a client for the OAUTHBEARER
mechanisms:
-
- sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
- unsecuredLoginStringClaim_sub="alice";
-The option `unsecuredLoginStringClaim_sub` is used by clients to configure the
subject (`sub`) claim, which determines the user for client connections. In
this example, clients connect to the broker as user _alice_. Different clients
within a JVM may connect as different users by specifying different subject
(`sub`) claims in `sasl.jaas.config`.
+ ```java-properties
+ sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
+ unsecuredLoginStringClaim_sub="alice";
+ ```
+
+ The option `unsecuredLoginStringClaim_sub` is used by clients to configure
the subject (`sub`) claim, which determines the user for client connections. In
this example, clients connect to the broker as user _alice_. Different clients
within a JVM may connect as different users by specifying different subject
(`sub`) claims in `sasl.jaas.config`.
The default implementation of SASL/OAUTHBEARER in Kafka creates and validates
[Unsecured JSON Web Tokens](https://tools.ietf.org/html/rfc7515#appendix-A.5).
While suitable only for non-production use, it does provide the flexibility to
create arbitrary tokens in a DEV or TEST environment.
@@ -529,17 +588,21 @@ Set to a custom claim name if you wish the name of the
`String` or `String List`
JAAS configuration for clients may alternatively be specified as a JVM
parameter similar to brokers as described here. Clients use the login section
named `KafkaClient`. This option allows only one user for all client
connections from a JVM.
2. Configure the following properties in producer.properties or
consumer.properties:
-
- security.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
- sasl.mechanism=OAUTHBEARER
+
+ ```java-properties
+ security.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
+ sasl.mechanism=OAUTHBEARER
+ ```
3. The default implementation of SASL/OAUTHBEARER depends on the
jackson-databind library. Since it's an optional dependency, users have to
configure it as a dependency via their build tool.
#### Configuring Production Kafka Clients
To configure SASL authentication on the clients:
1. Configure the JAAS configuration property for each client in
producer.properties or consumer.properties. The login module describes how the
clients like producer and consumer can connect to the Kafka Broker. The
following is an example configuration for a client for the OAUTHBEARER
mechanisms:
-
- sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;
+
+ ```java-properties
+ sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;
+ ```
JAAS configuration for clients may alternatively be specified as a JVM
parameter similar to brokers as described here. Clients use the login section
named `KafkaClient`. This option allows only one user for all client
connections from a JVM.
@@ -547,49 +610,57 @@ JAAS configuration for clients may alternatively be
specified as a JVM parameter
For example, if using the OAuth `client_credentials` grant type with a client
secret to communicate with the OAuth identity provider, the configuration might
look like this:
- security.protocol=SASL_SSL
- sasl.mechanism=OAUTHBEARER
- sasl.oauthbearer.client.credentials.client.id=jdoe
- sasl.oauthbearer.client.credentials.client.secret=$3cr3+
- sasl.oauthbearer.scope=my-application-scope
- sasl.oauthbearer.token.endpoint.url=https://example.com/oauth2/v1/token
+ ```java-properties
+ security.protocol=SASL_SSL
+ sasl.mechanism=OAUTHBEARER
+ sasl.oauthbearer.client.credentials.client.id=jdoe
+ sasl.oauthbearer.client.credentials.client.secret=$3cr3+
+ sasl.oauthbearer.scope=my-application-scope
+ sasl.oauthbearer.token.endpoint.url=https://example.com/oauth2/v1/token
+ ```
Alternatively, the `client_credentials` grant type also supports client
assertion authentication as defined in [RFC
7523](https://tools.ietf.org/html/rfc7523). Instead of sending a client secret,
the client authenticates by presenting a signed JWT assertion to the OAuth
identity provider. This provides enhanced security since private keys never
leave the client and assertions are short-lived. See
[KIP-1258](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1258) for
details.
When using client assertion with dynamically-generated JWTs (recommended), the
configuration might look like this:
- security.protocol=SASL_SSL
- sasl.mechanism=OAUTHBEARER
- sasl.oauthbearer.token.endpoint.url=https://example.com/oauth2/v1/token
- sasl.oauthbearer.assertion.private.key.file=/path/to/private-key.pem
- sasl.oauthbearer.assertion.algorithm=RS256
- sasl.oauthbearer.assertion.claim.iss=my-kafka-client
- sasl.oauthbearer.assertion.claim.sub=my-service-account
- sasl.oauthbearer.assertion.claim.aud=https://example.com
- sasl.oauthbearer.assertion.claim.exp.seconds=300
- sasl.oauthbearer.assertion.claim.jti.include=true
- sasl.oauthbearer.scope=my-application-scope
+ ```java-properties
+ security.protocol=SASL_SSL
+ sasl.mechanism=OAUTHBEARER
+ sasl.oauthbearer.token.endpoint.url=https://example.com/oauth2/v1/token
+ sasl.oauthbearer.assertion.private.key.file=/path/to/private-key.pem
+ sasl.oauthbearer.assertion.algorithm=RS256
+ sasl.oauthbearer.assertion.claim.iss=my-kafka-client
+ sasl.oauthbearer.assertion.claim.sub=my-service-account
+ sasl.oauthbearer.assertion.claim.aud=https://example.com
+ sasl.oauthbearer.assertion.claim.exp.seconds=300
+ sasl.oauthbearer.assertion.claim.jti.include=true
+ sasl.oauthbearer.scope=my-application-scope
+ ```
Alternatively, a pre-generated JWT assertion can be read from a file. This is
useful when assertions are generated by an external process or secrets manager:
- security.protocol=SASL_SSL
- sasl.mechanism=OAUTHBEARER
- sasl.oauthbearer.token.endpoint.url=https://example.com/oauth2/v1/token
- sasl.oauthbearer.assertion.file=/path/to/assertion.jwt
- sasl.oauthbearer.scope=my-application-scope
+ ```java-properties
+ security.protocol=SASL_SSL
+ sasl.mechanism=OAUTHBEARER
+ sasl.oauthbearer.token.endpoint.url=https://example.com/oauth2/v1/token
+ sasl.oauthbearer.assertion.file=/path/to/assertion.jwt
+ sasl.oauthbearer.scope=my-application-scope
+ ```
For dynamically-generated assertions, additional static claims can be provided
via a JSON template file using `sasl.oauthbearer.assertion.template.file`. The
template supports `header` and `payload` sections whose values are merged into
the generated JWT:
- {
- "header": {
- "kid": "my-key-id"
- },
- "payload": {
- "iss": "my-kafka-client",
- "sub": "my-service-account",
- "aud": "https://example.com"
- }
- }
+ ```json
+ {
+ "header": {
+ "kid": "my-key-id"
+ },
+ "payload": {
+ "iss": "my-kafka-client",
+ "sub": "my-service-account",
+ "aud": "https://example.com"
+ }
+ }
+ ```
When both client assertion and client secret configurations are present, the
`ClientCredentialsJwtRetriever` uses a three-tier preference order:
@@ -601,15 +672,17 @@ This selection is made at configuration time. Once a
method is selected, it pers
Or, if using the OAuth `urn:ietf:params:oauth:grant-type:jwt-bearer` grant
type to communicate with the OAuth identity provider, the
`JwtBearerJwtRetriever` must be configured explicitly since the default
retriever delegates to `ClientCredentialsJwtRetriever`:
- security.protocol=SASL_SSL
- sasl.mechanism=OAUTHBEARER
- sasl.oauthbearer.jwt.retriever.class=org.apache.kafka.common.security.oauthbearer.JwtBearerJwtRetriever
- sasl.oauthbearer.assertion.private.key.file=/path/to/private.key
- sasl.oauthbearer.assertion.algorithm=RS256
- sasl.oauthbearer.assertion.claim.exp.seconds=600
- sasl.oauthbearer.assertion.template.file=/path/to/template.json
- sasl.oauthbearer.scope=my-application-scope
- sasl.oauthbearer.token.endpoint.url=https://example.com/oauth2/v1/token
+ ```java-properties
+ security.protocol=SASL_SSL
+ sasl.mechanism=OAUTHBEARER
+ sasl.oauthbearer.jwt.retriever.class=org.apache.kafka.common.security.oauthbearer.JwtBearerJwtRetriever
+ sasl.oauthbearer.assertion.private.key.file=/path/to/private.key
+ sasl.oauthbearer.assertion.algorithm=RS256
+ sasl.oauthbearer.assertion.claim.exp.seconds=600
+ sasl.oauthbearer.assertion.template.file=/path/to/template.json
+ sasl.oauthbearer.scope=my-application-scope
+ sasl.oauthbearer.token.endpoint.url=https://example.com/oauth2/v1/token
+ ```
The OAUTHBEARER client configuration includes:
* sasl.oauthbearer.assertion.algorithm
@@ -675,29 +748,35 @@ Production use cases will also require writing an
implementation of `org.apache.
### Enabling multiple SASL mechanisms in a broker
1. Specify configuration for the login modules of all enabled mechanisms in
the `KafkaServer` section of the JAAS config file. For example:
-
- KafkaServer {
- com.sun.security.auth.module.Krb5LoginModule required
- useKeyTab=true
- storeKey=true
- keyTab="/etc/security/keytabs/kafka_server.keytab"
- principal="kafka/[email protected]";
-
- org.apache.kafka.common.security.plain.PlainLoginModule required
- username="admin"
- password="admin-secret"
- user_admin="admin-secret"
- user_alice="alice-secret";
- };
+
+ ```text
+ KafkaServer {
+ com.sun.security.auth.module.Krb5LoginModule required
+ useKeyTab=true
+ storeKey=true
+ keyTab="/etc/security/keytabs/kafka_server.keytab"
+ principal="kafka/[email protected]";
+
+ org.apache.kafka.common.security.plain.PlainLoginModule required
+ username="admin"
+ password="admin-secret"
+ user_admin="admin-secret"
+ user_alice="alice-secret";
+ };
+ ```
2. Enable the SASL mechanisms in server.properties:
-
- sasl.enabled.mechanisms=GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER
+
+ ```java-properties
+ sasl.enabled.mechanisms=GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER
+ ```
3. Specify the SASL security protocol and mechanism for inter-broker
communication in server.properties if required:
-
- security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
- sasl.mechanism.inter.broker.protocol=GSSAPI (or one of the other enabled mechanisms)
+
+ ```java-properties
+ security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
+ sasl.mechanism.inter.broker.protocol=GSSAPI (or one of the other enabled mechanisms)
+ ```
4. Follow the mechanism-specific steps in GSSAPI (Kerberos), PLAIN, SCRAM, and
non-production/production OAUTHBEARER to configure SASL for the enabled
mechanisms.
### Modifying SASL mechanism in a Running Cluster
@@ -734,24 +813,41 @@ Tokens can also be cancelled explicitly. If a token is
not renewed by the token
Tokens can be created by using Admin APIs or using the
`kafka-delegation-tokens.sh` script. Delegation token requests
(create/renew/expire/describe) should be issued only on SASL or SSL
authenticated channels. Tokens cannot be requested if the initial
authentication is done through a delegation token. A token can be created by the
user for that user or others as well by specifying the `--owner-principal`
parameter. Owner/renewers can renew or expire tokens. Owner/renewers can always
describe t [...]
Create a delegation token:
-
- $ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1
+
+```bash
+$ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create \
+ --max-life-time-period -1 --command-config client.properties \
+ --renewer-principal User:user1
+```
Create a delegation token for a different owner:
-
- $ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create --max-life-time-period -1 --command-config client.properties --renewer-principal User:user1 --owner-principal User:owner1
+
+```bash
+$ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create \
+ --max-life-time-period -1 --command-config client.properties \
+ --renewer-principal User:user1 --owner-principal User:owner1
+```
Renew a delegation token:
-
- $ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --renew --renew-time-period -1 --command-config client.properties --hmac ABCDEFGHIJK
+
+```bash
+$ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --renew \
+ --renew-time-period -1 --command-config client.properties --hmac ABCDEFGHIJK
+```
Expire a delegation token:
-
- $ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --expire --expiry-time-period -1 --command-config client.properties --hmac ABCDEFGHIJK
+
+```bash
+$ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --expire \
+ --expiry-time-period -1 --command-config client.properties --hmac ABCDEFGHIJK
+```
Existing tokens can be described using the --describe option:
-
- $ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --describe --command-config client.properties --owner-principal User:user1
+
+```bash
+$ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --describe \
+ --command-config client.properties --owner-principal User:user1
+```
#### Token Authentication
@@ -760,13 +856,15 @@ Delegation token authentication piggybacks on the current
SASL/SCRAM authenticat
Configuring Kafka Clients:
1. Configure the JAAS configuration property for each client in
producer.properties or consumer.properties. The login module describes how the
clients like producer and consumer can connect to the Kafka Broker. The
following is an example configuration for a client for the token
authentication:
-
- sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
- username="tokenID123" \
- password="lAYYSFmLs4bTjf+lTZ1LCHR/ZZFNA==" \
- tokenauth="true";
-The options `username` and `password` are used by clients to configure the token id and token HMAC. And the option `tokenauth` is used to indicate the server about token authentication. In this example, clients connect to the broker using token id: _tokenID123_. Different clients within a JVM may connect using different tokens by specifying different token details in `sasl.jaas.config`.
+ ```java-properties
+ sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
+ username="tokenID123" \
+ password="lAYYSFmLs4bTjf+lTZ1LCHR/ZZFNA==" \
+ tokenauth="true";
+ ```
+
+ The options `username` and `password` are used by clients to configure the token id and token HMAC. And the option `tokenauth` is used to indicate the server about token authentication. In this example, clients connect to the broker using token id: _tokenID123_. Different clients within a JVM may connect using different tokens by specifying different token details in `sasl.jaas.config`.
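Because each client can carry its own token details in `sasl.jaas.config`, the value is often generated rather than written by hand. The Python sketch below assembles it from a token id and HMAC. The helper name `token_jaas_config` is hypothetical, and the sketch deliberately rejects values containing quotes or backslashes instead of guessing at escaping rules.

```python
def token_jaas_config(token_id, token_hmac):
    """Return a sasl.jaas.config value for delegation token authentication."""
    for value in (token_id, token_hmac):
        # Keep the sketch simple: refuse values that would need escaping.
        if '"' in value or "\\" in value:
            raise ValueError("value needs escaping, which this sketch does not handle")
    return (
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        f'username="{token_id}" '
        f'password="{token_hmac}" '
        'tokenauth="true";'
    )


print("sasl.jaas.config=" + token_jaas_config("tokenID123", "lAYYSFmLs4bTjf+lTZ1LCHR/ZZFNA=="))
```

The printed line is a single-line equivalent of the multi-line property shown above.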
JAAS configuration for clients may alternatively be specified as a JVM
parameter similar to brokers as described here. Clients use the login section
named `KafkaClient`. This option allows only one user for all client
connections from a JVM.
diff --git a/docs/security/authorization-and-acls.md
b/docs/security/authorization-and-acls.md
index 6349dbd6362..4d7abf94e67 100644
--- a/docs/security/authorization-and-acls.md
+++ b/docs/security/authorization-and-acls.md
@@ -27,9 +27,10 @@ type: docs
Kafka ships with a pluggable authorization framework, which is configured with
the `authorizer.class.name` property in the server configuration. Configured
implementations must extend `org.apache.kafka.server.authorizer.Authorizer`.
Kafka provides a default implementation which stores ACLs in the cluster
metadata (KRaft metadata log). For KRaft clusters, use the following
configuration on all nodes (brokers, controllers, or combined broker/controller
nodes):
-
-
- authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
+
+```java-properties
+authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
+```
Kafka ACLs are defined in the general format of "Principal {P} is
[Allowed|Denied] Operation {O} From Host {H} on any Resource {R} matching
ResourcePattern {RP}". You can read more about the ACL structure in
[KIP-11](https://cwiki.apache.org/confluence/x/XIUWAw) and resource patterns in
[KIP-290](https://cwiki.apache.org/confluence/x/QpvLB). In order to add,
remove, or list ACLs, you can use the Kafka ACL CLI `kafka-acls.sh`.
@@ -40,14 +41,16 @@ If a resource (R) does not have any ACLs defined, meaning
that no ACL matches th
### _Changing the Default Behavior:_
If you prefer that resources without any ACLs be accessible by all users
(instead of just super users), you can change the default behavior. To do this,
add the following line to your server.properties file:
-
-
- allow.everyone.if.no.acl.found=true
+
+```java-properties
+allow.everyone.if.no.acl.found=true
+```
With this setting enabled, if a resource does not have any ACLs defined, Kafka
will allow access to everyone. If a resource has one or more ACLs defined,
those ACL rules will be enforced as usual, regardless of the setting. One can
also add super users in server.properties like the following (note that the
delimiter is semicolon since SSL user names may contain comma). Default
PrincipalType string "User" is case sensitive.
-
-
- super.users=User:Bob;User:Alice
+
+```java-properties
+super.users=User:Bob;User:Alice
+```
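The interaction between super users, `allow.everyone.if.no.acl.found`, and existing ACLs can be sketched as a small decision function. This is a simplified model in Python, not the authorizer's implementation: the `Acl` class and `is_authorized` function are hypothetical, hosts and resource patterns are ignored, and the rule that a matching Deny overrides an Allow is an assumption of the sketch.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Acl:
    principal: str   # e.g. "User:Bob", or "User:*" for everyone
    operation: str   # e.g. "Read"
    permission: str  # "Allow" or "Deny"


def is_authorized(principal, operation, resource_acls,
                  super_users=(), allow_everyone_if_no_acl_found=False):
    """Decide access to one resource under the simplified rules described above."""
    if principal in super_users:
        return True
    if not resource_acls:
        # No ACLs on the resource: only the configured default applies.
        return allow_everyone_if_no_acl_found
    matching = [a for a in resource_acls
                if a.principal in (principal, "User:*") and a.operation == operation]
    if any(a.permission == "Deny" for a in matching):
        return False  # assumed: a matching Deny always wins
    return any(a.permission == "Allow" for a in matching)


# super.users uses a semicolon as the delimiter.
super_users = "User:Bob;User:Alice".split(";")
acls = [Acl("User:*", "Read", "Allow"), Acl("User:BadBob", "Read", "Deny")]

print(is_authorized("User:Bob", "Write", acls, super_users))
print(is_authorized("User:Carol", "Read", acls, super_users))
print(is_authorized("User:BadBob", "Read", acls, super_users))
print(is_authorized("User:Carol", "Read", [], super_users, allow_everyone_if_no_acl_found=True))
```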
### KRaft Principal Forwarding
@@ -58,41 +61,46 @@ All of this implies that Kafka must understand how to
serialize and deserialize
By default, the SSL user name will be of the form
"CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown". One can
change that by setting `ssl.principal.mapping.rules` to a customized rule in
server.properties. This config allows a list of rules for mapping X.500
distinguished name to short name. The rules are evaluated in order and the
first rule that matches a distinguished name is used to map it to a short name.
Any later rules in the list are ignored.
The format of `ssl.principal.mapping.rules` is a list where each rule starts
with "RULE:" and contains an expression in one of the following formats. The
default rule returns the string representation of the X.500 certificate
distinguished name. If the distinguished name matches the pattern, then the
replacement command will be run over the name. This also supports
lowercase/uppercase options, to force the translated result to be all lowercase
or uppercase. This is done by adding a "/L" or "/U" to th [...]
-
-
- RULE:pattern/replacement/
- RULE:pattern/replacement/[LU]
+
+```text
+RULE:pattern/replacement/
+RULE:pattern/replacement/[LU]
+```
Example `ssl.principal.mapping.rules` values are:
-
-
- RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,
- RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,
- RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,
- DEFAULT
+
+```text
+RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,
+RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,
+RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,
+DEFAULT
+```
Above rules translate distinguished name
"CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to
"serviceuser" and
"CN=adminUser,OU=Admin,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to
"adminuser@admin".
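The first-match-wins evaluation can be reproduced with a few lines of Python. This is a sketch of the rule semantics only, not Kafka's implementation: the `RULES` tuples are a hand-parsed form of the three `RULE:` entries above, and `map_principal` is a hypothetical helper name.

```python
import re

# (pattern, replacement, case option) for each RULE entry, in order.
RULES = [
    (r"^CN=(.*?),OU=ServiceUsers.*$", "$1", None),
    (r"^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$", "$1@$2", "L"),
    (r"^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$", "$1", "L"),
]


def map_principal(distinguished_name, rules=RULES):
    """Return the short name from the first matching rule, else the DN itself."""
    for pattern, replacement, case in rules:
        match = re.fullmatch(pattern, distinguished_name)
        if match is None:
            continue
        # Translate $1-style group references into Python's \g<1> syntax.
        template = re.sub(r"\$(\d+)", lambda m: rf"\g<{m.group(1)}>", replacement)
        name = match.expand(template)
        if case == "L":
            name = name.lower()
        elif case == "U":
            name = name.upper()
        return name
    # DEFAULT: keep the distinguished name unchanged.
    return distinguished_name


print(map_principal("CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"))
print(map_principal("CN=adminUser,OU=Admin,O=Unknown,L=Unknown,ST=Unknown,C=Unknown"))
```

The second distinguished name does not match the first rule, so it falls through to the second rule, whose `/L` option lowercases the result.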
For advanced use cases, one can customize the name by setting a customized
PrincipalBuilder in server.properties like the following.
-
-
- principal.builder.class=CustomizedPrincipalBuilderClass
+
+```java-properties
+principal.builder.class=CustomizedPrincipalBuilderClass
+```
### Customizing SASL User Name
By default, the SASL user name will be the primary part of the Kerberos
principal. One can change that by setting
`sasl.kerberos.principal.to.local.rules` to a customized rule in
server.properties. The format of `sasl.kerberos.principal.to.local.rules` is a
list where each rule works in the same way as the auth_to_local in [Kerberos
configuration file
(krb5.conf)](https://web.mit.edu/Kerberos/krb5-latest/doc/admin/conf_files/krb5_conf.html).
This also supports additional lowercase/upperca [...]
-
-
- RULE:[n:string](regexp)s/pattern/replacement/
- RULE:[n:string](regexp)s/pattern/replacement/g
- RULE:[n:string](regexp)s/pattern/replacement//L
- RULE:[n:string](regexp)s/pattern/replacement/g/L
- RULE:[n:string](regexp)s/pattern/replacement//U
- RULE:[n:string](regexp)s/pattern/replacement/g/U
+
+```text
+RULE:[n:string](regexp)s/pattern/replacement/
+RULE:[n:string](regexp)s/pattern/replacement/g
+RULE:[n:string](regexp)s/pattern/replacement//L
+RULE:[n:string](regexp)s/pattern/replacement/g/L
+RULE:[n:string](regexp)s/pattern/replacement//U
+RULE:[n:string](regexp)s/pattern/replacement/g/U
+```
An example of adding a rule to properly translate user@MYDOMAIN.COM to user
while also keeping the default rule in place is:
-
-
- sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT
+
+```java-properties
+sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT
+```
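To see how a rule such as `RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//` is evaluated, the Python sketch below walks through the three stages of an `auth_to_local` style rule: build a string from the principal's components, filter it with the regular expression, then apply the substitution. The function `apply_rule` and its pre-parsed arguments are hypothetical, and the `DEFAULT` rule and the `g`, `/L`, and `/U` options are not modeled.

```python
import re


def apply_rule(principal, num_components, fmt, match_regex, sed_pattern, sed_replacement):
    """Apply one rule to a Kerberos principal; return None if it does not apply."""
    name, _, realm = principal.partition("@")
    components = name.split("/")
    if len(components) != num_components:
        return None
    # $0 is the realm, $1..$n are the principal's components.
    values = [realm] + components
    formatted = re.sub(r"\$(\d)", lambda m: values[int(m.group(1))], fmt)
    if re.fullmatch(match_regex, formatted) is None:
        return None
    # s/pattern/replacement/ without the g flag replaces the first match only.
    return re.sub(sed_pattern, sed_replacement, formatted, count=1)


rule = (1, "$1@$0", r".*@MYDOMAIN.COM", r"@.*", "")
print(apply_rule("user@MYDOMAIN.COM", *rule))
print(apply_rule("user@OTHER.COM", *rule))
```

A `None` result means the rule did not apply, at which point a real configuration would move on to the next rule in the list, here `DEFAULT`.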
## Command Line Interface
@@ -536,53 +544,75 @@ Convenience
* **Adding Acls**
Suppose you want to add an acl "Principals User:Bob and User:Alice are allowed
to perform Operation Read and Write on Topic Test-Topic from IP 198.51.100.0
and IP 198.51.100.1". You can do that by executing the CLI with following
options:
-
- $ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
+
+```bash
+$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
+```
By default, all principals that don't have an explicit acl that allows access
for an operation to a resource are denied. In the rare cases where an allow acl
grants access to all but some principals, we have to use the --deny-principal
and --deny-host options. For example, if we want to allow all users to Read
from Test-topic but deny only User:BadBob from IP 198.51.100.3, we can do so
using the following command:
-
- $ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:'*' --allow-host '*' --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic
+
+```bash
+$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:'*' --allow-host '*' --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic
+```
Note that `--allow-host` and `--deny-host` only support IP addresses
(hostnames are not supported). The examples above add acls to a topic by
specifying --topic [topic-name] as the resource pattern option. Similarly, users
can add acls to the cluster by specifying --cluster and to a consumer group by
specifying --group [group-name]. You can add acls on any resource of a certain
type, e.g. suppose you wanted to add an acl "Principal User:Peter is allowed to
produce to any Topic from IP 198.51.200.0 [...]
-
- $ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Peter --allow-host 198.51.200.1 --producer --topic '*'
+
+```bash
+$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Peter --allow-host 198.51.200.1 --producer --topic '*'
+```
You can add acls on prefixed resource patterns, e.g. suppose you want to add
an acl "Principal User:Jane is allowed to produce to any Topic whose name
starts with 'Test-' from any host". You can do that by executing the CLI with
following options:
-
- $ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Jane --producer --topic Test- --resource-pattern-type prefixed
+
+```bash
+$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Jane --producer --topic Test- --resource-pattern-type prefixed
+```
Note, --resource-pattern-type defaults to 'literal', which only affects
resources with the exact same name or, in the case of the wildcard resource
name '*', a resource with any name.
* **Removing Acls**
Removing acls is pretty much the same. The only difference is instead of --add
option users will have to specify --remove option. To remove the acls added by
the first example above we can execute the CLI with following options:
-
- $ bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
+
+```bash
+$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
+```
To remove the acl added to the prefixed resource pattern above, we can
execute the CLI with the following options:
-
- $ bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --allow-principal User:Jane --producer --topic Test- --resource-pattern-type Prefixed
+
+```bash
+$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove --allow-principal User:Jane --producer --topic Test- --resource-pattern-type Prefixed
+```
* **List Acls**
We can list acls for any resource by specifying the --list option with the
resource. To list all acls on the literal resource pattern Test-topic, we can
execute the CLI with following options:
-
- $ bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic Test-topic
+
+```bash
+$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic Test-topic
+```
However, this will only return the acls that have been added to this exact
resource pattern. Other acls can exist that affect access to the topic, e.g.
any acls on the topic wildcard '*', or any acls on prefixed resource patterns.
Acls on the wildcard resource pattern can be queried explicitly:
-
- $ bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic '*'
+
+```bash
+$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic '*'
+```
However, it is not necessarily possible to explicitly query for acls on
prefixed resource patterns that match Test-topic as the name of such patterns
may not be known. We can list _all_ acls affecting Test-topic by using
'--resource-pattern-type match', e.g.
-
- > bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic Test-topic --resource-pattern-type match
+
+```bash
+$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic Test-topic --resource-pattern-type match
+```
This will list acls on all matching literal, wildcard and prefixed resource
patterns.
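The difference between querying one exact pattern and using `--resource-pattern-type match` comes down to which stored patterns apply to a given resource name. The Python sketch below models that selection. The function `pattern_matches` and the sample pattern list are hypothetical and only illustrate the literal, wildcard, and prefixed cases described above.

```python
def pattern_matches(pattern_name, pattern_type, resource_name):
    """Return True if a stored resource pattern applies to the given resource."""
    if pattern_type == "literal":
        # The literal wildcard '*' applies to a resource with any name.
        return pattern_name == "*" or pattern_name == resource_name
    if pattern_type == "prefixed":
        return resource_name.startswith(pattern_name)
    raise ValueError(f"unsupported pattern type: {pattern_type}")


stored_patterns = [
    ("Test-topic", "literal"),
    ("*", "literal"),
    ("Test-", "prefixed"),
    ("Other-", "prefixed"),
    ("Test-topic-2", "literal"),
]

matching = [p for p in stored_patterns if pattern_matches(p[0], p[1], "Test-topic")]
print(matching)
```

Only the first three patterns are selected, which corresponds to the literal, wildcard, and prefixed patterns that a `match` query would report for Test-topic.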
* **Adding or removing a principal as producer or consumer**
The most common use cases for acl management are adding or removing a principal
as a producer or consumer, so we added convenience options to handle these
cases. In order to add User:Bob as a producer of Test-topic we can execute the
following command:
-
- $ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Bob --producer --topic Test-topic
+
+```bash
+$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Bob --producer --topic Test-topic
+```
Similarly, to add Alice as a consumer of Test-topic with consumer group Group-1
we just have to pass the --consumer option:
-
- $ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Alice --consumer --topic Test-topic --group Group-1
+
+```bash
+$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal User:Alice --consumer --topic Test-topic --group Group-1
+```
Note that for consumer option we must also specify the consumer group. In
order to remove a principal from producer or consumer role we just need to pass
--remove option.
diff --git a/docs/security/encryption-and-authentication-using-ssl.md
b/docs/security/encryption-and-authentication-using-ssl.md
index 11dd9fad03f..7b903f52771 100644
--- a/docs/security/encryption-and-authentication-using-ssl.md
+++ b/docs/security/encryption-and-authentication-using-ssl.md
@@ -31,8 +31,10 @@ Apache Kafka allows clients to use SSL for encryption of
traffic as well as auth
### Generate SSL key and certificate for each Kafka broker
The first step of deploying one or more brokers with SSL support is to
generate a public/private keypair for every server. Since Kafka expects all
keys and certificates to be stored in keystores we will use Java's keytool
command for this task. The tool supports two different keystore formats, the
Java specific jks format which has been deprecated by now, as well as PKCS12.
PKCS12 is the default format as of Java version 9, to ensure this format is
being used regardless of the Java versi [...]
-
- $ keytool -keystore {keystorefile} -alias localhost -validity {validity} -genkey -keyalg RSA -storetype pkcs12
+
+```bash
+$ keytool -keystore {keystorefile} -alias localhost -validity {validity} -genkey -keyalg RSA -storetype pkcs12
+```
You need to specify two parameters in the above command:
1. keystorefile: the keystore file that stores the keys (and later the
certificate) for this broker. The keystore file contains the private and public
keys of this broker, therefore it needs to be kept safe. Ideally this step is
run on the Kafka broker that the key will be used on, as this key should never
be transmitted/leave the server that it is intended for.
@@ -40,8 +42,10 @@ You need to specify two parameters in the above command:
To obtain a certificate that can be used with the private key that was just
created, a certificate signing request needs to be created. This signing
request, when signed by a trusted CA, results in the actual certificate, which
can then be installed in the keystore and used for authentication purposes.
To generate certificate signing requests run the following command for all
server keystores created so far.
-
- $ keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -destkeystoretype pkcs12 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}
+
+```bash
+$ keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -destkeystoretype pkcs12 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}
+```
This command assumes that you want to add hostname information to the
certificate. If this is not the case, you can omit the extension parameter
`-ext SAN=DNS:{FQDN},IP:{IPADDRESS1}`. Please see below for more information on
this.
@@ -50,10 +54,11 @@ This command assumes that you want to add hostname
information to the certificat
Host name verification, when enabled, is the process of checking attributes
from the certificate that is presented by the server you are connecting to
against the actual hostname or ip address of that server to ensure that you are
indeed connecting to the correct server.
The main reason for this check is to prevent man-in-the-middle attacks. For
Kafka, this check has been disabled by default for a long time, but as of Kafka
2.0.0 host name verification of servers is enabled by default for client
connections as well as inter-broker connections.
Server host name verification may be disabled by setting
`ssl.endpoint.identification.algorithm` to an empty string.
-For dynamically configured broker listeners, hostname verification may be disabled using `kafka-configs.sh`:
+For dynamically configured broker listeners, hostname verification may be disabled using `kafka-configs.sh`:
-
- $ bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-name 0 --alter --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="
+```bash
+$ bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-name 0 --alter --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="
+```
**Note:**
@@ -67,8 +72,10 @@ If host name verification is enabled, clients will verify
the server's fully qua
While Kafka checks both fields, usage of the common name field for hostname
verification has been
[deprecated](https://tools.ietf.org/html/rfc2818#section-3.1) since 2000 and
should be avoided if possible. In addition the SAN field is much more flexible,
allowing for multiple DNS and IP entries to be declared in a certificate.
Another advantage is that if the SAN field is used for hostname verification
the common name can be set to a more meaningful value for authorization
purposes. Since we need the SAN field to be contained in the signed
certificate, it will be specified when generating the signing request. It can
also be specified when generating the keypair, but this will not automatically
be copied into the signing request.
To add a SAN field append the following argument ` -ext
SAN=DNS:{FQDN},IP:{IPADDRESS}` to the keytool command:
-
- $ keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -destkeystoretype pkcs12 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}
+
+```bash
+$ keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -destkeystoretype pkcs12 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}
+```
### Creating your own CA
@@ -80,119 +87,133 @@ For this guide we will be our own Certificate Authority.
When setting up a produ
Due to a [bug](https://www.openssl.org/docs/man1.1.1/man1/x509.html#BUGS) in
OpenSSL, the x509 module will not copy requested extension fields from CSRs
into the final certificate. Since we want the SAN extension to be present in
our certificate to enable hostname verification, we'll use the _ca_ module
instead. This requires some additional configuration to be in place before we
generate our CA keypair.
Save the following listing into a file called openssl-ca.cnf and adjust the
values for validity and common attributes as necessary.
-
- HOME = .
- RANDFILE = $ENV::HOME/.rnd
-
- ####################################################################
- [ ca ]
- default_ca = CA_default # The default ca section
-
- [ CA_default ]
-
- base_dir = .
- certificate = $base_dir/cacert.pem # The CA certificate
- private_key = $base_dir/cakey.pem # The CA private key
- new_certs_dir = $base_dir # Location for new certs after signing
- database = $base_dir/index.txt # Database index file
- serial = $base_dir/serial.txt # The current serial number
-
- default_days = 1000 # How long to certify for
- default_crl_days = 30 # How long before next CRL
- default_md = sha256 # Use public key default MD
- preserve = no # Keep passed DN ordering
-
- x509_extensions = ca_extensions # The extensions to add to the cert
-
- email_in_dn = no # Don't concat the email in the DN
- copy_extensions = copy # Required to copy SANs from CSR to cert
-
- ####################################################################
- [ req ]
- default_bits = 4096
- default_keyfile = cakey.pem
- distinguished_name = ca_distinguished_name
- x509_extensions = ca_extensions
- string_mask = utf8only
-
- ####################################################################
- [ ca_distinguished_name ]
- countryName = Country Name (2 letter code)
- countryName_default = DE
-
- stateOrProvinceName = State or Province Name (full name)
- stateOrProvinceName_default = Test Province
-
- localityName = Locality Name (eg, city)
- localityName_default = Test Town
-
- organizationName = Organization Name (eg, company)
- organizationName_default = Test Company
-
- organizationalUnitName = Organizational Unit (eg, division)
- organizationalUnitName_default = Test Unit
-
- commonName = Common Name (e.g. server FQDN or YOUR name)
- commonName_default = Test Name
-
- emailAddress = Email Address
- emailAddress_default = [email protected]
-
- ####################################################################
- [ ca_extensions ]
-
- subjectKeyIdentifier = hash
- authorityKeyIdentifier = keyid:always, issuer
- basicConstraints = critical, CA:true
- keyUsage = keyCertSign, cRLSign
-
- ####################################################################
- [ signing_policy ]
- countryName = optional
- stateOrProvinceName = optional
- localityName = optional
- organizationName = optional
- organizationalUnitName = optional
- commonName = supplied
- emailAddress = optional
-
- ####################################################################
- [ signing_req ]
- subjectKeyIdentifier = hash
- authorityKeyIdentifier = keyid,issuer
- basicConstraints = CA:FALSE
- keyUsage = digitalSignature, keyEncipherment
+
+```ini
+HOME = .
+RANDFILE = $ENV::HOME/.rnd
+
+####################################################################
+[ ca ]
+default_ca = CA_default # The default ca section
+
+[ CA_default ]
+
+base_dir = .
+certificate = $base_dir/cacert.pem # The CA certificate
+private_key = $base_dir/cakey.pem # The CA private key
+new_certs_dir = $base_dir # Location for new certs after signing
+database = $base_dir/index.txt # Database index file
+serial = $base_dir/serial.txt # The current serial number
+
+default_days = 1000 # How long to certify for
+default_crl_days = 30 # How long before next CRL
+default_md = sha256 # Use public key default MD
+preserve = no # Keep passed DN ordering
+
+x509_extensions = ca_extensions # The extensions to add to the cert
+
+email_in_dn = no # Don't concat the email in the DN
+copy_extensions = copy # Required to copy SANs from CSR to cert
+
+####################################################################
+[ req ]
+default_bits = 4096
+default_keyfile = cakey.pem
+distinguished_name = ca_distinguished_name
+x509_extensions = ca_extensions
+string_mask = utf8only
+
+####################################################################
+[ ca_distinguished_name ]
+countryName = Country Name (2 letter code)
+countryName_default = DE
+
+stateOrProvinceName = State or Province Name (full name)
+stateOrProvinceName_default = Test Province
+
+localityName = Locality Name (eg, city)
+localityName_default = Test Town
+
+organizationName = Organization Name (eg, company)
+organizationName_default = Test Company
+
+organizationalUnitName = Organizational Unit (eg, division)
+organizationalUnitName_default = Test Unit
+
+commonName = Common Name (e.g. server FQDN or YOUR name)
+commonName_default = Test Name
+
+emailAddress = Email Address
+emailAddress_default = [email protected]
+
+####################################################################
+[ ca_extensions ]
+
+subjectKeyIdentifier = hash
+authorityKeyIdentifier = keyid:always, issuer
+basicConstraints = critical, CA:true
+keyUsage = keyCertSign, cRLSign
+
+####################################################################
+[ signing_policy ]
+countryName = optional
+stateOrProvinceName = optional
+localityName = optional
+organizationName = optional
+organizationalUnitName = optional
+commonName = supplied
+emailAddress = optional
+
+####################################################################
+[ signing_req ]
+subjectKeyIdentifier = hash
+authorityKeyIdentifier = keyid,issuer
+basicConstraints = CA:FALSE
+keyUsage = digitalSignature, keyEncipherment
+```
Then create a database and serial number file; these will be used to keep
track of which certificates were signed with this CA. Both of these are simply
text files that reside in the same directory as your CA keys.
-
- $ echo 01 > serial.txt
- $ touch index.txt
+
+```bash
+$ echo 01 > serial.txt
+$ touch index.txt
+```
With these steps done you are now ready to generate your CA that will be used
to sign certificates later.
-
- $ openssl req -x509 -config openssl-ca.cnf -newkey rsa:4096 -sha256 -nodes -out cacert.pem -outform PEM
+
+```bash
+$ openssl req -x509 -config openssl-ca.cnf -newkey rsa:4096 -sha256 -nodes -out cacert.pem -outform PEM
+```
The CA is simply a public/private key pair and certificate that is signed by
itself, and is only intended to sign other certificates.
This keypair should be kept very safe. If someone gains access to it, they can
create and sign certificates that will be trusted by your infrastructure, which
means they will be able to impersonate anybody when connecting to any service
that trusts this CA.
The next step is to add the generated CA to the **clients' truststore** so
that the clients can trust this CA:
-
- $ keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
+
+```bash
+$ keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
+```
**Note:** If you configure the Kafka brokers to require client authentication
by setting ssl.client.auth to be "requested" or "required" in the Kafka brokers
config then you must provide a truststore for the Kafka brokers as well and it
should have all the CA certificates that clients' keys were signed by.
-
- $ keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
+
+```bash
+$ keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
+```
In contrast to the keystore in step 1 that stores each machine's own identity,
the truststore of a client stores all the certificates that the client should
trust. Importing a certificate into one's truststore also means trusting all
certificates that are signed by that certificate. As in the analogy above,
trusting the government (CA) also means trusting all passports (certificates)
that it has issued. This attribute is called the chain of trust, and it is
particularly useful when deployin [...]
### Signing the certificate
Then sign it with the CA:
-
- $ openssl ca -config openssl-ca.cnf -policy signing_policy -extensions signing_req -out {server certificate} -infiles {certificate signing request}
+
+```bash
+$ openssl ca -config openssl-ca.cnf -policy signing_policy -extensions signing_req -out {server certificate} -infiles {certificate signing request}
+```
Finally, you need to import both the certificate of the CA and the signed
certificate into the keystore:
-
- $ keytool -keystore {keystore} -alias CARoot -import -file {CA certificate}
- $ keytool -keystore {keystore} -alias localhost -import -file cert-signed
+
+```bash
+$ keytool -keystore {keystore} -alias CARoot -import -file {CA certificate}
+$ keytool -keystore {keystore} -alias localhost -import -file cert-signed
+```
The definitions of the parameters are the following:
1. keystore: the location of the keystore
@@ -223,22 +244,28 @@ Kafka brokers need both these usages to be allowed, as
for intra-cluster communi
Corporate Root CAs are often kept offline for security reasons. To enable
day-to-day usage, so called intermediate CAs are created, which are then used
to sign the final certificates. When importing a certificate into the keystore
that was signed by an intermediate CA it is necessary to provide the entire
chain of trust up to the root CA. This can be done by simply _cat_ ing the
certificate files into one combined certificate file and then importing this
with keytool.
3. **Failure to copy extension fields**
CA operators are often hesitant to copy any requested extension fields from
CSRs and prefer to specify these themselves as this makes it harder for a
malicious party to obtain certificates with potentially misleading or
fraudulent values. It is advisable to double check signed certificates, whether
these contain all requested SAN fields to enable proper hostname verification.
The following command can be used to print certificate details to the console,
which should be compared with what [...]
-
- $ openssl x509 -in certificate.crt -text -noout
+
+```bash
+$ openssl x509 -in certificate.crt -text -noout
+```
### Configuring Kafka Brokers
If SSL is not enabled for inter-broker communication (see below for how to
enable it), both PLAINTEXT and SSL ports will be necessary.
-
- listeners=PLAINTEXT://host.name:port,SSL://host.name:port
-Following SSL configs are needed on the broker side
-
- ssl.keystore.location=/var/private/ssl/server.keystore.jks
- ssl.keystore.password=test1234
- ssl.key.password=test1234
- ssl.truststore.location=/var/private/ssl/server.truststore.jks
- ssl.truststore.password=test1234
+```java-properties
+listeners=PLAINTEXT://host.name:port,SSL://host.name:port
+```
+
+Following SSL configs are needed on the broker side:
+
+```java-properties
+ssl.keystore.location=/var/private/ssl/server.keystore.jks
+ssl.keystore.password=test1234
+ssl.key.password=test1234
+ssl.truststore.location=/var/private/ssl/server.truststore.jks
+ssl.truststore.password=test1234
+```
Note: ssl.truststore.password is technically optional but highly recommended.
If a password is not set access to the truststore is still available, but
integrity checking is disabled. Optional settings that are worth considering:
1. ssl.client.auth=none ("required" => client authentication is required,
"requested" => client authentication is requested and client without certs can
still connect. The usage of "requested" is discouraged as it provides a false
sense of security and misconfigured clients will still connect successfully.)
@@ -247,46 +274,58 @@ Note: ssl.truststore.password is technically optional but
highly recommended. If
4. ssl.keystore.type=JKS
5. ssl.truststore.type=JKS
6. ssl.secure.random.implementation=SHA1PRNG
-If you want to enable SSL for inter-broker communication, add the following to
the server.properties file (it defaults to PLAINTEXT)
-
- security.inter.broker.protocol=SSL
+If you want to enable SSL for inter-broker communication, add the following to
the server.properties file (it defaults to PLAINTEXT):
+
+```java-properties
+security.inter.broker.protocol=SSL
+```
Due to import regulations in some countries, the Oracle implementation limits
the strength of cryptographic algorithms available by default. If stronger
algorithms are needed (for example, AES with 256-bit keys), the [JCE Unlimited
Strength Jurisdiction Policy
Files](https://www.oracle.com/technetwork/java/javase/downloads/index.html)
must be obtained and installed in the JDK/JRE. See the [JCA Providers
Documentation](https://docs.oracle.com/javase/8/docs/technotes/guides/security/SunPro
[...]
The JRE/JDK will have a default pseudo-random number generator (PRNG) that is
used for cryptography operations, so it is not required to configure the
implementation used with the `ssl.secure.random.implementation`. However, there
are performance issues with some implementations (notably, the default chosen
on Linux systems, `NativePRNG`, utilizes a global lock). In cases where
performance of SSL connections becomes an issue, consider explicitly setting
the implementation to be used. The [...]
-Once you start the broker you should be able to see in the server.log
-
- with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)
+Once you start the broker you should be able to see in the server.log:
+
+```text
+with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)
+```
-To check quickly if the server keystore and truststore are setup properly you
can run the following command
-
- $ openssl s_client -debug -connect localhost:9093 -tls1
+To check quickly if the server keystore and truststore are setup properly you
can run the following command:
+
+```bash
+$ openssl s_client -debug -connect localhost:9093 -tls1
+```
(Note: TLSv1 should be listed under ssl.enabled.protocols)
In the output of this command you should see server's certificate:
-
- -----BEGIN CERTIFICATE-----
- {variable sized random bytes}
- -----END CERTIFICATE-----
- subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani
- issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/[email protected]
+
+```text
+-----BEGIN CERTIFICATE-----
+{variable sized random bytes}
+-----END CERTIFICATE-----
+subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani
+issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/[email protected]
+```
If the certificate does not show up or if there are any other error messages
then your keystore is not setup properly.
### Configuring Kafka Clients
SSL is supported only for the new Kafka Producer and Consumer, the older API
is not supported. The configs for SSL will be the same for both producer and
consumer.
If client authentication is not required in the broker, then the following is
a minimal configuration example:
-
- security.protocol=SSL
- ssl.truststore.location=/var/private/ssl/client.truststore.jks
- ssl.truststore.password=test1234
+
+```java-properties
+security.protocol=SSL
+ssl.truststore.location=/var/private/ssl/client.truststore.jks
+ssl.truststore.password=test1234
+```
Note: ssl.truststore.password is technically optional but highly recommended.
If a password is not set access to the truststore is still available, but
integrity checking is disabled. If client authentication is required, then a
keystore must be created like in step 1 and the following must also be
configured:
-
- ssl.keystore.location=/var/private/ssl/client.keystore.jks
- ssl.keystore.password=test1234
- ssl.key.password=test1234
+
+```java-properties
+ssl.keystore.location=/var/private/ssl/client.keystore.jks
+ssl.keystore.password=test1234
+ssl.key.password=test1234
+```
Other configuration settings that may also be needed depending on our
requirements and the broker configuration:
1. ssl.provider (Optional). The name of the security provider used for SSL
connections. Default value is the default security provider of the JVM.
@@ -296,9 +335,11 @@ Other configuration settings that may also be needed
depending on our requiremen
5. ssl.keystore.type=JKS
Examples using console-producer and console-consumer:
-
- $ bin/kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test --command-config client-ssl.properties
- $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --command-config client-ssl.properties
+
+```bash
+$ bin/kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test --command-config client-ssl.properties
+$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --command-config client-ssl.properties
+```
diff --git
a/docs/security/incorporating-security-features-in-a-running-cluster.md
b/docs/security/incorporating-security-features-in-a-running-cluster.md
index 6065b47cb16..9a27f2069a0 100644
--- a/docs/security/incorporating-security-features-in-a-running-cluster.md
+++ b/docs/security/incorporating-security-features-in-a-running-cluster.md
@@ -42,49 +42,57 @@ The security implementation lets you configure different
protocols for both brok
When performing an incremental bounce stop the brokers cleanly via a SIGTERM.
It's also good practice to wait for restarted replicas to return to the ISR
list before moving onto the next node.
As an example, say we wish to encrypt both broker-client and broker-broker
communication with SSL. In the first incremental bounce, an SSL port is opened
on each node:
-
-
- listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
+
+```java-properties
+listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
+```
We then restart the clients, changing their config to point at the newly
opened, secured port:
-
-
- bootstrap.servers = [broker1:9092,...]
- security.protocol = SSL
- ...etc
+
+```java-properties
+bootstrap.servers = [broker1:9092,...]
+security.protocol = SSL
+...etc
+```
In the second incremental server bounce we instruct Kafka to use SSL as the
broker-broker protocol (which will use the same SSL port):
-
-
- listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
- security.inter.broker.protocol=SSL
+
+```java-properties
+listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
+security.inter.broker.protocol=SSL
+```
In the final bounce we secure the cluster by closing the PLAINTEXT port:
-
-
- listeners=SSL://broker1:9092
- security.inter.broker.protocol=SSL
+
+```java-properties
+listeners=SSL://broker1:9092
+security.inter.broker.protocol=SSL
+```
Alternatively we might choose to open multiple ports so that different
protocols can be used for broker-broker and broker-client communication. Say we
wished to use SSL encryption throughout (i.e. for broker-broker and
broker-client communication) but we'd like to add SASL authentication to the
broker-client connection also. We would achieve this by opening two additional
ports during the first bounce:
-
-
- listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
+
+```java-properties
+listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
+```
We would then restart the clients, changing their config to point at the newly
opened, SASL & SSL secured port:
-
-
- bootstrap.servers = [broker1:9093,...]
- security.protocol = SASL_SSL
- ...etc
+
+```java-properties
+bootstrap.servers = [broker1:9093,...]
+security.protocol = SASL_SSL
+...etc
+```
The second server bounce would switch the cluster to use encrypted
broker-broker communication via the SSL port we previously opened on port 9092:
-
-
- listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
- security.inter.broker.protocol=SSL
+
+```java-properties
+listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
+security.inter.broker.protocol=SSL
+```
The final bounce secures the cluster by closing the PLAINTEXT port.
-
-
- listeners=SSL://broker1:9092,SASL_SSL://broker1:9093
- security.inter.broker.protocol=SSL
+
+```java-properties
+listeners=SSL://broker1:9092,SASL_SSL://broker1:9093
+security.inter.broker.protocol=SSL
+```
diff --git a/docs/security/listener-configuration.md
b/docs/security/listener-configuration.md
index 5d31d670dc2..303d9237d53 100644
--- a/docs/security/listener-configuration.md
+++ b/docs/security/listener-configuration.md
@@ -29,19 +29,22 @@ type: docs
In order to secure a Kafka cluster, it is necessary to secure the channels
that are used to communicate with the servers. Each server must define the set
of listeners that are used to receive requests from clients as well as other
servers. Each listener may be configured to authenticate clients using various
mechanisms and to ensure traffic between the server and the client is
encrypted. This section provides a primer for the configuration of listeners.
Kafka servers support listening for connections on multiple ports. This is
configured through the `listeners` property in the server configuration, which
accepts a comma-separated list of the listeners to enable. At least one
listener must be defined on each server. The format of each listener defined in
`listeners` is given below:
-
-
- {LISTENER_NAME}://{hostname}:{port}
+
+```text
+{LISTENER_NAME}://{hostname}:{port}
+```
The `LISTENER_NAME` is usually a descriptive name which defines the purpose of
the listener. For example, many configurations use a separate listener for
client traffic, so they might refer to the corresponding listener as `CLIENT`
in the configuration:
-
-
- listeners=CLIENT://localhost:9092
+
+```java-properties
+listeners=CLIENT://localhost:9092
+```
The security protocol of each listener is defined in a separate configuration:
`listener.security.protocol.map`. The value is a comma-separated list of each
listener mapped to its security protocol. For example, the following
configuration specifies that the `CLIENT` listener will use SSL while the
`BROKER` listener will use plaintext.
-
-
- listener.security.protocol.map=CLIENT:SSL,BROKER:PLAINTEXT
+
+```java-properties
+listener.security.protocol.map=CLIENT:SSL,BROKER:PLAINTEXT
+```
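
To make the relationship between `listeners` and `listener.security.protocol.map` concrete, here is a minimal Python sketch that resolves the security protocol for each listener. It is an illustration only, not Kafka's own parsing logic: the function names are hypothetical, and the `BROKER://localhost:9093` entry is an assumed address added so that both mapped listeners appear in `listeners`.

```python
def parse_listeners(value):
    """Parse a `listeners` value into {listener_name: (host, port)}."""
    result = {}
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        result[name] = (host, int(port))
    return result


def parse_protocol_map(value):
    """Parse `listener.security.protocol.map` into {listener_name: protocol}."""
    result = {}
    for entry in value.split(","):
        name, _, protocol = entry.strip().partition(":")
        # Protocol names are described as case-insensitive, so normalize them.
        result[name.strip()] = protocol.strip().upper()
    return result


def resolve_protocols(listeners, protocol_map):
    """Return {listener_name: protocol} for every listener in `listeners`."""
    parsed = parse_listeners(listeners)
    mapping = parse_protocol_map(protocol_map)
    missing = [name for name in parsed if name not in mapping]
    if missing:
        raise ValueError(f"no security protocol mapped for: {missing}")
    return {name: mapping[name] for name in parsed}


# Hypothetical example values; the BROKER address is an assumption.
resolved = resolve_protocols(
    "CLIENT://localhost:9092,BROKER://localhost:9093",
    "CLIENT:SSL,BROKER:PLAINTEXT",
)
print(resolved)
```

A check like this can be useful in deployment tooling to catch a listener that was added to `listeners` but never given an entry in the protocol map.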
Possible options (case-insensitive) for the security protocol are given below:
@@ -55,9 +58,10 @@ Possible options (case-insensitive) for the security
protocol are given below:
The plaintext protocol provides no security and does not require any
additional configuration. In the following sections, this document covers how
to configure the remaining protocols.
If each required listener uses a separate security protocol, it is also
possible to use the security protocol name as the listener name in `listeners`.
Using the example above, we could skip the definition of the `CLIENT` and
`BROKER` listeners using the following definition:
-
-
- listeners=SSL://localhost:9092,PLAINTEXT://localhost:9093
+
+```java-properties
+listeners=SSL://localhost:9092,PLAINTEXT://localhost:9093
+```
However, we recommend that users provide explicit names for the listeners since
it makes the intended usage of each listener clearer.
@@ -66,26 +70,28 @@ Among the listeners in this list, it is possible to declare
the listener to be u
In a KRaft cluster, a broker is any server which has the `broker` role enabled
in `process.roles` and a controller is any server which has the `controller`
role enabled. Listener configuration depends on the role. The listener defined
by `inter.broker.listener.name` is used exclusively for requests between
brokers. Controllers, on the other hand, must use a separate listener, which is
defined by the `controller.listener.names` configuration. This cannot be set to
the same value as the inter [...]
Controllers receive requests both from other controllers and from brokers. For
this reason, even if a server does not have the `controller` role enabled (i.e.
it is just a broker), it must still define the controller listener along with
any security properties that are needed to configure it. For example, we might
use the following configuration on a standalone broker:
-
-
- process.roles=broker
- listeners=BROKER://localhost:9092
- inter.broker.listener.name=BROKER
- controller.quorum.bootstrap.servers=localhost:9093
- controller.listener.names=CONTROLLER
- listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL
+
+```java-properties
+process.roles=broker
+listeners=BROKER://localhost:9092
+inter.broker.listener.name=BROKER
+controller.quorum.bootstrap.servers=localhost:9093
+controller.listener.names=CONTROLLER
+listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL
+```
The controller listener is still configured in this example to use the
`SASL_SSL` security protocol, but it is not included in `listeners` since the
broker does not expose the controller listener itself. The port that will be
used in this case comes from the `controller.quorum.voters` configuration,
which defines the complete list of controllers.
For KRaft servers which have both the broker and controller role enabled, the
configuration is similar. The only difference is that the controller listener
must be included in `listeners`:
-
-
- process.roles=broker,controller
- listeners=BROKER://localhost:9092,CONTROLLER://localhost:9093
- inter.broker.listener.name=BROKER
- controller.quorum.bootstrap.servers=localhost:9093
- controller.listener.names=CONTROLLER
- listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL
+
+```java-properties
+process.roles=broker,controller
+listeners=BROKER://localhost:9092,CONTROLLER://localhost:9093
+inter.broker.listener.name=BROKER
+controller.quorum.bootstrap.servers=localhost:9093
+controller.listener.names=CONTROLLER
+listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL
+```
It is a requirement that the host and port defined in
`controller.quorum.bootstrap.servers` are routed to the exposed controller
listeners. For example, here the `CONTROLLER` listener is bound to
localhost:9093. The connection string defined by
`controller.quorum.bootstrap.servers` must then also use localhost:9093, as it
does here.
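
The requirement above can be sanity-checked before a server is started. The following Python sketch is a hypothetical pre-flight check for a server with the `controller` role; it is not part of Kafka. It assumes the properties have already been loaded into a dictionary, and it compares addresses by plain string equality, which is a simplification: in a real deployment the bootstrap address may reach the listener through DNS or a load balancer.

```python
def listener_addresses(listeners):
    """Map each listener name in a `listeners` value to its host:port string."""
    addresses = {}
    for entry in listeners.split(","):
        name, _, address = entry.strip().partition("://")
        addresses[name] = address
    return addresses


def controller_listener_is_routable(props):
    """Return True if a controller listener address bound in `listeners`
    also appears in `controller.quorum.bootstrap.servers`.

    Simplifying assumption: addresses are compared as literal strings.
    """
    names = [n.strip() for n in props["controller.listener.names"].split(",")]
    bootstrap = {
        s.strip() for s in props["controller.quorum.bootstrap.servers"].split(",")
    }
    addresses = listener_addresses(props["listeners"])
    return any(addresses.get(name) in bootstrap for name in names)


# Values taken from the combined broker/controller example above.
combined = {
    "process.roles": "broker,controller",
    "listeners": "BROKER://localhost:9092,CONTROLLER://localhost:9093",
    "inter.broker.listener.name": "BROKER",
    "controller.quorum.bootstrap.servers": "localhost:9093",
    "controller.listener.names": "CONTROLLER",
}
print(controller_listener_is_routable(combined))
```

This check only makes sense for servers that expose the controller listener; on a broker-only server the `CONTROLLER` listener is intentionally absent from `listeners`, so the function would return `False` there without indicating a misconfiguration.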