This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8a1c19804b2 KAFKA-20366: Add syntax highlighting in security docs 
(#21965)
8a1c19804b2 is described below

commit 8a1c19804b2735a05ddb9086cd28882b746b9ba4
Author: gomudayya <[email protected]>
AuthorDate: Thu Apr 30 00:01:55 2026 +0900

    KAFKA-20366: Add syntax highlighting in security docs (#21965)
    
    
    Reviewers: Mickael Maison <[email protected]>
---
 docs/security/authentication-using-sasl.md         | 502 ++++++++++++---------
 docs/security/authorization-and-acls.md            | 140 +++---
 .../encryption-and-authentication-using-ssl.md     | 325 +++++++------
 ...ating-security-features-in-a-running-cluster.md |  72 +--
 docs/security/listener-configuration.md            |  62 +--
 5 files changed, 642 insertions(+), 459 deletions(-)

diff --git a/docs/security/authentication-using-sasl.md 
b/docs/security/authentication-using-sasl.md
index f4a049a24a6..2698e4f7a21 100644
--- a/docs/security/authentication-using-sasl.md
+++ b/docs/security/authentication-using-sasl.md
@@ -35,15 +35,17 @@ Kafka uses the Java Authentication and Authorization 
Service ([JAAS](https://doc
 `KafkaServer` is the section name in the JAAS file used by each 
KafkaServer/Broker. This section provides SASL configuration options for the 
broker including any SASL client connections made by the broker for 
inter-broker communication. If multiple listeners are configured to use SASL, 
the section name may be prefixed with the listener name in lower-case followed 
by a period, e.g. `sasl_ssl.KafkaServer`.
 
 Brokers may also configure JAAS using the broker configuration property 
`sasl.jaas.config`. The property name must be prefixed with the listener prefix 
including the SASL mechanism, i.e. 
`listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config`. Only one login 
module may be specified in the config value. If multiple mechanisms are 
configured on a listener, configs must be provided for each mechanism using the 
listener and mechanism prefix. For example,
-            
-            
listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule
 required \
-                username="admin" \
-                password="admin-secret";
-            
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
 required \
-                username="admin" \
-                password="admin-secret" \
-                user_admin="admin-secret" \
-                user_alice="alice-secret";
+
+```java-properties
+listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule
 required \
+    username="admin" \
+    password="admin-secret";
+listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
 required \
+    username="admin" \
+    password="admin-secret" \
+    user_admin="admin-secret" \
+    user_alice="alice-secret";
+```
 
 If JAAS configuration is defined at different levels, the order of precedence 
used is:
 
@@ -69,7 +71,7 @@ To configure SASL authentication on the clients using static 
JAAS config file:
 
 1. Add a JAAS config file with a client login section named `KafkaClient`. 
Configure a login module in `KafkaClient` for the selected mechanism as 
described in the examples for setting up GSSAPI (Kerberos), PLAIN, SCRAM, or 
non-production/production OAUTHBEARER. For example, GSSAPI credentials may be 
configured as:
 
-   ```
+   ```text
    KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
@@ -81,7 +83,7 @@ To configure SASL authentication on the clients using static 
JAAS config file:
 
 2. Pass the JAAS config file location as JVM parameter to each client JVM. For 
example:
 
-   ```
+   ```text
    -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
    ```
 
@@ -102,13 +104,13 @@ Kafka supports the following SASL mechanisms:
 
 1. Configure a SASL port in server.properties, by adding at least one of 
SASL_PLAINTEXT or SASL_SSL to the _listeners_ parameter, which contains one or 
more comma-separated values:
 
-   ```
+   ```java-properties
    listeners=SASL_PLAINTEXT://host.name:port
    ```
 
    If you are only configuring a SASL port (or if you want the Kafka brokers 
to authenticate each other using SASL) then make sure you set the same SASL 
protocol for inter-broker communication:
 
-   ```
+   ```java-properties
    security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
    ```
 
@@ -133,7 +135,7 @@ Note: When establishing connections to brokers via SASL, 
clients may perform a r
 
    If you have installed your own Kerberos, you will need to create these 
principals yourself using the following commands:
 
-   ```
+   ```bash
    $ sudo /usr/sbin/kadmin.local -q 'addprinc -randkey 
kafka/{hostname}@{REALM}'
    $ sudo /usr/sbin/kadmin.local -q "ktadd -k 
/etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"
    ```
@@ -143,7 +145,7 @@ Note: When establishing connections to brokers via SASL, 
clients may perform a r
 
 1. Add a suitably modified JAAS file similar to the one below to each Kafka 
broker's config directory, let's call it kafka_server_jaas.conf for this 
example (note that each broker should have its own keytab):
 
-   ```
+   ```text
    KafkaServer {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
@@ -157,7 +159,7 @@ Note: When establishing connections to brokers via SASL, 
clients may perform a r
 
 2. Pass the JAAS and optionally the krb5 file locations as JVM parameters to 
each Kafka broker (see 
[here](https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html)
 for more details):
 
-   ```
+   ```text
    -Djava.security.krb5.conf=/etc/kafka/krb5.conf
    -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
    ```
@@ -166,7 +168,7 @@ Note: When establishing connections to brokers via SASL, 
clients may perform a r
 
 4. Configure SASL port and SASL mechanisms in server.properties as described 
here. For example:
 
-   ```
+   ```java-properties
    listeners=SASL_PLAINTEXT://host.name:port
    security.inter.broker.protocol=SASL_PLAINTEXT
    sasl.mechanism.inter.broker.protocol=GSSAPI
@@ -175,7 +177,7 @@ Note: When establishing connections to brokers via SASL, 
clients may perform a r
 
    We must also configure the service name in server.properties, which should 
match the principal name of the kafka brokers. In the above example, principal 
is "kafka/[email protected]", so:
 
-   ```
+   ```java-properties
    sasl.kerberos.service.name=kafka
    ```
 
@@ -183,29 +185,37 @@ Note: When establishing connections to brokers via SASL, 
clients may perform a r
 
 To configure SASL authentication on the clients: 
 1. Clients (producers, consumers, connect workers, etc) will authenticate to 
the cluster with their own principal (usually with the same name as the user 
running the client), so obtain or create these principals as needed. Then 
configure the JAAS configuration property for each client. Different clients 
within a JVM may run as different users by specifying different principals. The 
property `sasl.jaas.config` in producer.properties or consumer.properties 
describes how clients like produc [...]
-               
-               sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule 
required \
-                   useKeyTab=true \
-                   storeKey=true  \
-                   keyTab="/etc/security/keytabs/kafka_client.keytab" \
-                   principal="[email protected]";
-
-For command-line utilities like kafka-console-consumer or 
kafka-console-producer, kinit can be used along with "useTicketCache=true" as 
in: 
-               
-               sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule 
required \
-                   useTicketCache=true;
+
+   ```java-properties
+   sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
+       useKeyTab=true \
+       storeKey=true \
+       keyTab="/etc/security/keytabs/kafka_client.keytab" \
+       principal="[email protected]";
+   ```
+
+   For command-line utilities like kafka-console-consumer or 
kafka-console-producer, kinit can be used along with "useTicketCache=true" as 
in: 
+
+   ```java-properties
+   sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
+       useTicketCache=true;
+   ```
 
 JAAS configuration for clients may alternatively be specified as a JVM 
parameter similar to brokers as described here. Clients use the login section 
named `KafkaClient`. This option allows only one user for all client 
connections from a JVM.
 2. Make sure the keytabs configured in the JAAS configuration are readable by 
the operating system user who is starting kafka client.
 3. Optionally pass the krb5 file locations as JVM parameters to each client 
JVM (see 
[here](https://docs.oracle.com/javase/8/docs/technotes/guides/security/jgss/tutorials/KerberosReq.html)
 for more details): 
-               
-               -Djava.security.krb5.conf=/etc/kafka/krb5.conf
+
+   ```text
+   -Djava.security.krb5.conf=/etc/kafka/krb5.conf
+   ```
 
 4. Configure the following properties in producer.properties or 
consumer.properties: 
-               
-               security.protocol=SASL_PLAINTEXT (or SASL_SSL)
-               sasl.mechanism=GSSAPI
-               sasl.kerberos.service.name=kafka
+
+   ```java-properties
+   security.protocol=SASL_PLAINTEXT (or SASL_SSL)
+   sasl.mechanism=GSSAPI
+   sasl.kerberos.service.name=kafka
+   ```
 
 ### Authentication using SASL/PLAIN
 
@@ -215,44 +225,54 @@ Under the default implementation of 
`principal.builder.class`, the username is u
 #### Configuring Kafka Brokers
 
 1. Add a suitably modified JAAS file similar to the one below to each Kafka 
broker's config directory, let's call it kafka_server_jaas.conf for this 
example: 
-               
-               KafkaServer {
-                   org.apache.kafka.common.security.plain.PlainLoginModule 
required
-                   username="admin"
-                   password="admin-secret"
-                   user_admin="admin-secret"
-                   user_alice="alice-secret";
-               };
-
-This configuration defines two users (_admin_ and _alice_). The properties 
`username` and `password` in the `KafkaServer` section are used by the broker 
to initiate connections to other brokers. In this example, _admin_ is the user 
for inter-broker communication. The set of properties `user__userName_` defines 
the passwords for all users that connect to the broker and the broker validates 
all client connections including those from other brokers using these 
properties.
+
+   ```text
+   KafkaServer {
+       org.apache.kafka.common.security.plain.PlainLoginModule required
+       username="admin"
+       password="admin-secret"
+       user_admin="admin-secret"
+       user_alice="alice-secret";
+   };
+   ```
+
+   This configuration defines two users (_admin_ and _alice_). The properties 
`username` and `password` in the `KafkaServer` section are used by the broker 
to initiate connections to other brokers. In this example, _admin_ is the user 
for inter-broker communication. The set of properties `user__userName_` defines 
the passwords for all users that connect to the broker and the broker validates 
all client connections including those from other brokers using these 
properties.
 2. Pass the JAAS config file location as JVM parameter to each Kafka broker: 
-               
-               
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
+
+   ```text
+   -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
+   ```
 
 3. Configure SASL port and SASL mechanisms in server.properties as described 
here. For example: 
-               
-               listeners=SASL_SSL://host.name:port
-               security.inter.broker.protocol=SASL_SSL
-               sasl.mechanism.inter.broker.protocol=PLAIN
-               sasl.enabled.mechanisms=PLAIN
+
+   ```java-properties
+   listeners=SASL_SSL://host.name:port
+   security.inter.broker.protocol=SASL_SSL
+   sasl.mechanism.inter.broker.protocol=PLAIN
+   sasl.enabled.mechanisms=PLAIN
+   ```
 
 #### Configuring Kafka Clients
 
 To configure SASL authentication on the clients: 
 1. Configure the JAAS configuration property for each client in 
producer.properties or consumer.properties. The login module describes how the 
clients like producer and consumer can connect to the Kafka Broker. The 
following is an example configuration for a client for the PLAIN mechanism: 
-               
-               
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule 
required \
-                   username="alice" \
-                   password="alice-secret";
 
-The options `username` and `password` are used by clients to configure the 
user for client connections. In this example, clients connect to the broker as 
user _alice_. Different clients within a JVM may connect as different users by 
specifying different user names and passwords in `sasl.jaas.config`.
+   ```java-properties
+   sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule 
required \
+       username="alice" \
+       password="alice-secret";
+   ```
+
+   The options `username` and `password` are used by clients to configure the 
user for client connections. In this example, clients connect to the broker as 
user _alice_. Different clients within a JVM may connect as different users by 
specifying different user names and passwords in `sasl.jaas.config`.
 
 JAAS configuration for clients may alternatively be specified as a JVM 
parameter similar to brokers as described here. Clients use the login section 
named `KafkaClient`. This option allows only one user for all client 
connections from a JVM.
 
 2. Configure the following properties in producer.properties or 
consumer.properties: 
-               
-               security.protocol=SASL_SSL
-               sasl.mechanism=PLAIN
+
+   ```java-properties
+   security.protocol=SASL_SSL
+   sasl.mechanism=PLAIN
+   ```
 
 #### Use of SASL/PLAIN in production
 
@@ -268,62 +288,87 @@ Salted Challenge Response Authentication Mechanism 
(SCRAM) is a family of SASL m
 The SCRAM implementation in Kafka uses the metadata log as credential store. 
Credentials can be created in the metadata log using `kafka-storage.sh` or 
`kafka-configs.sh`. For each SCRAM mechanism enabled, credentials must be 
created by adding a config with the mechanism name. Credentials for 
inter-broker communication must be created before Kafka brokers are started. 
`kafka-storage.sh` can format storage with initial credentials. Client 
credentials may be created and updated dynamically [...]
 
 Create initial SCRAM credentials for user _admin_ with password _admin-secret_ 
: 
-            
-            $ bin/kafka-storage.sh format -t $(bin/kafka-storage.sh 
random-uuid) -c config/server.properties --add-scram 
'SCRAM-SHA-256=[name="admin",password="admin-secret"]'
+
+```bash
+$ bin/kafka-storage.sh format -t $(bin/kafka-storage.sh random-uuid) \
+    -c config/server.properties \
+    --add-scram 'SCRAM-SHA-256=[name="admin",password="admin-secret"]'
+```
 
 Create SCRAM credentials for user _alice_ with password _alice-secret_ (refer 
to Configuring Kafka Clients for client configuration): 
-            
-            $ bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter 
--add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret]' 
--entity-type users --entity-name alice --command-config client.properties
+
+```bash
+$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
+    --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret]' \
+    --entity-type users --entity-name alice --command-config client.properties
+```
 
 The default iteration count of 4096 is used if iterations are not specified. A 
random salt is created if it's not specified. The SCRAM identity consisting of 
salt, iterations, StoredKey and ServerKey are stored in the metadata log. See 
[RFC 5802](https://tools.ietf.org/html/rfc5802) for details on SCRAM identity 
and the individual fields. 
 
 Existing credentials may be listed using the _\--describe_ option: 
-            
-            $ bin/kafka-configs.sh --bootstrap-server localhost:9092 
--describe --entity-type users --entity-name alice --command-config 
client.properties
+
+```bash
+$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe \
+    --entity-type users --entity-name alice --command-config client.properties
+```
 
 Credentials may be deleted for one or more SCRAM mechanisms using the 
_\--alter --delete-config_ option: 
-            
-            $ bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter 
--delete-config 'SCRAM-SHA-256' --entity-type users --entity-name alice 
--command-config client.properties
+
+```bash
+$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter \
+    --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name alice \
+    --command-config client.properties
+```
 
 #### Configuring Kafka Brokers
 
 1. Add a suitably modified JAAS file similar to the one below to each Kafka 
broker's config directory, let's call it kafka_server_jaas.conf for this 
example: 
-               
-               KafkaServer {
-                   org.apache.kafka.common.security.scram.ScramLoginModule 
required
-                   username="admin"
-                   password="admin-secret";
-               };
-
-The properties `username` and `password` in the `KafkaServer` section are used 
by the broker to initiate connections to other brokers. In this example, 
_admin_ is the user for inter-broker communication.
+
+   ```text
+   KafkaServer {
+       org.apache.kafka.common.security.scram.ScramLoginModule required
+       username="admin"
+       password="admin-secret";
+   };
+   ```
+
+   The properties `username` and `password` in the `KafkaServer` section are 
used by the broker to initiate connections to other brokers. In this example, 
_admin_ is the user for inter-broker communication.
 2. Pass the JAAS config file location as JVM parameter to each Kafka broker: 
-               
-               
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
+
+   ```text
+   -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
+   ```
 
 3. Configure SASL port and SASL mechanisms in server.properties as described 
here. For example: 
-               
-               listeners=SASL_SSL://host.name:port
-               security.inter.broker.protocol=SASL_SSL
-               sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 (or 
SCRAM-SHA-512)
-               sasl.enabled.mechanisms=SCRAM-SHA-256 (or SCRAM-SHA-512)
+
+   ```java-properties
+   listeners=SASL_SSL://host.name:port
+   security.inter.broker.protocol=SASL_SSL
+   sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 (or SCRAM-SHA-512)
+   sasl.enabled.mechanisms=SCRAM-SHA-256 (or SCRAM-SHA-512)
+   ```
 
 #### Configuring Kafka Clients
 
 To configure SASL authentication on the clients: 
 1. Configure the JAAS configuration property for each client in 
producer.properties or consumer.properties. The login module describes how the 
clients like producer and consumer can connect to the Kafka Broker. The 
following is an example configuration for a client for the SCRAM mechanisms: 
-               
-               
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule 
required \
-                   username="alice" \
-                   password="alice-secret";
 
-The options `username` and `password` are used by clients to configure the 
user for client connections. In this example, clients connect to the broker as 
user _alice_. Different clients within a JVM may connect as different users by 
specifying different user names and passwords in `sasl.jaas.config`.
+   ```java-properties
+   sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule 
required \
+       username="alice" \
+       password="alice-secret";
+   ```
+
+   The options `username` and `password` are used by clients to configure the 
user for client connections. In this example, clients connect to the broker as 
user _alice_. Different clients within a JVM may connect as different users by 
specifying different user names and passwords in `sasl.jaas.config`.
 
 JAAS configuration for clients may alternatively be specified as a JVM 
parameter similar to brokers as described here. Clients use the login section 
named `KafkaClient`. This option allows only one user for all client 
connections from a JVM.
 
 2. Configure the following properties in producer.properties or 
consumer.properties: 
-               
-               security.protocol=SASL_SSL
-               sasl.mechanism=SCRAM-SHA-256 (or SCRAM-SHA-512)
+
+   ```java-properties
+   security.protocol=SASL_SSL
+   sasl.mechanism=SCRAM-SHA-256 (or SCRAM-SHA-512)
+   ```
 
 #### Security Considerations for SASL/SCRAM
 
@@ -342,13 +387,15 @@ Under the default implementation of 
`principal.builder.class`, the principalName
 The default implementation of SASL/OAUTHBEARER in Kafka creates and validates 
[Unsecured JSON Web Tokens](https://tools.ietf.org/html/rfc7515#appendix-A.5). 
While suitable only for non-production use, it does provide the flexibility to 
create arbitrary tokens in a DEV or TEST environment.
 
 1. Add a suitably modified JAAS file similar to the one below to each Kafka 
broker's config directory, let's call it kafka_server_jaas.conf for this 
example: 
-               
-               KafkaServer {
-                   
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
-                   unsecuredLoginStringClaim_sub="admin";
-               };
 
-The property `unsecuredLoginStringClaim_sub` in the `KafkaServer` section is 
used by the broker when it initiates connections to other brokers. In this 
example, _admin_ will appear in the subject (`sub`) claim and will be the user 
for inter-broker communication. 
+   ```text
+   KafkaServer {
+       org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule 
required
+       unsecuredLoginStringClaim_sub="admin";
+   };
+   ```
+
+   The property `unsecuredLoginStringClaim_sub` in the `KafkaServer` section 
is used by the broker when it initiates connections to other brokers. In this 
example, _admin_ will appear in the subject (`sub`) claim and will be the user 
for inter-broker communication. 
 
 Here are the various supported JAAS module options on the broker side for 
[Unsecured JSON Web Token](https://tools.ietf.org/html/rfc7515#appendix-A.5) 
validation:   
 <table>  
@@ -399,36 +446,46 @@ Set to a positive integer value if you wish to allow up 
to some number of positi
 </td> </tr> </table>
 
 2. Pass the JAAS config file location as JVM parameter to each Kafka broker: 
-               
-               
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
+
+   ```text
+   -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
+   ```
 
 3. Configure SASL port and SASL mechanisms in server.properties as described 
here. For example: 
-               
-               listeners=SASL_SSL://host.name:port (or SASL_PLAINTEXT if 
non-production)
-               security.inter.broker.protocol=SASL_SSL (or SASL_PLAINTEXT if 
non-production)
-               sasl.mechanism.inter.broker.protocol=OAUTHBEARER
-               sasl.enabled.mechanisms=OAUTHBEARER
+
+   ```java-properties
+   listeners=SASL_SSL://host.name:port (or SASL_PLAINTEXT if non-production)
+   security.inter.broker.protocol=SASL_SSL (or SASL_PLAINTEXT if 
non-production)
+   sasl.mechanism.inter.broker.protocol=OAUTHBEARER
+   sasl.enabled.mechanisms=OAUTHBEARER
+   ```
 
 #### Configuring Production Kafka Brokers
 
 1. Add a suitably modified JAAS file similar to the one below to each Kafka 
broker's config directory, let's call it kafka_server_jaas.conf for this 
example: 
-               
-               KafkaServer {
-                   
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required ;
-               };
+
+   ```text
+   KafkaServer {
+       org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule 
required ;
+   };
+   ```
 
 2. Pass the JAAS config file location as JVM parameter to each Kafka broker: 
-               
-               
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
+
+   ```text
+   -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
+   ```
 
 3. Configure SASL port and SASL mechanisms in server.properties as described 
here. For example: 
-               
-               listeners=SASL_SSL://host.name:port
-               security.inter.broker.protocol=SASL_SSL
-               sasl.mechanism.inter.broker.protocol=OAUTHBEARER
-               sasl.enabled.mechanisms=OAUTHBEARER
-               listener.name.<listener 
name>.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallbackHandler
-               listener.name.<listener 
name>.oauthbearer.sasl.oauthbearer.jwks.endpoint.url=https://example.com/oauth2/v1/keys
+
+   ```java-properties
+   listeners=SASL_SSL://host.name:port
+   security.inter.broker.protocol=SASL_SSL
+   sasl.mechanism.inter.broker.protocol=OAUTHBEARER
+   sasl.enabled.mechanisms=OAUTHBEARER
+   listener.name.<listener 
name>.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallbackHandler
+   listener.name.<listener 
name>.oauthbearer.sasl.oauthbearer.jwks.endpoint.url=https://example.com/oauth2/v1/keys
+   ```
 
 The OAUTHBEARER broker configuration includes: 
    * sasl.oauthbearer.clock.skew.seconds
@@ -444,11 +501,13 @@ The OAUTHBEARER broker configuration includes:
 
 To configure SASL authentication on the clients: 
 1. Configure the JAAS configuration property for each client in 
producer.properties or consumer.properties. The login module describes how the 
clients like producer and consumer can connect to the Kafka Broker. The 
following is an example configuration for a client for the OAUTHBEARER 
mechanisms: 
-               
-               
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
 required \
-                   unsecuredLoginStringClaim_sub="alice";
 
-The option `unsecuredLoginStringClaim_sub` is used by clients to configure the 
subject (`sub`) claim, which determines the user for client connections. In 
this example, clients connect to the broker as user _alice_. Different clients 
within a JVM may connect as different users by specifying different subject 
(`sub`) claims in `sasl.jaas.config`.
+   ```java-properties
+   
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
 required \
+       unsecuredLoginStringClaim_sub="alice";
+   ```
+
+   The option `unsecuredLoginStringClaim_sub` is used by clients to configure 
the subject (`sub`) claim, which determines the user for client connections. In 
this example, clients connect to the broker as user _alice_. Different clients 
within a JVM may connect as different users by specifying different subject 
(`sub`) claims in `sasl.jaas.config`.
 
 The default implementation of SASL/OAUTHBEARER in Kafka creates and validates 
[Unsecured JSON Web Tokens](https://tools.ietf.org/html/rfc7515#appendix-A.5). 
While suitable only for non-production use, it does provide the flexibility to 
create arbitrary tokens in a DEV or TEST environment.
 
@@ -529,17 +588,21 @@ Set to a custom claim name if you wish the name of the 
`String` or `String List`
 JAAS configuration for clients may alternatively be specified as a JVM 
parameter similar to brokers as described here. Clients use the login section 
named `KafkaClient`. This option allows only one user for all client 
connections from a JVM.
 
 2. Configure the following properties in producer.properties or 
consumer.properties: 
-               
-               security.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
-               sasl.mechanism=OAUTHBEARER
+
+   ```java-properties
+   security.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
+   sasl.mechanism=OAUTHBEARER
+   ```
 
 3. The default implementation of SASL/OAUTHBEARER depends on the 
jackson-databind library. Since it's an optional dependency, users have to 
configure it as a dependency via their build tool.
 #### Configuring Production Kafka Clients
 
 To configure SASL authentication on the clients: 
 1. Configure the JAAS configuration property for each client in 
producer.properties or consumer.properties. The login module describes how the 
clients like producer and consumer can connect to the Kafka Broker. The 
following is an example configuration for a client for the OAUTHBEARER 
mechanisms: 
-               
-               
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
 required ;
+
+   ```java-properties
+   
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
 required ;
+   ```
 
 JAAS configuration for clients may alternatively be specified as a JVM 
parameter similar to brokers as described here. Clients use the login section 
named `KafkaClient`. This option allows only one user for all client 
connections from a JVM.
 
@@ -547,49 +610,57 @@ JAAS configuration for clients may alternatively be 
specified as a JVM parameter
 
 For example, if using the OAuth `client_credentials` grant type with a client 
secret to communicate with the OAuth identity provider, the configuration might 
look like this:
 
-               security.protocol=SASL_SSL
-               sasl.mechanism=OAUTHBEARER
-               sasl.oauthbearer.client.credentials.client.id=jdoe
-               sasl.oauthbearer.client.credentials.client.secret=$3cr3+
-               sasl.oauthbearer.scope=my-application-scope
-               
sasl.oauthbearer.token.endpoint.url=https://example.com/oauth2/v1/token
+   ```java-properties
+   security.protocol=SASL_SSL
+   sasl.mechanism=OAUTHBEARER
+   sasl.oauthbearer.client.credentials.client.id=jdoe
+   sasl.oauthbearer.client.credentials.client.secret=$3cr3+
+   sasl.oauthbearer.scope=my-application-scope
+   sasl.oauthbearer.token.endpoint.url=https://example.com/oauth2/v1/token
+   ```
 
 Alternatively, the `client_credentials` grant type also supports client 
assertion authentication as defined in [RFC 
7523](https://tools.ietf.org/html/rfc7523). Instead of sending a client secret, 
the client authenticates by presenting a signed JWT assertion to the OAuth 
identity provider. This provides enhanced security since private keys never 
leave the client and assertions are short-lived. See 
[KIP-1258](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1258) for 
details.
 
 When using client assertion with dynamically-generated JWTs (recommended), the 
configuration might look like this:
 
-               security.protocol=SASL_SSL
-               sasl.mechanism=OAUTHBEARER
-               
sasl.oauthbearer.token.endpoint.url=https://example.com/oauth2/v1/token
-               
sasl.oauthbearer.assertion.private.key.file=/path/to/private-key.pem
-               sasl.oauthbearer.assertion.algorithm=RS256
-               sasl.oauthbearer.assertion.claim.iss=my-kafka-client
-               sasl.oauthbearer.assertion.claim.sub=my-service-account
-               sasl.oauthbearer.assertion.claim.aud=https://example.com
-               sasl.oauthbearer.assertion.claim.exp.seconds=300
-               sasl.oauthbearer.assertion.claim.jti.include=true
-               sasl.oauthbearer.scope=my-application-scope
+   ```java-properties
+   security.protocol=SASL_SSL
+   sasl.mechanism=OAUTHBEARER
+   sasl.oauthbearer.token.endpoint.url=https://example.com/oauth2/v1/token
+   sasl.oauthbearer.assertion.private.key.file=/path/to/private-key.pem
+   sasl.oauthbearer.assertion.algorithm=RS256
+   sasl.oauthbearer.assertion.claim.iss=my-kafka-client
+   sasl.oauthbearer.assertion.claim.sub=my-service-account
+   sasl.oauthbearer.assertion.claim.aud=https://example.com
+   sasl.oauthbearer.assertion.claim.exp.seconds=300
+   sasl.oauthbearer.assertion.claim.jti.include=true
+   sasl.oauthbearer.scope=my-application-scope
+   ```
 
 Alternatively, a pre-generated JWT assertion can be read from a file. This is 
useful when assertions are generated by an external process or secrets manager:
 
-               security.protocol=SASL_SSL
-               sasl.mechanism=OAUTHBEARER
-               
sasl.oauthbearer.token.endpoint.url=https://example.com/oauth2/v1/token
-               sasl.oauthbearer.assertion.file=/path/to/assertion.jwt
-               sasl.oauthbearer.scope=my-application-scope
+   ```java-properties
+   security.protocol=SASL_SSL
+   sasl.mechanism=OAUTHBEARER
+   sasl.oauthbearer.token.endpoint.url=https://example.com/oauth2/v1/token
+   sasl.oauthbearer.assertion.file=/path/to/assertion.jwt
+   sasl.oauthbearer.scope=my-application-scope
+   ```
 
 For dynamically-generated assertions, additional static claims can be provided 
via a JSON template file using `sasl.oauthbearer.assertion.template.file`. The 
template supports `header` and `payload` sections whose values are merged into 
the generated JWT:
 
-               {
-                 "header": {
-                   "kid": "my-key-id"
-                 },
-                 "payload": {
-                   "iss": "my-kafka-client",
-                   "sub": "my-service-account",
-                   "aud": "https://example.com";
-                 }
-               }
+   ```json
+   {
+     "header": {
+       "kid": "my-key-id"
+     },
+     "payload": {
+       "iss": "my-kafka-client",
+       "sub": "my-service-account",
+       "aud": "https://example.com";
+     }
+   }
+   ```
 
 When both client assertion and client secret configurations are present, the 
`ClientCredentialsJwtRetriever` uses a three-tier preference order:
 
@@ -601,15 +672,17 @@ This selection is made at configuration time. Once a 
method is selected, it pers
 
 Or, if using the OAuth `urn:ietf:params:oauth:grant-type:jwt-bearer` grant 
type to communicate with the OAuth identity provider, the 
`JwtBearerJwtRetriever` must be configured explicitly since the default 
retriever delegates to `ClientCredentialsJwtRetriever`:
 
-               security.protocol=SASL_SSL
-               sasl.mechanism=OAUTHBEARER
-               
sasl.oauthbearer.jwt.retriever.class=org.apache.kafka.common.security.oauthbearer.JwtBearerJwtRetriever
-               sasl.oauthbearer.assertion.private.key.file=/path/to/private.key
-               sasl.oauthbearer.assertion.algorithm=RS256
-               sasl.oauthbearer.assertion.claim.exp.seconds=600
-               sasl.oauthbearer.assertion.template.file=/path/to/template.json
-               sasl.oauthbearer.scope=my-application-scope
-               
sasl.oauthbearer.token.endpoint.url=https://example.com/oauth2/v1/token
+   ```java-properties
+   security.protocol=SASL_SSL
+   sasl.mechanism=OAUTHBEARER
+   
sasl.oauthbearer.jwt.retriever.class=org.apache.kafka.common.security.oauthbearer.JwtBearerJwtRetriever
+   sasl.oauthbearer.assertion.private.key.file=/path/to/private.key
+   sasl.oauthbearer.assertion.algorithm=RS256
+   sasl.oauthbearer.assertion.claim.exp.seconds=600
+   sasl.oauthbearer.assertion.template.file=/path/to/template.json
+   sasl.oauthbearer.scope=my-application-scope
+   sasl.oauthbearer.token.endpoint.url=https://example.com/oauth2/v1/token
+   ```
 
 The OAUTHBEARER client configuration includes: 
    * sasl.oauthbearer.assertion.algorithm
@@ -675,29 +748,35 @@ Production use cases will also require writing an 
implementation of `org.apache.
 ### Enabling multiple SASL mechanisms in a broker
 
 1. Specify configuration for the login modules of all enabled mechanisms in 
the `KafkaServer` section of the JAAS config file. For example: 
-            
-            KafkaServer {
-                com.sun.security.auth.module.Krb5LoginModule required
-                useKeyTab=true
-                storeKey=true
-                keyTab="/etc/security/keytabs/kafka_server.keytab"
-                principal="kafka/[email protected]";
-            
-                org.apache.kafka.common.security.plain.PlainLoginModule 
required
-                username="admin"
-                password="admin-secret"
-                user_admin="admin-secret"
-                user_alice="alice-secret";
-            };
+
+   ```text
+   KafkaServer {
+       com.sun.security.auth.module.Krb5LoginModule required
+       useKeyTab=true
+       storeKey=true
+       keyTab="/etc/security/keytabs/kafka_server.keytab"
+       principal="kafka/[email protected]";
+
+       org.apache.kafka.common.security.plain.PlainLoginModule required
+       username="admin"
+       password="admin-secret"
+       user_admin="admin-secret"
+       user_alice="alice-secret";
+   };
+   ```
 
 2. Enable the SASL mechanisms in server.properties: 
-            
-            
sasl.enabled.mechanisms=GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER
+
+   ```java-properties
+   sasl.enabled.mechanisms=GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER
+   ```
 
 3. Specify the SASL security protocol and mechanism for inter-broker 
communication in server.properties if required: 
-            
-            security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
-            sasl.mechanism.inter.broker.protocol=GSSAPI (or one of the other 
enabled mechanisms)
+
+   ```java-properties
+   security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
+   sasl.mechanism.inter.broker.protocol=GSSAPI (or one of the other enabled 
mechanisms)
+   ```
 
 4. Follow the mechanism-specific steps in GSSAPI (Kerberos), PLAIN, SCRAM, and 
non-production/production OAUTHBEARER to configure SASL for the enabled 
mechanisms.
 ### Modifying SASL mechanism in a Running Cluster
@@ -734,24 +813,41 @@ Tokens can also be cancelled explicitly. If a token is 
not renewed by the token
 Tokens can be created by using Admin APIs or using 
`kafka-delegation-tokens.sh` script. Delegation token requests 
(create/renew/expire/describe) should be issued only on SASL or SSL 
authenticated channels. Tokens can not be requests if the initial 
authentication is done through delegation token. A token can be created by the 
user for that user or others as well by specifying the `--owner-principal` 
parameter. Owner/Renewers can renew or expire tokens. Owner/renewers can always 
describe t [...]
 
 Create a delegation token: 
-            
-            $ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 
--create   --max-life-time-period -1 --command-config client.properties 
--renewer-principal User:user1
+
+```bash
+$ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create \
+    --max-life-time-period -1 --command-config client.properties \
+    --renewer-principal User:user1
+```
 
 Create a delegation token for a different owner: 
-            
-            $ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 
--create   --max-life-time-period -1 --command-config client.properties 
--renewer-principal User:user1 --owner-principal User:owner1
+
+```bash
+$ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --create \
+    --max-life-time-period -1 --command-config client.properties \
+    --renewer-principal User:user1 --owner-principal User:owner1
+```
 
 Renew a delegation token: 
-            
-            $ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 
--renew    --renew-time-period -1 --command-config client.properties --hmac 
ABCDEFGHIJK
+
+```bash
+$ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --renew \
+    --renew-time-period -1 --command-config client.properties --hmac 
ABCDEFGHIJK
+```
 
 Expire a delegation token: 
-            
-            $ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 
--expire   --expiry-time-period -1   --command-config client.properties  --hmac 
ABCDEFGHIJK
+
+```bash
+$ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --expire \
+    --expiry-time-period -1 --command-config client.properties --hmac 
ABCDEFGHIJK
+```
 
 Existing tokens can be described using the --describe option: 
-            
-            $ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 
--describe --command-config client.properties  --owner-principal User:user1
+
+```bash
+$ bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 --describe \
+    --command-config client.properties --owner-principal User:user1
+```
 
 #### Token Authentication
 
@@ -760,13 +856,15 @@ Delegation token authentication piggybacks on the current 
SASL/SCRAM authenticat
 Configuring Kafka Clients:
 
 1. Configure the JAAS configuration property for each client in 
producer.properties or consumer.properties. The login module describes how the 
clients like producer and consumer can connect to the Kafka Broker. The 
following is an example configuration for a client for the token 
authentication: 
-               
-               
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule 
required \
-                   username="tokenID123" \
-                   password="lAYYSFmLs4bTjf+lTZ1LCHR/ZZFNA==" \
-                   tokenauth="true";
 
-The options `username` and `password` are used by clients to configure the 
token id and token HMAC. And the option `tokenauth` is used to indicate the 
server about token authentication. In this example, clients connect to the 
broker using token id: _tokenID123_. Different clients within a JVM may connect 
using different tokens by specifying different token details in 
`sasl.jaas.config`.
+   ```java-properties
+   sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule 
required \
+       username="tokenID123" \
+       password="lAYYSFmLs4bTjf+lTZ1LCHR/ZZFNA==" \
+       tokenauth="true";
+   ```
+
+   The options `username` and `password` are used by clients to configure the 
token id and token HMAC. And the option `tokenauth` is used to indicate the 
server about token authentication. In this example, clients connect to the 
broker using token id: _tokenID123_. Different clients within a JVM may connect 
using different tokens by specifying different token details in 
`sasl.jaas.config`.
 
 JAAS configuration for clients may alternatively be specified as a JVM 
parameter similar to brokers as described here. Clients use the login section 
named `KafkaClient`. This option allows only one user for all client 
connections from a JVM.
 
diff --git a/docs/security/authorization-and-acls.md 
b/docs/security/authorization-and-acls.md
index 6349dbd6362..4d7abf94e67 100644
--- a/docs/security/authorization-and-acls.md
+++ b/docs/security/authorization-and-acls.md
@@ -27,9 +27,10 @@ type: docs
 
 
 Kafka ships with a pluggable authorization framework, which is configured with 
the `authorizer.class.name` property in the server configuration. Configured 
implementations must extend `org.apache.kafka.server.authorizer.Authorizer`. 
Kafka provides a default implementation which store ACLs in the cluster 
metadata (KRaft metadata log). For KRaft clusters, use the following 
configuration on all nodes (brokers, controllers, or combined broker/controller 
nodes): 
-    
-    
-    
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
+
+```java-properties
+authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
+```
 
 Kafka ACLs are defined in the general format of "Principal {P} is 
[Allowed|Denied] Operation {O} From Host {H} on any Resource {R} matching 
ResourcePattern {RP}". You can read more about the ACL structure in 
[KIP-11](https://cwiki.apache.org/confluence/x/XIUWAw) and resource patterns in 
[KIP-290](https://cwiki.apache.org/confluence/x/QpvLB). In order to add, 
remove, or list ACLs, you can use the Kafka ACL CLI `kafka-acls.sh`. 
 
@@ -40,14 +41,16 @@ If a resource (R) does not have any ACLs defined, meaning 
that no ACL matches th
 ### _Changing the Default Behavior:_
 
 If you prefer that resources without any ACLs be accessible by all users 
(instead of just super users), you can change the default behavior. To do this, 
add the following line to your server.properties file:
-    
-    
-    allow.everyone.if.no.acl.found=true
+
+```java-properties
+allow.everyone.if.no.acl.found=true
+```
 
 With this setting enabled, if a resource does not have any ACLs defined, Kafka 
will allow access to everyone. If a resource has one or more ACLs defined, 
those ACL rules will be enforced as usual, regardless of the setting. One can 
also add super users in server.properties like the following (note that the 
delimiter is semicolon since SSL user names may contain comma). Default 
PrincipalType string "User" is case sensitive. 
-    
-    
-    super.users=User:Bob;User:Alice
+
+```java-properties
+super.users=User:Bob;User:Alice
+```
 
 ### KRaft Principal Forwarding
 
@@ -58,41 +61,46 @@ All of this implies that Kafka must understand how to 
serialize and deserialize
 
 By default, the SSL user name will be of the form 
"CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown". One can 
change that by setting `ssl.principal.mapping.rules` to a customized rule in 
server.properties. This config allows a list of rules for mapping X.500 
distinguished name to short name. The rules are evaluated in order and the 
first rule that matches a distinguished name is used to map it to a short name. 
Any later rules in the list are ignored.   
 The format of `ssl.principal.mapping.rules` is a list where each rule starts 
with "RULE:" and contains an expression as the following formats. Default rule 
will return string representation of the X.500 certificate distinguished name. 
If the distinguished name matches the pattern, then the replacement command 
will be run over the name. This also supports lowercase/uppercase options, to 
force the translated result to be all lower/uppercase case. This is done by 
adding a "/L" or "/U' to th [...]
-    
-    
-    RULE:pattern/replacement/
-    RULE:pattern/replacement/[LU]
+
+```text
+RULE:pattern/replacement/
+RULE:pattern/replacement/[LU]
+```
 
 Example `ssl.principal.mapping.rules` values are: 
-    
-    
-    RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,
-    RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,
-    RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,
-    DEFAULT
+
+```text
+RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,
+RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,
+RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,
+DEFAULT
+```
 
 Above rules translate distinguished name 
"CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to 
"serviceuser" and 
"CN=adminUser,OU=Admin,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to 
"adminuser@admin".   
 For advanced use cases, one can customize the name by setting a customized 
PrincipalBuilder in server.properties like the following. 
-    
-    
-    principal.builder.class=CustomizedPrincipalBuilderClass
+
+```java-properties
+principal.builder.class=CustomizedPrincipalBuilderClass
+```
 
 ### Customizing SASL User Name
 
 By default, the SASL user name will be the primary part of the Kerberos 
principal. One can change that by setting 
`sasl.kerberos.principal.to.local.rules` to a customized rule in 
server.properties. The format of `sasl.kerberos.principal.to.local.rules` is a 
list where each rule works in the same way as the auth_to_local in [Kerberos 
configuration file 
(krb5.conf)](https://web.mit.edu/Kerberos/krb5-latest/doc/admin/conf_files/krb5_conf.html).
 This also support additional lowercase/upperca [...]
-    
-    
-    RULE:[n:string](regexp)s/pattern/replacement/
-    RULE:[n:string](regexp)s/pattern/replacement/g
-    RULE:[n:string](regexp)s/pattern/replacement//L
-    RULE:[n:string](regexp)s/pattern/replacement/g/L
-    RULE:[n:string](regexp)s/pattern/replacement//U
-    RULE:[n:string](regexp)s/pattern/replacement/g/U
+
+```text
+RULE:[n:string](regexp)s/pattern/replacement/
+RULE:[n:string](regexp)s/pattern/replacement/g
+RULE:[n:string](regexp)s/pattern/replacement//L
+RULE:[n:string](regexp)s/pattern/replacement/g/L
+RULE:[n:string](regexp)s/pattern/replacement//U
+RULE:[n:string](regexp)s/pattern/replacement/g/U
+```
 
 An example of adding a rule to properly translate [email protected] to user 
while also keeping the default rule in place is: 
-    
-    
-    
sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT
+
+```java-properties
+sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT
+```
 
 ## Command Line Interface
 
@@ -536,53 +544,75 @@ Convenience
 
   * **Adding Acls**  
 Suppose you want to add an acl "Principals User:Bob and User:Alice are allowed 
to perform Operation Read and Write on Topic Test-Topic from IP 198.51.100.0 
and IP 198.51.100.1". You can do that by executing the CLI with following 
options: 
-        
-        $ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add 
--allow-principal User:Bob --allow-principal User:Alice --allow-host 
198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write 
--topic Test-topic
+
+```bash
+$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal 
User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 
198.51.100.1 --operation Read --operation Write --topic Test-topic
+```
 
 By default, all principals that don't have an explicit acl that allows access 
for an operation to a resource are denied. In rare cases where an allow acl is 
defined that allows access to all but some principal we will have to use the 
--deny-principal and --deny-host option. For example, if we want to allow all 
users to Read from Test-topic but only deny User:BadBob from IP 198.51.100.3 we 
can do so using following commands: 
-        
-        $ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add 
--allow-principal User:'*' --allow-host '*' --deny-principal User:BadBob 
--deny-host 198.51.100.3 --operation Read --topic Test-topic
+
+```bash
+$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal 
User:'*' --allow-host '*' --deny-principal User:BadBob --deny-host 198.51.100.3 
--operation Read --topic Test-topic
+```
 
 Note that `--allow-host` and `--deny-host` only support IP addresses 
(hostnames are not supported). Above examples add acls to a topic by specifying 
--topic [topic-name] as the resource pattern option. Similarly user can add 
acls to cluster by specifying --cluster and to a consumer group by specifying 
--group [group-name]. You can add acls on any resource of a certain type, e.g. 
suppose you wanted to add an acl "Principal User:Peter is allowed to produce to 
any Topic from IP 198.51.200.0 [...]
-        
-        $ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add 
--allow-principal User:Peter --allow-host 198.51.200.1 --producer --topic '*'
+
+```bash
+$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal 
User:Peter --allow-host 198.51.200.1 --producer --topic '*'
+```
 
 You can add acls on prefixed resource patterns, e.g. suppose you want to add 
an acl "Principal User:Jane is allowed to produce to any Topic whose name 
starts with 'Test-' from any host". You can do that by executing the CLI with 
following options: 
-        
-        $ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add 
--allow-principal User:Jane --producer --topic Test- --resource-pattern-type 
prefixed
+
+```bash
+$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal 
User:Jane --producer --topic Test- --resource-pattern-type prefixed
+```
 
 Note, --resource-pattern-type defaults to 'literal', which only affects 
resources with the exact same name or, in the case of the wildcard resource 
name '*', a resource with any name.
   * **Removing Acls**  
 Removing acls is pretty much the same. The only difference is instead of --add 
option users will have to specify --remove option. To remove the acls added by 
the first example above we can execute the CLI with following options: 
-        
-        $ bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove 
--allow-principal User:Bob --allow-principal User:Alice --allow-host 
198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write 
--topic Test-topic 
+
+```bash
+$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove 
--allow-principal User:Bob --allow-principal User:Alice --allow-host 
198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write 
--topic Test-topic
+```
 
 If you want to remove the acl added to the prefixed resource pattern above we 
can execute the CLI with following options: 
-        
-        $ bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove 
--allow-principal User:Jane --producer --topic Test- --resource-pattern-type 
Prefixed
+
+```bash
+$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --remove 
--allow-principal User:Jane --producer --topic Test- --resource-pattern-type 
Prefixed
+```
 
   * **List Acls**  
 We can list acls for any resource by specifying the --list option with the 
resource. To list all acls on the literal resource pattern Test-topic, we can 
execute the CLI with following options: 
-        
-        $ bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic 
Test-topic
+
+```bash
+$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic Test-topic
+```
 
 However, this will only return the acls that have been added to this exact 
resource pattern. Other acls can exist that affect access to the topic, e.g. 
any acls on the topic wildcard '*', or any acls on prefixed resource patterns. 
Acls on the wildcard resource pattern can be queried explicitly: 
-        
-        $ bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic 
'*'
+
+```bash
+$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic '*'
+```
 
 However, it is not necessarily possible to explicitly query for acls on 
prefixed resource patterns that match Test-topic as the name of such patterns 
may not be known. We can list _all_ acls affecting Test-topic by using 
'--resource-pattern-type match', e.g. 
-        
-        > bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic 
Test-topic --resource-pattern-type match
+
+```bash
+$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --list --topic 
Test-topic --resource-pattern-type match
+```
 
 This will list acls on all matching literal, wildcard and prefixed resource 
patterns.
   * **Adding or removing a principal as producer or consumer**  
 The most common use case for acl management are adding/removing a principal as 
producer or consumer so we added convenience options to handle these cases. In 
order to add User:Bob as a producer of Test-topic we can execute the following 
command: 
-        
-        $ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add 
--allow-principal User:Bob --producer --topic Test-topic
+
+```bash
+$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal 
User:Bob --producer --topic Test-topic
+```
 
 Similarly to add Alice as a consumer of Test-topic with consumer group Group-1 
we just have to pass --consumer option: 
-        
-        $ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add 
--allow-principal User:Bob --consumer --topic Test-topic --group Group-1 
+
+```bash
+$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --add --allow-principal 
User:Bob --consumer --topic Test-topic --group Group-1
+```
 
 Note that for consumer option we must also specify the consumer group. In 
order to remove a principal from producer or consumer role we just need to pass 
--remove option. 
 
diff --git a/docs/security/encryption-and-authentication-using-ssl.md 
b/docs/security/encryption-and-authentication-using-ssl.md
index 11dd9fad03f..7b903f52771 100644
--- a/docs/security/encryption-and-authentication-using-ssl.md
+++ b/docs/security/encryption-and-authentication-using-ssl.md
@@ -31,8 +31,10 @@ Apache Kafka allows clients to use SSL for encryption of 
traffic as well as auth
 ### Generate SSL key and certificate for each Kafka broker
 
 The first step of deploying one or more brokers with SSL support is to 
generate a public/private keypair for every server. Since Kafka expects all 
keys and certificates to be stored in keystores we will use Java's keytool 
command for this task. The tool supports two different keystore formats, the 
Java specific jks format which has been deprecated by now, as well as PKCS12. 
PKCS12 is the default format as of Java version 9, to ensure this format is 
being used regardless of the Java versi [...]
-         
-         $ keytool -keystore {keystorefile} -alias localhost -validity 
{validity} -genkey -keyalg RSA -storetype pkcs12
+
+```bash
+$ keytool -keystore {keystorefile} -alias localhost -validity {validity} 
-genkey -keyalg RSA -storetype pkcs12
+```
 
 You need to specify two parameters in the above command: 
 1. keystorefile: the keystore file that stores the keys (and later the 
certificate) for this broker. The keystore file contains the private and public 
keys of this broker, therefore it needs to be kept safe. Ideally this step is 
run on the Kafka broker that the key will be used on, as this key should never 
be transmitted/leave the server that it is intended for.
@@ -40,8 +42,10 @@ You need to specify two parameters in the above command:
   
 To obtain a certificate that can be used with the private key that was just 
created a certificate signing request needs to be created. This signing 
request, when signed by a trusted CA results in the actual certificate which 
can then be installed in the keystore and used for authentication purposes.  
 To generate certificate signing requests run the following command for all 
server keystores created so far. 
-    
-    $ keytool -keystore server.keystore.jks -alias localhost -validity 
{validity} -genkey -keyalg RSA -destkeystoretype pkcs12 -ext 
SAN=DNS:{FQDN},IP:{IPADDRESS1}
+
+```bash
+$ keytool -keystore server.keystore.jks -alias localhost -validity {validity} 
-genkey -keyalg RSA -destkeystoretype pkcs12 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}
+```
 
 This command assumes that you want to add hostname information to the 
certificate, if this is not the case, you can omit the extension parameter 
`-ext SAN=DNS:{FQDN},IP:{IPADDRESS1}`. Please see below for more information on 
this. 
 
@@ -50,10 +54,11 @@ This command assumes that you want to add hostname 
information to the certificat
 Host name verification, when enabled, is the process of checking attributes 
from the certificate that is presented by the server you are connecting to 
against the actual hostname or ip address of that server to ensure that you are 
indeed connecting to the correct server.  
 The main reason for this check is to prevent man-in-the-middle attacks. For 
Kafka, this check has been disabled by default for a long time, but as of Kafka 
2.0.0 host name verification of servers is enabled by default for client 
connections as well as inter-broker connections.  
 Server host name verification may be disabled by setting 
`ssl.endpoint.identification.algorithm` to an empty string.  
-For dynamically configured broker listeners, hostname verification may be 
disabled using `kafka-configs.sh`:  
+For dynamically configured broker listeners, hostname verification may be 
disabled using `kafka-configs.sh`: 
 
-    
-    $ bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type 
brokers --entity-name 0 --alter --add-config 
"listener.name.internal.ssl.endpoint.identification.algorithm="
+```bash
+$ bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers 
--entity-name 0 --alter --add-config 
"listener.name.internal.ssl.endpoint.identification.algorithm="
+```
 
 **Note:**
 
@@ -67,8 +72,10 @@ If host name verification is enabled, clients will verify 
the server's fully qua
 While Kafka checks both fields, usage of the common name field for hostname 
verification has been 
[deprecated](https://tools.ietf.org/html/rfc2818#section-3.1) since 2000 and 
should be avoided if possible. In addition the SAN field is much more flexible, 
allowing for multiple DNS and IP entries to be declared in a certificate.  
 Another advantage is that if the SAN field is used for hostname verification 
the common name can be set to a more meaningful value for authorization 
purposes. Since we need the SAN field to be contained in the signed 
certificate, it will be specified when generating the signing request. It can 
also be specified when generating the keypair, but this will not automatically 
be copied into the signing request.  
 To add a SAN field append the following argument ` -ext 
SAN=DNS:{FQDN},IP:{IPADDRESS}` to the keytool command: 
-    
-    $ keytool -keystore server.keystore.jks -alias localhost -validity 
{validity} -genkey -keyalg RSA -destkeystoretype pkcs12 -ext 
SAN=DNS:{FQDN},IP:{IPADDRESS1}
+
+```bash
+$ keytool -keystore server.keystore.jks -alias localhost -validity {validity} 
-genkey -keyalg RSA -destkeystoretype pkcs12 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}
+```
 
 ### Creating your own CA
 
@@ -80,119 +87,133 @@ For this guide we will be our own Certificate Authority. 
When setting up a produ
 
 Due to a [bug](https://www.openssl.org/docs/man1.1.1/man1/x509.html#BUGS) in 
OpenSSL, the x509 module will not copy requested extension fields from CSRs 
into the final certificate. Since we want the SAN extension to be present in 
our certificate to enable hostname verification, we'll use the _ca_ module 
instead. This requires some additional configuration to be in place before we 
generate our CA keypair.  
 Save the following listing into a file called openssl-ca.cnf and adjust the 
values for validity and common attributes as necessary. 
-         
-         HOME            = .
-         RANDFILE        = $ENV::HOME/.rnd
-         
-         ####################################################################
-         [ ca ]
-         default_ca    = CA_default      # The default ca section
-         
-         [ CA_default ]
-         
-         base_dir      = .
-         certificate   = $base_dir/cacert.pem   # The CA certificate
-         private_key   = $base_dir/cakey.pem    # The CA private key
-         new_certs_dir = $base_dir              # Location for new certs after 
signing
-         database      = $base_dir/index.txt    # Database index file
-         serial        = $base_dir/serial.txt   # The current serial number
-         
-         default_days     = 1000         # How long to certify for
-         default_crl_days = 30           # How long before next CRL
-         default_md       = sha256       # Use public key default MD
-         preserve         = no           # Keep passed DN ordering
-         
-         x509_extensions = ca_extensions # The extensions to add to the cert
-         
-         email_in_dn     = no            # Don't concat the email in the DN
-         copy_extensions = copy          # Required to copy SANs from CSR to 
cert
-         
-         ####################################################################
-         [ req ]
-         default_bits       = 4096
-         default_keyfile    = cakey.pem
-         distinguished_name = ca_distinguished_name
-         x509_extensions    = ca_extensions
-         string_mask        = utf8only
-         
-         ####################################################################
-         [ ca_distinguished_name ]
-         countryName         = Country Name (2 letter code)
-         countryName_default = DE
-         
-         stateOrProvinceName         = State or Province Name (full name)
-         stateOrProvinceName_default = Test Province
-         
-         localityName                = Locality Name (eg, city)
-         localityName_default        = Test Town
-         
-         organizationName            = Organization Name (eg, company)
-         organizationName_default    = Test Company
-         
-         organizationalUnitName         = Organizational Unit (eg, division)
-         organizationalUnitName_default = Test Unit
-         
-         commonName         = Common Name (e.g. server FQDN or YOUR name)
-         commonName_default = Test Name
-         
-         emailAddress         = Email Address
-         emailAddress_default = [email protected]
-         
-         ####################################################################
-         [ ca_extensions ]
-         
-         subjectKeyIdentifier   = hash
-         authorityKeyIdentifier = keyid:always, issuer
-         basicConstraints       = critical, CA:true
-         keyUsage               = keyCertSign, cRLSign
-         
-         ####################################################################
-         [ signing_policy ]
-         countryName            = optional
-         stateOrProvinceName    = optional
-         localityName           = optional
-         organizationName       = optional
-         organizationalUnitName = optional
-         commonName             = supplied
-         emailAddress           = optional
-         
-         ####################################################################
-         [ signing_req ]
-         subjectKeyIdentifier   = hash
-         authorityKeyIdentifier = keyid,issuer
-         basicConstraints       = CA:FALSE
-         keyUsage               = digitalSignature, keyEncipherment
+
+```ini
+HOME            = .
+RANDFILE        = $ENV::HOME/.rnd
+
+####################################################################
+[ ca ]
+default_ca    = CA_default      # The default ca section
+
+[ CA_default ]
+
+base_dir      = .
+certificate   = $base_dir/cacert.pem   # The CA certificate
+private_key   = $base_dir/cakey.pem    # The CA private key
+new_certs_dir = $base_dir              # Location for new certs after signing
+database      = $base_dir/index.txt    # Database index file
+serial        = $base_dir/serial.txt   # The current serial number
+
+default_days     = 1000         # How long to certify for
+default_crl_days = 30           # How long before next CRL
+default_md       = sha256       # Use public key default MD
+preserve         = no           # Keep passed DN ordering
+
+x509_extensions = ca_extensions # The extensions to add to the cert
+
+email_in_dn     = no            # Don't concat the email in the DN
+copy_extensions = copy          # Required to copy SANs from CSR to cert
+
+####################################################################
+[ req ]
+default_bits       = 4096
+default_keyfile    = cakey.pem
+distinguished_name = ca_distinguished_name
+x509_extensions    = ca_extensions
+string_mask        = utf8only
+
+####################################################################
+[ ca_distinguished_name ]
+countryName         = Country Name (2 letter code)
+countryName_default = DE
+
+stateOrProvinceName         = State or Province Name (full name)
+stateOrProvinceName_default = Test Province
+
+localityName                = Locality Name (eg, city)
+localityName_default        = Test Town
+
+organizationName            = Organization Name (eg, company)
+organizationName_default    = Test Company
+
+organizationalUnitName         = Organizational Unit (eg, division)
+organizationalUnitName_default = Test Unit
+
+commonName         = Common Name (e.g. server FQDN or YOUR name)
+commonName_default = Test Name
+
+emailAddress         = Email Address
+emailAddress_default = [email protected]
+
+####################################################################
+[ ca_extensions ]
+
+subjectKeyIdentifier   = hash
+authorityKeyIdentifier = keyid:always, issuer
+basicConstraints       = critical, CA:true
+keyUsage               = keyCertSign, cRLSign
+
+####################################################################
+[ signing_policy ]
+countryName            = optional
+stateOrProvinceName    = optional
+localityName           = optional
+organizationName       = optional
+organizationalUnitName = optional
+commonName             = supplied
+emailAddress           = optional
+
+####################################################################
+[ signing_req ]
+subjectKeyIdentifier   = hash
+authorityKeyIdentifier = keyid,issuer
+basicConstraints       = CA:FALSE
+keyUsage               = digitalSignature, keyEncipherment
+```
 
 Then create a database and serial number file, these will be used to keep 
track of which certificates were signed with this CA. Both of these are simply 
text files that reside in the same directory as your CA keys. 
-         
-         $ echo 01 > serial.txt
-         $ touch index.txt
+
+```bash
+$ echo 01 > serial.txt
+$ touch index.txt
+```
 
 With these steps done you are now ready to generate your CA that will be used 
to sign certificates later. 
-         
-         $ openssl req -x509 -config openssl-ca.cnf -newkey rsa:4096 -sha256 
-nodes -out cacert.pem -outform PEM
+
+```bash
+$ openssl req -x509 -config openssl-ca.cnf -newkey rsa:4096 -sha256 -nodes 
-out cacert.pem -outform PEM
+```
 
 The CA is simply a public/private key pair and certificate that is signed by 
itself, and is only intended to sign other certificates.  
 This keypair should be kept very safe, if someone gains access to it, they can 
create and sign certificates that will be trusted by your infrastructure, which 
means they will be able to impersonate anybody when connecting to any service 
that trusts this CA.  
 The next step is to add the generated CA to the **clients' truststore** so 
that the clients can trust this CA: 
-         
-         $ keytool -keystore client.truststore.jks -alias CARoot -import -file 
ca-cert
+
+```bash
+$ keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
+```
 
 **Note:** If you configure the Kafka brokers to require client authentication 
by setting ssl.client.auth to be "requested" or "required" in the Kafka brokers 
config then you must provide a truststore for the Kafka brokers as well and it 
should have all the CA certificates that clients' keys were signed by. 
-         
-         $ keytool -keystore server.truststore.jks -alias CARoot -import -file 
ca-cert
+
+```bash
+$ keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
+```
 
 In contrast to the keystore in step 1 that stores each machine's own identity, 
the truststore of a client stores all the certificates that the client should 
trust. Importing a certificate into one's truststore also means trusting all 
certificates that are signed by that certificate. As the analogy above, 
trusting the government (CA) also means trusting all passports (certificates) 
that it has issued. This attribute is called the chain of trust, and it is 
particularly useful when deployin [...]
 ### Signing the certificate
 
 Then sign it with the CA: 
-         
-         $ openssl ca -config openssl-ca.cnf -policy signing_policy 
-extensions signing_req -out {server certificate} -infiles {certificate signing 
request}
+
+```bash
+$ openssl ca -config openssl-ca.cnf -policy signing_policy -extensions 
signing_req -out {server certificate} -infiles {certificate signing request}
+```
 
 Finally, you need to import both the certificate of the CA and the signed 
certificate into the keystore: 
-         
-         $ keytool -keystore {keystore} -alias CARoot -import -file {CA 
certificate}
-         $ keytool -keystore {keystore} -alias localhost -import -file 
cert-signed
+
+```bash
+$ keytool -keystore {keystore} -alias CARoot -import -file {CA certificate}
+$ keytool -keystore {keystore} -alias localhost -import -file cert-signed
+```
 
 The definitions of the parameters are the following: 
 1. keystore: the location of the keystore
@@ -223,22 +244,28 @@ Kafka brokers need both these usages to be allowed, as 
for intra-cluster communi
 Corporate Root CAs are often kept offline for security reasons. To enable 
day-to-day usage, so called intermediate CAs are created, which are then used 
to sign the final certificates. When importing a certificate into the keystore 
that was signed by an intermediate CA it is necessary to provide the entire 
chain of trust up to the root CA. This can be done by simply _cat_ ing the 
certificate files into one combined certificate file and then importing this 
with keytool. 
 3. **Failure to copy extension fields**  
 CA operators are often hesitant to copy and requested extension fields from 
CSRs and prefer to specify these themselves as this makes it harder for a 
malicious party to obtain certificates with potentially misleading or 
fraudulent values. It is advisable to double check signed certificates, whether 
these contain all requested SAN fields to enable proper hostname verification. 
The following command can be used to print certificate details to the console, 
which should be compared with what [...]
-            
-            $ openssl x509 -in certificate.crt -text -noout
+
+```bash
+$ openssl x509 -in certificate.crt -text -noout
+```
 
 ### Configuring Kafka Brokers
 
 If SSL is not enabled for inter-broker communication (see below for how to 
enable it), both PLAINTEXT and SSL ports will be necessary. 
-         
-         listeners=PLAINTEXT://host.name:port,SSL://host.name:port
 
-Following SSL configs are needed on the broker side 
-         
-         ssl.keystore.location=/var/private/ssl/server.keystore.jks
-         ssl.keystore.password=test1234
-         ssl.key.password=test1234
-         ssl.truststore.location=/var/private/ssl/server.truststore.jks
-         ssl.truststore.password=test1234
+```java-properties
+listeners=PLAINTEXT://host.name:port,SSL://host.name:port
+```
+
+Following SSL configs are needed on the broker side:
+
+```java-properties
+ssl.keystore.location=/var/private/ssl/server.keystore.jks
+ssl.keystore.password=test1234
+ssl.key.password=test1234
+ssl.truststore.location=/var/private/ssl/server.truststore.jks
+ssl.truststore.password=test1234
+```
 
 Note: ssl.truststore.password is technically optional but highly recommended. 
If a password is not set access to the truststore is still available, but 
integrity checking is disabled. Optional settings that are worth considering: 
 1. ssl.client.auth=none ("required" => client authentication is required, 
"requested" => client authentication is requested and client without certs can 
still connect. The usage of "requested" is discouraged as it provides a false 
sense of security and misconfigured clients will still connect successfully.)
@@ -247,46 +274,58 @@ Note: ssl.truststore.password is technically optional but 
highly recommended. If
 4. ssl.keystore.type=JKS
 5. ssl.truststore.type=JKS
 6. ssl.secure.random.implementation=SHA1PRNG
-If you want to enable SSL for inter-broker communication, add the following to 
the server.properties file (it defaults to PLAINTEXT) 
-    
-    security.inter.broker.protocol=SSL
+If you want to enable SSL for inter-broker communication, add the following to 
the server.properties file (it defaults to PLAINTEXT):
+
+```java-properties
+security.inter.broker.protocol=SSL
+```
 
 Due to import regulations in some countries, the Oracle implementation limits 
the strength of cryptographic algorithms available by default. If stronger 
algorithms are needed (for example, AES with 256-bit keys), the [JCE Unlimited 
Strength Jurisdiction Policy 
Files](https://www.oracle.com/technetwork/java/javase/downloads/index.html) 
must be obtained and installed in the JDK/JRE. See the [JCA Providers 
Documentation](https://docs.oracle.com/javase/8/docs/technotes/guides/security/SunPro
 [...]
 
 The JRE/JDK will have a default pseudo-random number generator (PRNG) that is 
used for cryptography operations, so it is not required to configure the 
implementation used with the `ssl.secure.random.implementation`. However, there 
are performance issues with some implementations (notably, the default chosen 
on Linux systems, `NativePRNG`, utilizes a global lock). In cases where 
performance of SSL connections becomes an issue, consider explicitly setting 
the implementation to be used. The [...]
 
-Once you start the broker you should be able to see in the server.log 
-    
-    with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> 
EndPoint(192.168.64.1,9093,SSL)
+Once you start the broker you should be able to see in the server.log:
+
+```text
+with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> 
EndPoint(192.168.64.1,9093,SSL)
+```
 
-To check quickly if the server keystore and truststore are setup properly you 
can run the following command 
-    
-    $ openssl s_client -debug -connect localhost:9093 -tls1
+To check quickly if the server keystore and truststore are setup properly you 
can run the following command:
+
+```bash
+$ openssl s_client -debug -connect localhost:9093 -tls1
+```
 
 (Note: TLSv1 should be listed under ssl.enabled.protocols)  
 In the output of this command you should see server's certificate: 
-    
-    -----BEGIN CERTIFICATE-----
-    {variable sized random bytes}
-    -----END CERTIFICATE-----
-    subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani
-    issuer=/C=US/ST=CA/L=Santa 
Clara/O=org/OU=org/CN=kafka/[email protected]
+
+```text
+-----BEGIN CERTIFICATE-----
+{variable sized random bytes}
+-----END CERTIFICATE-----
+subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani
+issuer=/C=US/ST=CA/L=Santa 
Clara/O=org/OU=org/CN=kafka/[email protected]
+```
 
 If the certificate does not show up or if there are any other error messages 
then your keystore is not setup properly.
 ### Configuring Kafka Clients
 
 SSL is supported only for the new Kafka Producer and Consumer, the older API 
is not supported. The configs for SSL will be the same for both producer and 
consumer.  
 If client authentication is not required in the broker, then the following is 
a minimal configuration example: 
-         
-         security.protocol=SSL
-         ssl.truststore.location=/var/private/ssl/client.truststore.jks
-         ssl.truststore.password=test1234
+
+```java-properties
+security.protocol=SSL
+ssl.truststore.location=/var/private/ssl/client.truststore.jks
+ssl.truststore.password=test1234
+```
 
 Note: ssl.truststore.password is technically optional but highly recommended. 
If a password is not set access to the truststore is still available, but 
integrity checking is disabled. If client authentication is required, then a 
keystore must be created like in step 1 and the following must also be 
configured: 
-         
-         ssl.keystore.location=/var/private/ssl/client.keystore.jks
-         ssl.keystore.password=test1234
-         ssl.key.password=test1234
+
+```java-properties
+ssl.keystore.location=/var/private/ssl/client.keystore.jks
+ssl.keystore.password=test1234
+ssl.key.password=test1234
+```
 
 Other configuration settings that may also be needed depending on our 
requirements and the broker configuration: 
 1. ssl.provider (Optional). The name of the security provider used for SSL 
connections. Default value is the default security provider of the JVM.
@@ -296,9 +335,11 @@ Other configuration settings that may also be needed 
depending on our requiremen
 5. ssl.keystore.type=JKS
   
 Examples using console-producer and console-consumer: 
-    
-    $ bin/kafka-console-producer.sh --bootstrap-server localhost:9093 --topic 
test --command-config client-ssl.properties
-    $ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic 
test --command-config client-ssl.properties
+
+```bash
+$ bin/kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test 
--command-config client-ssl.properties
+$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test 
--command-config client-ssl.properties
+```
 
 
 
diff --git 
a/docs/security/incorporating-security-features-in-a-running-cluster.md 
b/docs/security/incorporating-security-features-in-a-running-cluster.md
index 6065b47cb16..9a27f2069a0 100644
--- a/docs/security/incorporating-security-features-in-a-running-cluster.md
+++ b/docs/security/incorporating-security-features-in-a-running-cluster.md
@@ -42,49 +42,57 @@ The security implementation lets you configure different 
protocols for both brok
 When performing an incremental bounce stop the brokers cleanly via a SIGTERM. 
It's also good practice to wait for restarted replicas to return to the ISR 
list before moving onto the next node. 
 
 As an example, say we wish to encrypt both broker-client and broker-broker 
communication with SSL. In the first incremental bounce, an SSL port is opened 
on each node: 
-    
-    
-    listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
+
+```java-properties
+listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
+```
 
 We then restart the clients, changing their config to point at the newly 
opened, secured port: 
-    
-    
-    bootstrap.servers = [broker1:9092,...]
-    security.protocol = SSL
-    ...etc
+
+```java-properties
+bootstrap.servers = [broker1:9092,...]
+security.protocol = SSL
+...etc
+```
 
 In the second incremental server bounce we instruct Kafka to use SSL as the 
broker-broker protocol (which will use the same SSL port): 
-    
-    
-    listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
-    security.inter.broker.protocol=SSL
+
+```java-properties
+listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
+security.inter.broker.protocol=SSL
+```
 
 In the final bounce we secure the cluster by closing the PLAINTEXT port: 
-    
-    
-    listeners=SSL://broker1:9092
-    security.inter.broker.protocol=SSL
+
+```java-properties
+listeners=SSL://broker1:9092
+security.inter.broker.protocol=SSL
+```
 
 Alternatively we might choose to open multiple ports so that different 
protocols can be used for broker-broker and broker-client communication. Say we 
wished to use SSL encryption throughout (i.e. for broker-broker and 
broker-client communication) but we'd like to add SASL authentication to the 
broker-client connection also. We would achieve this by opening two additional 
ports during the first bounce: 
-    
-    
-    
listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
+
+```java-properties
+listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
+```
 
 We would then restart the clients, changing their config to point at the newly 
opened, SASL & SSL secured port: 
-    
-    
-    bootstrap.servers = [broker1:9093,...]
-    security.protocol = SASL_SSL
-    ...etc
+
+```java-properties
+bootstrap.servers = [broker1:9093,...]
+security.protocol = SASL_SSL
+...etc
+```
 
 The second server bounce would switch the cluster to use encrypted 
broker-broker communication via the SSL port we previously opened on port 9092: 
-    
-    
-    
listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
-    security.inter.broker.protocol=SSL
+
+```java-properties
+listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
+security.inter.broker.protocol=SSL
+```
 
 The final bounce secures the cluster by closing the PLAINTEXT port. 
-    
-    
-    listeners=SSL://broker1:9092,SASL_SSL://broker1:9093
-    security.inter.broker.protocol=SSL
+
+```java-properties
+listeners=SSL://broker1:9092,SASL_SSL://broker1:9093
+security.inter.broker.protocol=SSL
+```
diff --git a/docs/security/listener-configuration.md 
b/docs/security/listener-configuration.md
index 5d31d670dc2..303d9237d53 100644
--- a/docs/security/listener-configuration.md
+++ b/docs/security/listener-configuration.md
@@ -29,19 +29,22 @@ type: docs
 In order to secure a Kafka cluster, it is necessary to secure the channels 
that are used to communicate with the servers. Each server must define the set 
of listeners that are used to receive requests from clients as well as other 
servers. Each listener may be configured to authenticate clients using various 
mechanisms and to ensure traffic between the server and the client is 
encrypted. This section provides a primer for the configuration of listeners.
 
 Kafka servers support listening for connections on multiple ports. This is 
configured through the `listeners` property in the server configuration, which 
accepts a comma-separated list of the listeners to enable. At least one 
listener must be defined on each server. The format of each listener defined in 
`listeners` is given below:
-    
-    
-    {LISTENER_NAME}://{hostname}:{port}
+
+```text
+{LISTENER_NAME}://{hostname}:{port}
+```
 
 The `LISTENER_NAME` is usually a descriptive name which defines the purpose of 
the listener. For example, many configurations use a separate listener for 
client traffic, so they might refer to the corresponding listener as `CLIENT` 
in the configuration:
-    
-    
-    listeners=CLIENT://localhost:9092
+
+```java-properties
+listeners=CLIENT://localhost:9092
+```
 
 The security protocol of each listener is defined in a separate configuration: 
`listener.security.protocol.map`. The value is a comma-separated list of each 
listener mapped to its security protocol. For example, the follow value 
configuration specifies that the `CLIENT` listener will use SSL while the 
`BROKER` listener will use plaintext.
-    
-    
-    listener.security.protocol.map=CLIENT:SSL,BROKER:PLAINTEXT
+
+```java-properties
+listener.security.protocol.map=CLIENT:SSL,BROKER:PLAINTEXT
+```
 
 Possible options (case-insensitive) for the security protocol are given below:
 
@@ -55,9 +58,10 @@ Possible options (case-insensitive) for the security 
protocol are given below:
 The plaintext protocol provides no security and does not require any 
additional configuration. In the following sections, this document covers how 
to configure the remaining protocols.
 
 If each required listener uses a separate security protocol, it is also 
possible to use the security protocol name as the listener name in `listeners`. 
Using the example above, we could skip the definition of the `CLIENT` and 
`BROKER` listeners using the following definition:
-    
-    
-    listeners=SSL://localhost:9092,PLAINTEXT://localhost:9093
+
+```java-properties
+listeners=SSL://localhost:9092,PLAINTEXT://localhost:9093
+```
 
 However, we recommend users to provide explicit names for the listeners since 
it makes the intended usage of each listener clearer.
 
@@ -66,26 +70,28 @@ Among the listeners in this list, it is possible to declare 
the listener to be u
 In a KRaft cluster, a broker is any server which has the `broker` role enabled 
in `process.roles` and a controller is any server which has the `controller` 
role enabled. Listener configuration depends on the role. The listener defined 
by `inter.broker.listener.name` is used exclusively for requests between 
brokers. Controllers, on the other hand, must use separate listener which is 
defined by the `controller.listener.names` configuration. This cannot be set to 
the same value as the inter [...]
 
 Controllers receive requests both from other controllers and from brokers. For 
this reason, even if a server does not have the `controller` role enabled (i.e. 
it is just a broker), it must still define the controller listener along with 
any security properties that are needed to configure it. For example, we might 
use the following configuration on a standalone broker:
-    
-    
-    process.roles=broker
-    listeners=BROKER://localhost:9092
-    inter.broker.listener.name=BROKER
-    controller.quorum.bootstrap.servers=localhost:9093
-    controller.listener.names=CONTROLLER
-    listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL
+
+```java-properties
+process.roles=broker
+listeners=BROKER://localhost:9092
+inter.broker.listener.name=BROKER
+controller.quorum.bootstrap.servers=localhost:9093
+controller.listener.names=CONTROLLER
+listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL
+```
 
 The controller listener is still configured in this example to use the 
`SASL_SSL` security protocol, but it is not included in `listeners` since the 
broker does not expose the controller listener itself. The port that will be 
used in this case comes from the `controller.quorum.voters` configuration, 
which defines the complete list of controllers.
 
 For KRaft servers which have both the broker and controller role enabled, the 
configuration is similar. The only difference is that the controller listener 
must be included in `listeners`:
-    
-    
-    process.roles=broker,controller
-    listeners=BROKER://localhost:9092,CONTROLLER://localhost:9093
-    inter.broker.listener.name=BROKER
-    controller.quorum.bootstrap.servers=localhost:9093
-    controller.listener.names=CONTROLLER
-    listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL
+
+```java-properties
+process.roles=broker,controller
+listeners=BROKER://localhost:9092,CONTROLLER://localhost:9093
+inter.broker.listener.name=BROKER
+controller.quorum.bootstrap.servers=localhost:9093
+controller.listener.names=CONTROLLER
+listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL
+```
 
 It is a requirement that the host and port defined in 
`controller.quorum.bootstrap.servers` is routed to the exposed controller 
listeners. For example, here the `CONTROLLER` listener is bound to 
localhost:9093. The connection string defined by 
`controller.quorum.bootstrap.servers` must then also use localhost:9093, as it 
does here.
 


Reply via email to