This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 792a76ac762 [improve] PIP-467: Convert pulsar-common module logging from SLF4J to slog (#25506)
792a76ac762 is described below

commit 792a76ac762109e6ceab5fc25ae2ccb087ec2ab5
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Apr 10 12:32:14 2026 -0700

    [improve] PIP-467: Convert pulsar-common module logging from SLF4J to slog (#25506)
---
 distribution/shell/src/assemble/LICENSE.bin.txt    |  2 +
 pulsar-common/build.gradle.kts                     |  2 +-
 .../client/impl/schema/KeyValueSchemaInfo.java     |  4 +-
 .../common/allocator/PulsarByteBufAllocator.java   | 12 ++-
 .../pulsar/common/api/raw/MessageParser.java       | 40 ++++++---
 .../common/intercept/BrokerEntryMetadataUtils.java | 28 ++++---
 .../org/apache/pulsar/common/nar/FileUtils.java    | 92 +++++----------------
 .../apache/pulsar/common/nar/NarClassLoader.java   | 16 ++--
 .../org/apache/pulsar/common/nar/NarUnpacker.java  | 21 +++--
 .../common/policies/data/ClusterDataImpl.java      |  4 +-
 .../common/policies/data/OffloadPoliciesImpl.java  |  6 +-
 .../apache/pulsar/common/protocol/Commands.java    | 13 +--
 .../protocol/OptionalProxyProtocolDecoder.java     |  4 +-
 .../pulsar/common/protocol/PulsarDecoder.java      | 17 ++--
 .../pulsar/common/protocol/PulsarHandler.java      | 35 ++++----
 .../common/sasl/JAASCredentialsContainer.java      | 10 +--
 .../pulsar/common/sasl/TGTRefreshThread.java       | 95 +++++++++++++---------
 .../semaphore/AsyncDualMemoryLimiterImpl.java      |  4 -
 .../common/semaphore/AsyncSemaphoreImpl.java       |  4 -
 .../common/stats/JvmDefaultGCMetricsLogger.java    | 10 +--
 .../pulsar/common/stats/JvmG1GCMetricsLogger.java  |  7 +-
 .../org/apache/pulsar/common/stats/JvmMetrics.java | 15 ++--
 .../pulsar/common/tls/TlsHostnameVerifier.java     |  8 +-
 .../pulsar/common/topics/TopicsPatternFactory.java | 12 +--
 .../pulsar/common/util/ClassLoaderUtils.java       |  6 +-
 .../common/util/FileModifiedTimeUpdater.java       |  7 +-
 ...GracefulExecutorServicesTerminationHandler.java | 19 +++--
 .../apache/pulsar/common/util/KeyManagerProxy.java | 18 ++--
 .../pulsar/common/util/ObjectMapperFactory.java    |  4 +-
 .../org/apache/pulsar/common/util/RateLimiter.java |  4 +-
 .../org/apache/pulsar/common/util/Runnables.java   |  7 +-
 .../apache/pulsar/common/util/SecurityUtility.java | 41 +++++-----
 .../apache/pulsar/common/util/ShutdownUtil.java    | 14 ++--
 ...ingleThreadNonConcurrentFixedRateScheduler.java |  6 +-
 .../pulsar/common/util/TrustManagerProxy.java      | 10 +--
 .../util/keystoretls/KeyStoreSSLContext.java       |  4 +-
 .../keystoretls/SSLContextValidatorEngine.java     |  4 +-
 .../pulsar/common/util/netty/DnsResolverUtil.java  |  9 +-
 .../pulsar/common/util/netty/EventLoopUtil.java    |  8 +-
 .../common/compression/CompressionCodecLZ4JNI.java |  6 +-
 .../compression/CompressionCodecSnappyJNI.java     |  6 +-
 .../apache/pulsar/common/nar/NarUnpackerTest.java  | 16 ++--
 .../stats/JvmDefaultGCMetricsLoggerTest.java       |  7 +-
 43 files changed, 311 insertions(+), 346 deletions(-)

diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt 
b/distribution/shell/src/assemble/LICENSE.bin.txt
index 6aa51417695..6f33219de16 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -391,6 +391,8 @@ The Apache Software License, Version 2.0
     - opentelemetry-api-incubator-1.56.0-alpha.jar
     - opentelemetry-common-1.56.0.jar
     - opentelemetry-context-1.56.0.jar
+  * Slog
+    - slog-0.9.5.jar
 
  * BookKeeper
     - bookkeeper-common-allocator-4.17.3.jar
diff --git a/pulsar-common/build.gradle.kts b/pulsar-common/build.gradle.kts
index cdfaaeb16c0..5388569ee13 100644
--- a/pulsar-common/build.gradle.kts
+++ b/pulsar-common/build.gradle.kts
@@ -88,7 +88,7 @@ dependencies {
     api(project(":pulsar-client-api"))
     api(project(":pulsar-client-admin-api"))
 
-    implementation(libs.slf4j.api)
+    implementation(libs.slog)
     implementation(libs.jackson.databind)
     implementation(libs.jackson.module.parameter.names)
     implementation(libs.jackson.datatype.jsr310)
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
index 4016dfb2a98..aefc9e42324 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
@@ -23,7 +23,7 @@ import static java.util.Objects.requireNonNull;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.EncodeData;
 import org.apache.pulsar.client.api.Schema;
@@ -35,7 +35,7 @@ import org.apache.pulsar.common.schema.SchemaType;
 /**
  * Util class for processing key/value schema info.
  */
-@Slf4j
+@CustomLog
 public final class KeyValueSchemaInfo {
 
     private static final Schema<SchemaInfo> SCHEMA_INFO_WRITER = new 
Schema<SchemaInfo>() {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java
index 4ad0732a62d..087180d125b 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java
@@ -28,8 +28,8 @@ import java.util.Objects;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import lombok.CustomLog;
 import lombok.experimental.UtilityClass;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
 import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
 import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
@@ -39,8 +39,8 @@ import org.apache.pulsar.common.util.ShutdownUtil;
 /**
  * Holder of a ByteBuf allocator.
  */
+@CustomLog
 @UtilityClass
-@Slf4j
 public class PulsarByteBufAllocator {
 
     public static final String PULSAR_ALLOCATOR_POOLED = 
"pulsar.allocator.pooled";
@@ -76,9 +76,7 @@ public class PulsarByteBufAllocator {
                 System.getProperty(PULSAR_ALLOCATOR_OUT_OF_MEMORY_POLICY, 
"FallbackToHeap"));
 
         final LeakDetectionPolicy leakDetectionPolicy = 
resolveLeakDetectionPolicyWithHighestLevel(System::getProperty);
-        if (log.isDebugEnabled()) {
-            log.debug("Is Pooled: {} -- Exit on OOM: {}", isPooled, 
isExitOnOutOfMemory);
-        }
+        log.debug().attr("isPooled", isPooled).attr("exitOnOOM", 
isExitOnOutOfMemory).log("Allocator configuration");
 
         ByteBufAllocatorBuilder builder = ByteBufAllocatorBuilder.create()
                 .leakDetectionPolicy(leakDetectionPolicy)
@@ -89,12 +87,12 @@ public class PulsarByteBufAllocator {
                         try {
                             c.accept(oomException);
                         } catch (Throwable t) {
-                            log.warn("Exception during OOM listener: {}", 
t.getMessage(), t);
+                            log.warn().exception(t).log("Exception during OOM 
listener");
                         }
                     });
 
                     if (isExitOnOutOfMemory) {
-                        log.info("Exiting JVM process for OOM error: {}", 
oomException.getMessage(), oomException);
+                        log.info().exception(oomException).log("Exiting JVM 
process for OOM error");
                         ShutdownUtil.triggerImmediateForcefulShutdown();
                     }
                 });
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
index 0e9aae4603d..57b21d59a60 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
@@ -25,8 +25,8 @@ import io.netty.buffer.ByteBuf;
 import io.netty.util.ReferenceCountUtil;
 import io.netty.util.concurrent.FastThreadLocal;
 import java.io.IOException;
+import lombok.CustomLog;
 import lombok.experimental.UtilityClass;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
 import org.apache.pulsar.common.compression.CompressionCodec;
@@ -37,8 +37,8 @@ import org.apache.pulsar.common.protocol.Commands;
 /**
  * Helper class to work with a raw Pulsar entry payload.
  */
+@CustomLog
 @UtilityClass
-@Slf4j
 public class MessageParser {
 
     private static final FastThreadLocal<SingleMessageMetadata> 
LOCAL_SINGLE_MESSAGE_METADATA = //
@@ -83,8 +83,11 @@ public class MessageParser {
             try {
                 Commands.parseMessageMetadata(payload, msgMetadata);
             } catch (Throwable t) {
-                log.warn("[{}] Failed to deserialize metadata for message 
{}:{} - Ignoring",
-                    topicName, ledgerId, entryId);
+                log.warn()
+                    .attr("topic", topicName)
+                    .attr("ledgerId", ledgerId)
+                    .attr("entryId", entryId)
+                    .log("Failed to deserialize metadata for message - 
Ignoring");
                 return;
             }
 
@@ -128,9 +131,13 @@ public class MessageParser {
             int checksum = readChecksum(headersAndPayload);
             int computedChecksum = computeChecksum(headersAndPayload);
             if (checksum != computedChecksum) {
-                log.error(
-                        "[{}] Checksum mismatch for message at {}:{}. Received 
checksum: 0x{}, Computed checksum: 0x{}",
-                        topic, ledgerId, entryId, Long.toHexString(checksum), 
Integer.toHexString(computedChecksum));
+                log.error()
+                        .attr("topic", topic)
+                        .attr("ledgerId", ledgerId)
+                        .attr("entryId", entryId)
+                        .attr("receivedChecksum", "0x" + 
Long.toHexString(checksum))
+                        .attr("computedChecksum", "0x" + 
Integer.toHexString(computedChecksum))
+                        .log("Checksum mismatch for message");
                 return false;
             }
         }
@@ -152,8 +159,12 @@ public class MessageParser {
         int payloadSize = payload.readableBytes();
         if (payloadSize > maxMessageSize) {
             // payload size is itself corrupted since it cannot be bigger than 
the MaxMessageSize
-            log.error("[{}] Got corrupted payload message size {} at {}:{}", 
topic, payloadSize,
-                    ledgerId, entryId);
+            log.error()
+                    .attr("topic", topic)
+                    .attr("payloadSize", payloadSize)
+                    .attr("ledgerId", ledgerId)
+                    .attr("entryId", entryId)
+                    .log("Got corrupted payload message size");
             return null;
         }
 
@@ -161,8 +172,13 @@ public class MessageParser {
             ByteBuf uncompressedPayload = codec.decode(payload, 
uncompressedSize);
             return uncompressedPayload;
         } catch (IOException e) {
-            log.error("[{}] Failed to decompress message with {} at {}:{} : 
{}", topic,
-                    msgMetadata.getCompression(), ledgerId, entryId, 
e.getMessage(), e);
+            log.error()
+                    .attr("topic", topic)
+                    .attr("compression", msgMetadata.getCompression())
+                    .attr("ledgerId", ledgerId)
+                    .attr("entryId", entryId)
+                    .exception(e)
+                    .log("Failed to decompress message");
             return null;
         }
     }
@@ -187,7 +203,7 @@ public class MessageParser {
                         ledgerId, entryId, i));
             }
         } catch (IOException e) {
-            log.warn("Unable to obtain messages in batch", e);
+            log.warn().exception(e).log("Unable to obtain messages in batch");
         }
     }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/BrokerEntryMetadataUtils.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/BrokerEntryMetadataUtils.java
index 00f8e861e44..0c9661d705a 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/BrokerEntryMetadataUtils.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/BrokerEntryMetadataUtils.java
@@ -22,17 +22,15 @@ import java.lang.reflect.InvocationTargetException;
 import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.Set;
+import lombok.CustomLog;
 import org.apache.pulsar.common.util.ClassLoaderUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * A tool class for loading BrokerEntryMetadataInterceptor classes.
  */
+@CustomLog
 public class BrokerEntryMetadataUtils<T> {
 
-    private static final Logger log = 
LoggerFactory.getLogger(BrokerEntryMetadataUtils.class);
-
     public static Set<BrokerEntryMetadataInterceptor> 
loadBrokerEntryMetadataInterceptors(
             Set<String> interceptorNames, ClassLoader classLoader) {
         Set<BrokerEntryMetadataInterceptor> interceptors = new HashSet<>();
@@ -46,12 +44,17 @@ public class BrokerEntryMetadataUtils<T> {
                         
interceptors.add(clz.getDeclaredConstructor().newInstance());
                     } catch (InstantiationException | IllegalAccessException
                             | InvocationTargetException | 
NoSuchMethodException e) {
-                        log.error("Create new BrokerEntryMetadataInterceptor 
instance for {} failed.",
-                                interceptorName, e);
+                        log.error()
+                                .attr("interceptorName", interceptorName)
+                                .exception(e)
+                                .log("Create new 
BrokerEntryMetadataInterceptor instance failed.");
                         throw new RuntimeException(e);
                     }
                 } catch (ClassNotFoundException e) {
-                    log.error("Load BrokerEntryMetadataInterceptor class for 
{} failed.", interceptorName, e);
+                    log.error()
+                            .attr("interceptorName", interceptorName)
+                            .exception(e)
+                            .log("Load BrokerEntryMetadataInterceptor class 
failed.");
                     throw new RuntimeException(e);
                 }
             }
@@ -71,12 +74,17 @@ public class BrokerEntryMetadataUtils<T> {
                         
interceptors.add(clz.getDeclaredConstructor().newInstance());
                     } catch (InstantiationException | IllegalAccessException
                         | InvocationTargetException | NoSuchMethodException e) 
{
-                        log.error("Create new instance for {} failed. 
Exception is {}",
-                            interceptorName, e);
+                        log.error()
+                            .attr("interceptorName", interceptorName)
+                            .exception(e)
+                            .log("Create new instance failed.");
                         throw new RuntimeException(e);
                     }
                 } catch (ClassNotFoundException e) {
-                    log.error("Load class for {} failed. Exception is {}", 
interceptorName, e);
+                    log.error()
+                        .attr("interceptorName", interceptorName)
+                        .exception(e)
+                        .log("Load class failed.");
                     throw new RuntimeException(e);
                 }
             }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java
index 38665ce4e1b..f231f95ced7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java
@@ -31,15 +31,14 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
-import lombok.extern.slf4j.Slf4j;
-import org.slf4j.Logger;
+import lombok.CustomLog;
 
 /**
  * A utility class containing a few useful static methods to do typical IO
  * operations.
  *
  */
-@Slf4j
+@CustomLog
 public class FileUtils {
 
     public static final long MILLIS_BETWEEN_ATTEMPTS = 50L;
@@ -72,29 +71,7 @@ public class FileUtils {
         }
     }
 
-    /**
-     * Deletes the given file. If the given file exists but could not be 
deleted
-     * this will be printed as a warning to the given logger
-     *
-     * @param file to delete
-     * @param logger to notify
-     * @return true if deleted
-     */
-    public static boolean deleteFile(final File file, final Logger logger) {
-        return FileUtils.deleteFile(file, logger, 1);
-    }
-
-    /**
-     * Deletes the given file. If the given file exists but could not be 
deleted
-     * this will be printed as a warning to the given logger
-     *
-     * @param file to delete
-     * @param logger to notify
-     * @param attempts indicates how many times an attempt to delete should be
-     * made
-     * @return true if given file no longer exists
-     */
-    public static boolean deleteFile(final File file, final Logger logger, 
final int attempts) {
+    private static boolean deleteFile(final File file, final int attempts) {
         if (file == null) {
             return false;
         }
@@ -108,51 +85,17 @@ public class FileUtils {
                         FileUtils.sleepQuietly(MILLIS_BETWEEN_ATTEMPTS);
                     }
                 }
-                if (!isGone && logger != null) {
-                    logger.warn("File appears to exist but unable to delete 
file: " + file.getAbsolutePath());
+                if (!isGone) {
+                    log.warn().attr("file", file.getAbsolutePath())
+                            .log("File appears to exist but unable to delete 
file");
                 }
             }
         } catch (final Throwable t) {
-            if (logger != null) {
-                logger.warn("Unable to delete file: '" + 
file.getAbsolutePath() + "' due to " + t);
-            }
+            log.warn().attr("file", 
file.getAbsolutePath()).exception(t).log("Unable to delete file");
         }
         return isGone;
     }
 
-    /**
-     * Deletes all files (not directories..) in the given directory (non
-     * recursive) that match the given filename filter. If any file cannot be
-     * deleted then this is printed at warn to the given logger.
-     *
-     * @param directory to delete contents of
-     * @param filter if null then no filter is used
-     * @param logger to notify
-     * @throws IOException if abstract pathname does not denote a directory, or
-     * if an I/O error occurs
-     */
-    public static void deleteFilesInDirectory(final File directory, final 
FilenameFilter filter,
-                                              final Logger logger) throws 
IOException {
-        FileUtils.deleteFilesInDirectory(directory, filter, logger, false);
-    }
-
-    /**
-     * Deletes all files (not directories) in the given directory (recursive)
-     * that match the given filename filter. If any file cannot be deleted then
-     * this is printed at warn to the given logger.
-     *
-     * @param directory to delete contents of
-     * @param filter if null then no filter is used
-     * @param logger to notify
-     * @param recurse true if should recurse
-     * @throws IOException if abstract pathname does not denote a directory, or
-     * if an I/O error occurs
-     */
-    public static void deleteFilesInDirectory(final File directory, final 
FilenameFilter filter, final Logger logger,
-                                              final boolean recurse) throws 
IOException {
-        FileUtils.deleteFilesInDirectory(directory, filter, logger, recurse, 
false);
-    }
-
     /**
      * Deletes all files (not directories) in the given directory (recursive)
      * that match the given filename filter. If any file cannot be deleted then
@@ -160,7 +103,6 @@ public class FileUtils {
      *
      * @param directory to delete contents of
      * @param filter if null then no filter is used
-     * @param logger to notify
      * @param recurse will look for contents of sub directories.
      * @param deleteEmptyDirectories default is false; if true will delete
      * directories found that are empty
@@ -168,7 +110,7 @@ public class FileUtils {
      * if an I/O error occurs
      */
     public static void deleteFilesInDirectory(
-        final File directory, final FilenameFilter filter, final Logger logger,
+        final File directory, final FilenameFilter filter,
         final boolean recurse, final boolean deleteEmptyDirectories) throws 
IOException {
         // ensure the specified directory is actually a directory and that it 
exists
         if (null != directory && directory.isDirectory()) {
@@ -180,13 +122,13 @@ public class FileUtils {
             for (File ingestFile : ingestFiles) {
                 boolean process = (filter == null) ? true : 
filter.accept(directory, ingestFile.getName());
                 if (ingestFile.isFile() && process) {
-                    FileUtils.deleteFile(ingestFile, logger, 3);
+                    FileUtils.deleteFile(ingestFile, 3);
                 }
                 if (ingestFile.isDirectory() && recurse) {
-                    FileUtils.deleteFilesInDirectory(ingestFile, filter, 
logger, recurse, deleteEmptyDirectories);
+                    FileUtils.deleteFilesInDirectory(ingestFile, filter, 
recurse, deleteEmptyDirectories);
                     String[] ingestFileList = ingestFile.list();
                     if (deleteEmptyDirectories && ingestFileList != null && 
ingestFileList.length == 0) {
-                        FileUtils.deleteFile(ingestFile, logger, 3);
+                        FileUtils.deleteFile(ingestFile, 3);
                     }
                 }
             }
@@ -212,7 +154,7 @@ public class FileUtils {
             FileUtils.deleteFiles(Arrays.asList(list), recurse);
         }
         //now delete the file itself regardless of whether it is plain file or 
a directory
-        if (!FileUtils.deleteFile(file, null, 5)) {
+        if (!FileUtils.deleteFile(file, 5)) {
             throw new IOException("Unable to delete " + 
file.getAbsolutePath());
         }
     }
@@ -229,14 +171,18 @@ public class FileUtils {
         try (ZipFile zipFile = new ZipFile(jarFile);) {
             ZipEntry entry = zipFile.getEntry("META-INF/bundled-dependencies");
             if (entry == null || !entry.isDirectory()) {
-                log.info("Jar file {} does not contain 
META-INF/bundled-dependencies, it is not a NAR file", jarFile);
+                log.info().attr("jarFile", jarFile)
+                        .log("Jar file does not contain 
META-INF/bundled-dependencies,"
+                                + " it is not a NAR file");
                 return false;
             } else {
-                log.info("Jar file {} contains META-INF/bundled-dependencies, 
it may be a NAR file", jarFile);
+                log.info().attr("jarFile", jarFile)
+                        .log("Jar file contains META-INF/bundled-dependencies,"
+                                + " it may be a NAR file");
                 return true;
             }
         } catch (IOException err) {
-            log.info("Cannot safely detect if {} is a NAR archive", jarFile, 
err);
+            log.info().attr("jarFile", jarFile).exception(err).log("Cannot 
safely detect if file is a NAR archive");
             return true;
         }
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
index 48c2db725d8..b6bd34e1d9e 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
@@ -40,7 +40,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
 
 /**
  * <p>
@@ -122,7 +122,7 @@ import lombok.extern.slf4j.Slf4j;
  * NAR.
  * </p>
  */
-@Slf4j
+@CustomLog
 public class NarClassLoader extends URLClassLoader {
 
     private static final FileFilter JAR_FILTER = pathname -> {
@@ -180,9 +180,7 @@ public class NarClassLoader extends URLClassLoader {
             }
         }
 
-        if (log.isDebugEnabled()) {
-            log.debug("Created class loader with paths: {}", 
Arrays.toString(getURLs()));
-        }
+        log.debug().attr("paths", () -> 
Arrays.toString(getURLs())).log("Created class loader with paths");
     }
 
     public File getWorkingDirectory() {
@@ -236,7 +234,7 @@ public class NarClassLoader extends URLClassLoader {
             try {
                 addURL(f.toURI().toURL());
             } catch (IOException e) {
-                log.error("Failed to add entry to classpath: {}", f, e);
+                log.error().attr("entry", f).exception(e).log("Failed to add 
entry to classpath");
             }
         });
     }
@@ -246,7 +244,7 @@ public class NarClassLoader extends URLClassLoader {
         classPathEntries.add(root);
         File dependencies = new File(root, "META-INF/bundled-dependencies");
         if (!dependencies.isDirectory()) {
-            log.warn("{} does not contain META-INF/bundled-dependencies!", 
root);
+            log.warn().attr("root", root).log("does not contain 
META-INF/bundled-dependencies!");
         }
         classPathEntries.add(dependencies);
         if (dependencies.isDirectory()) {
@@ -263,7 +261,7 @@ public class NarClassLoader extends URLClassLoader {
     protected String findLibrary(final String libname) {
         File dependencies = new File(narWorkingDirectory, 
"META-INF/bundled-dependencies");
         if (!dependencies.isDirectory()) {
-            log.warn("{} does not contain META-INF/bundled-dependencies!", 
narWorkingDirectory);
+            log.warn().attr("directory", narWorkingDirectory).log("does not 
contain META-INF/bundled-dependencies!");
         }
 
         final File nativeDir = new File(dependencies, "native");
@@ -290,7 +288,7 @@ public class NarClassLoader extends URLClassLoader {
     @Override
     protected Class<?> loadClass(String name, boolean resolve) throws 
ClassNotFoundException {
         if (closed.get()) {
-            log.warn("Loading class {} from a closed classloader ({})", name, 
this);
+            log.warn().attr("class", name).attr("classloader", 
this).log("Loading class from a closed classloader");
         }
         return super.loadClass(name, resolve);
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
index b30aa37ff16..4da17e31834 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
@@ -43,12 +43,12 @@ import java.util.Enumeration;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipFile;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
 
 /**
  * Helper class to unpack NARs.
  */
-@Slf4j
+@CustomLog
 public class NarUnpacker {
     private static final ConcurrentHashMap<String, Object> 
CURRENT_JVM_FILE_LOCKS = new ConcurrentHashMap<>();
 
@@ -73,7 +73,7 @@ public class NarUnpacker {
         File parentDirectory = new File(baseWorkingDirectory, nar.getName() + 
"-unpacked");
         if (!parentDirectory.exists()) {
             if (parentDirectory.mkdirs()) {
-                log.info("Created directory {}", parentDirectory);
+                log.info().attr("directory", parentDirectory).log("Created 
directory");
             } else if (!parentDirectory.exists()) {
                 throw new IOException("Cannot create " + parentDirectory);
             }
@@ -98,18 +98,23 @@ public class NarUnpacker {
                         throw new IOException("Cannot create " + 
narExtractionTempDirectory);
                     }
                     try {
-                        log.info("Extracting {} to {}", nar, 
narExtractionTempDirectory);
+                        log.info().attr("nar", nar).attr("destination", 
narExtractionTempDirectory).log("Extracting");
                         if (extractCallback != null) {
                             extractCallback.run();
                         }
                         unpack(nar, narExtractionTempDirectory);
                     } catch (IOException e) {
-                        log.error("There was a problem extracting the nar 
file. Deleting {} to clean up state.",
-                                narExtractionTempDirectory, e);
+                        log.error()
+                                .attr("directory", narExtractionTempDirectory)
+                                .exception(e)
+                                .log("There was a problem extracting the nar 
file. Deleting to clean up state.");
                         try {
                             FileUtils.deleteFile(narExtractionTempDirectory, 
true);
                         } catch (IOException e2) {
-                            log.error("Failed to delete temporary directory 
{}", narExtractionTempDirectory, e2);
+                            log.error()
+                                    .attr("directory", 
narExtractionTempDirectory)
+                                    .exception(e2)
+                                    .log("Failed to delete temporary 
directory");
                         }
                         throw e;
                     }
@@ -138,7 +143,7 @@ public class NarUnpacker {
                 String name = zipEntry.getName();
                 Path targetFilePath = 
workingDirectoryPath.resolve(name).normalize();
                 if (!targetFilePath.startsWith(workingDirectoryPath)) {
-                    log.error("Invalid zip file with entry '{}'", name);
+                    log.error().attr("entry", name).log("Invalid zip file with 
entry");
                     throw new IOException("Invalid zip file. Aborting 
unpacking.");
                 }
                 File f = targetFilePath.toFile();
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterDataImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterDataImpl.java
index b887fe0a586..af82da4b790 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterDataImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterDataImpl.java
@@ -23,9 +23,9 @@ import io.swagger.annotations.ApiModelProperty;
 import java.util.LinkedHashSet;
 import java.util.Objects;
 import lombok.AllArgsConstructor;
+import lombok.CustomLog;
 import lombok.Data;
 import lombok.NoArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.ProxyProtocol;
 import org.apache.pulsar.common.util.DefaultPulsarSslFactory;
@@ -41,7 +41,7 @@ import org.apache.pulsar.common.util.URIPreconditions;
 @Data
 @AllArgsConstructor
 @NoArgsConstructor
-@Slf4j
+@CustomLog
 public final class ClusterDataImpl implements  ClusterData, Cloneable {
     @ApiModelProperty(
             name = "serviceUrl",
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
index 4e365b1dafc..f1c687bb97d 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
@@ -34,15 +34,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
+import lombok.CustomLog;
 import lombok.Data;
 import lombok.NoArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 
 /**
  * Definition of the offload policies.
  */
-@Slf4j
+@CustomLog
 @Data
 @NoArgsConstructor
 public class OffloadPoliciesImpl implements Serializable, OffloadPolicies {
@@ -452,7 +452,7 @@ public class OffloadPoliciesImpl implements Serializable, 
OffloadPolicies {
                 return offloadPolicies;
             }
         } catch (Exception e) {
-            log.error("Failed to merge configuration.", e);
+            log.error().exception(e).log("Failed to merge configuration.");
             return null;
         }
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 275b6160061..8c6289b944d 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -40,8 +40,8 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.function.Consumer;
 import java.util.function.ToLongFunction;
+import lombok.CustomLog;
 import lombok.experimental.UtilityClass;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.pulsar.PulsarVersion;
@@ -115,7 +115,7 @@ import 
org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.apache.pulsar.common.util.collections.ConcurrentBitSet;
 
 @UtilityClass
-@Slf4j
+@CustomLog
 @SuppressWarnings("checkstyle:JavadocType")
 public class Commands {
 
@@ -2064,7 +2064,8 @@ public class Commands {
             MessageMetadata metadata = 
parseMessageMetadata(metadataAndPayload);
             return metadata;
         } catch (Throwable t) {
-            log.error("[{}] [{}] Failed to parse message metadata", 
subscription, consumerId, t);
+            log.error().attr("subscription", subscription).attr("consumerId", 
consumerId).exception(t)
+                    .log("Failed to parse message metadata");
             return null;
         } finally {
             metadataAndPayload.readerIndex(readerIdx);
@@ -2094,7 +2095,8 @@ public class Commands {
         try {
             peekMessageMetadata(metadataAndPayload, metadata);
         } catch (Throwable t) {
-            log.error("[{}] [{}] Failed to parse message metadata", 
subscription, consumerId, t);
+            log.error().attr("subscription", subscription).attr("consumerId", 
consumerId).exception(t)
+                    .log("Failed to parse message metadata");
             return null;
         }
         return metadata;
@@ -2108,7 +2110,8 @@ public class Commands {
             MessageMetadata metadata = 
parseMessageMetadata(metadataAndPayload);
             return resolveStickyKey(metadata);
         } catch (Throwable t) {
-            log.error("[{}] [{}] Failed to peek sticky key from the message 
metadata", topic, subscription, t);
+            log.error().attr("topic", topic).attr("subscription", 
subscription).exception(t)
+                    .log("Failed to peek sticky key from the message 
metadata");
             return NONE_KEY;
         } finally {
             metadataAndPayload.readerIndex(readerIdx);
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/OptionalProxyProtocolDecoder.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/OptionalProxyProtocolDecoder.java
index b4e15f8cd1d..4bc7f256fda 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/OptionalProxyProtocolDecoder.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/OptionalProxyProtocolDecoder.java
@@ -26,13 +26,13 @@ import io.netty.handler.codec.ProtocolDetectionResult;
 import io.netty.handler.codec.ProtocolDetectionState;
 import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
 import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
 
 /**
  * Decoder that added whether a new connection is prefixed with the 
ProxyProtocol.
  * More about the ProxyProtocol see: 
http://www.haproxy.org/download/1.8/doc/proxy-protocol.txt.
  */
-@Slf4j
+@CustomLog
 public class OptionalProxyProtocolDecoder extends ChannelInboundHandlerAdapter 
{
 
     public static final String NAME = "optional-proxy-protocol-decoder";
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index 9017805c75b..595d75e8801 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -26,6 +26,7 @@ import io.netty.channel.ChannelOutboundInvoker;
 import io.netty.handler.codec.haproxy.HAProxyMessage;
 import io.netty.handler.ssl.SslCloseCompletionEvent;
 import io.netty.handler.ssl.SslHandshakeCompletionEvent;
+import lombok.CustomLog;
 import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.CommandAck;
 import org.apache.pulsar.common.api.proto.CommandAckResponse;
@@ -88,8 +89,6 @@ import 
org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate;
 import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.intercept.InterceptException;
 import org.apache.pulsar.common.util.netty.NettyChannelUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Basic implementation of the channel handler to process inbound Pulsar data.
@@ -100,6 +99,7 @@ import org.slf4j.LoggerFactory;
  * after the method returns.</b> If you need to pass an instance of the 
command instance to another thread or retain a
  * reference to it after the handle* method completes, you must make a deep 
copy of the command instance.
  */
+@CustomLog
 public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
 
     // From the proxy protocol. If present, it means the client is connected 
via a reverse proxy.
@@ -123,9 +123,7 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
             int cmdSize = (int) buffer.readUnsignedInt();
             cmd.parseFrom(buffer, cmdSize);
 
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Received cmd {}", ctx.channel(), 
cmd.getType());
-            }
+            log.debug().attr("channel", ctx.channel()).attr("cmdType", 
cmd.getType()).log("Received cmd");
             messageReceived(cmd);
 
             switch (cmd.getType()) {
@@ -746,8 +744,6 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
         throw new UnsupportedOperationException();
     }
 
-    private static final Logger log = 
LoggerFactory.getLogger(PulsarDecoder.class);
-
     private void writeAndFlush(ChannelOutboundInvoker ctx, ByteBuf cmd) {
         NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, cmd);
     }
@@ -757,7 +753,8 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
             // log handshake failures
             SslHandshakeCompletionEvent sslHandshakeCompletionEvent = 
(SslHandshakeCompletionEvent) evt;
             if (!sslHandshakeCompletionEvent.isSuccess()) {
-                log.warn("[{}] TLS handshake failed. {}", ctx.channel(), 
sslHandshakeCompletionEvent);
+                log.warn().attr("channel", ctx.channel()).attr("event", 
sslHandshakeCompletionEvent)
+                        .log("TLS handshake failed");
             }
         } else if (evt instanceof SslCloseCompletionEvent) {
             // handle TLS close_notify event and immediately close the channel
@@ -765,9 +762,7 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
             // See https://datatracker.ietf.org/doc/html/rfc8446#section-6.1 
for more details
             SslCloseCompletionEvent sslCloseCompletionEvent = 
(SslCloseCompletionEvent) evt;
             if (sslCloseCompletionEvent.isSuccess() && 
ctx.channel().isActive()) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Received a TLS close_notify, closing the 
channel.", ctx.channel());
-                }
+                log.debug().attr("channel", ctx.channel()).log("Received a TLS 
close_notify, closing the channel");
                 ctx.close();
             }
         }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
index e8010ea1a51..eecff3686a7 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
@@ -25,13 +25,12 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.util.concurrent.ScheduledFuture;
 import java.net.SocketAddress;
 import java.util.concurrent.TimeUnit;
+import lombok.CustomLog;
 import lombok.Setter;
 import org.apache.pulsar.common.api.proto.BaseCommand;
 import org.apache.pulsar.common.api.proto.CommandPing;
 import org.apache.pulsar.common.api.proto.CommandPong;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Implementation of the channel handler to process inbound Pulsar data.
@@ -39,6 +38,7 @@ import org.slf4j.LoggerFactory;
  * Please see {@link org.apache.pulsar.common.protocol.PulsarDecoder} javadoc 
for important details about handle* method
  * parameter instance lifecycle.
  */
+@CustomLog
 public abstract class PulsarHandler extends PulsarDecoder {
     @VisibleForTesting
     @Setter
@@ -71,9 +71,8 @@ public abstract class PulsarHandler extends PulsarDecoder {
         this.remoteAddress = ctx.channel().remoteAddress();
         this.ctx = ctx;
 
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Scheduling keep-alive task every {} s", 
this.toString(), keepAliveIntervalSeconds);
-        }
+        log.debug().attr("handler", this).attr("intervalSeconds", 
keepAliveIntervalSeconds)
+                .log("Scheduling keep-alive task");
         if (keepAliveIntervalSeconds > 0) {
             this.keepAliveTask = ctx.executor()
                     
.scheduleAtFixedRate(catchingAndLoggingThrowables(this::handleKeepAliveTimeout),
@@ -89,14 +88,12 @@ public abstract class PulsarHandler extends PulsarDecoder {
     @Override
     protected final void handlePing(CommandPing ping) {
         // Immediately reply success to ping requests
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Replying back to ping message", this.toString());
-        }
+        log.debug().attr("handler", this).log("Replying back to ping message");
         ctx.writeAndFlush(Commands.newPong())
                 .addListener(future -> {
                     if (!future.isSuccess()) {
-                        log.warn("[{}] Forcing connection to close since 
cannot send a pong message.",
-                                toString(), future.cause());
+                        log.warn().attr("handler", 
toString()).exception(future.cause())
+                                .log("Forcing connection to close since cannot 
send a pong message");
                         ctx.close();
                     }
                 });
@@ -112,25 +109,22 @@ public abstract class PulsarHandler extends PulsarDecoder 
{
         }
 
         if (!isHandshakeCompleted()) {
-            log.warn("[{}] Pulsar Handshake was not completed within timeout, 
closing connection", this.toString());
+            log.warn().attr("handler", this)
+                    .log("Pulsar Handshake was not completed within timeout, 
closing connection");
             ctx.close();
         } else if (waitingForPingResponse && 
ctx.channel().config().isAutoRead()) {
             // We were waiting for a response and another keep-alive just 
completed.
             // If auto-read was disabled, it means we stopped reading from the 
connection, so we might receive the Ping
             // response later and thus not enforce the strict timeout here.
-            log.warn("[{}] Forcing connection to close after keep-alive 
timeout", this.toString());
+            log.warn().attr("handler", this).log("Forcing connection to close 
after keep-alive timeout");
             ctx.close();
         } else if (getRemoteEndpointProtocolVersion() >= 
ProtocolVersion.v1.getValue()) {
             // Send keep alive probe to peer only if it supports the ping/pong 
commands, added in v1
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Sending ping message", this.toString());
-            }
+            log.debug().attr("handler", this).log("Sending ping message");
             waitingForPingResponse = true;
             sendPing();
         } else {
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] Peer doesn't support keep-alive", 
this.toString());
-            }
+            log.debug().attr("handler", this).log("Peer doesn't support 
keep-alive");
         }
     }
 
@@ -138,8 +132,8 @@ public abstract class PulsarHandler extends PulsarDecoder {
         return ctx.writeAndFlush(Commands.newPing())
                 .addListener(future -> {
                     if (!future.isSuccess()) {
-                        log.warn("[{}] Forcing connection to close since 
cannot send a ping message.",
-                                this.toString(), future.cause());
+                        log.warn().attr("handler", 
this).exception(future.cause())
+                                .log("Forcing connection to close since cannot 
send a ping message");
                         ctx.close();
                     }
                 });
@@ -172,5 +166,4 @@ public abstract class PulsarHandler extends PulsarDecoder {
         }
     }
 
-    private static final Logger log = 
LoggerFactory.getLogger(PulsarHandler.class);
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/JAASCredentialsContainer.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/JAASCredentialsContainer.java
index 025ab415a80..b23fe82eac5 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/JAASCredentialsContainer.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/JAASCredentialsContainer.java
@@ -28,14 +28,14 @@ import javax.security.auth.login.AppConfigurationEntry;
 import javax.security.auth.login.Configuration;
 import javax.security.auth.login.LoginContext;
 import javax.security.auth.login.LoginException;
+import lombok.CustomLog;
 import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
 
 /**
  * JAAS Credentials Container.
  * This is added for support Kerberos authentication.
  */
-@Slf4j
+@CustomLog
 @Getter
 public class JAASCredentialsContainer implements Closeable {
     private Subject subject;
@@ -63,7 +63,7 @@ public class JAASCredentialsContainer implements Closeable {
                 + "Please check your java.security.login.auth.config (="
                 + System.getProperty("java.security.login.auth.config")
                 + ") for section header: " + this.loginContextName;
-            log.error("No JAAS Configuration section header found for Client: 
{}", errorMessage);
+            log.error().attr("details", errorMessage).log("No JAAS 
Configuration section header found for Client");
             throw new LoginException(errorMessage);
         }
         LoginContext loginContext = new LoginContext(loginContextName, 
callbackHandler);
@@ -96,9 +96,7 @@ public class JAASCredentialsContainer implements Closeable {
                 ticketRefreshThread.join(10000);
             } catch (InterruptedException exit) {
                 Thread.currentThread().interrupt();
-                if (log.isDebugEnabled()) {
-                    log.debug("interrupted while waiting for TGT refresh 
thread to stop", exit);
-                }
+                log.debug().exception(exit).log("interrupted while waiting for 
TGT refresh thread to stop");
             }
         }
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java
index 6a0a7448f75..ca9329b979b 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java
@@ -25,12 +25,12 @@ import javax.security.auth.kerberos.KerberosPrincipal;
 import javax.security.auth.kerberos.KerberosTicket;
 import javax.security.auth.login.LoginContext;
 import javax.security.auth.login.LoginException;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
 
 /**
  * TGT Refresh Thread. Copied from Apache ZooKeeper TGT refresh logic.
  */
-@Slf4j
+@CustomLog
 public class TGTRefreshThread extends Thread {
 
     private static final Random rng = new Random();
@@ -59,8 +59,9 @@ public class TGTRefreshThread extends Thread {
         for (KerberosTicket ticket : tickets) {
             KerberosPrincipal server = ticket.getServer();
             if (server.getName().equals("krbtgt/" + server.getRealm() + "@" + 
server.getRealm())) {
-                log.info("Client principal is \"" + 
ticket.getClient().getName() + "\".");
-                log.info("Server principal is \"" + 
ticket.getServer().getName() + "\".");
+                log.info().attr("clientPrincipal", 
ticket.getClient().getName())
+                        .attr("serverPrincipal", ticket.getServer().getName())
+                        .log("Found TGT ticket");
                 return ticket;
             }
         }
@@ -82,8 +83,8 @@ public class TGTRefreshThread extends Thread {
     private long getRefreshTime(KerberosTicket tgt) {
         long start = tgt.getStartTime().getTime();
         long expires = tgt.getEndTime().getTime();
-        log.info("TGT valid starting at:        {}", 
tgt.getStartTime().toString());
-        log.info("TGT expires:                  {}", 
tgt.getEndTime().toString());
+        log.info().attr("startTime", tgt.getStartTime().toString())
+                .attr("endTime", tgt.getEndTime().toString()).log("TGT 
validity period");
         long proposedRefresh = start
             + (long) ((expires - start) * (TICKET_RENEW_WINDOW + 
(TICKET_RENEW_JITTER * rng.nextDouble())));
         if (proposedRefresh > expires) {
@@ -106,20 +107,21 @@ public class TGTRefreshThread extends Thread {
             if (tgt == null) {
                 nextRefresh = now + MIN_TIME_BEFORE_RELOGIN;
                 nextRefreshDate = new Date(nextRefresh);
-                log.warn("No TGT found: will try again at {}", 
nextRefreshDate);
+                log.warn().attr("nextRetry", nextRefreshDate).log("No TGT 
found: will try again");
             } else {
                 nextRefresh = getRefreshTime(tgt);
                 long expiry = tgt.getEndTime().getTime();
                 Date expiryDate = new Date(expiry);
                 if ((container.isUsingTicketCache()) && 
(tgt.getEndTime().equals(tgt.getRenewTill()))) {
-                    Object[] logPayload = {expiryDate, 
container.getPrincipal(), container.getPrincipal()};
-                    log.error("The TGT cannot be renewed beyond the next 
expiry date: {}."
-                        + "This process will not be able to authenticate new 
SASL connections after that "
-                        + "time (for example, it will not be authenticate a 
new connection with a Broker "
-                        + ").  Ask your system administrator to either 
increase the "
-                        + "'renew until' time by doing : 'modprinc 
-maxrenewlife {}' within "
-                        + "kadmin, or instead, to generate a keytab for {}. 
Because the TGT's "
-                        + "expiry cannot be further extended by refreshing, 
exiting refresh thread now.", logPayload);
+                    log.error()
+                        .attr("expiryDate", expiryDate)
+                        .attr("principal", container.getPrincipal())
+                        .log("The TGT cannot be renewed beyond the next expiry 
date."
+                                + " This process will not be able to 
authenticate new SASL"
+                                + " connections after that time. Ask your 
system administrator"
+                                + " to either increase the 'renew until' time 
by doing"
+                                + " 'modprinc -maxrenewlife' within kadmin, or 
instead,"
+                                + " to generate a keytab. Exiting refresh 
thread now.");
                     return;
                 }
                 // determine how long to sleep from looking at ticket's expiry.
@@ -134,20 +136,23 @@ public class TGTRefreshThread extends Thread {
                         // next scheduled refresh is sooner than (now + 
MIN_TIME_BEFORE_LOGIN).
                         Date until = new Date(nextRefresh);
                         Date newuntil = new Date(now + 
MIN_TIME_BEFORE_RELOGIN);
-                        Object[] logPayload = {until, newuntil, 
MIN_TIME_BEFORE_RELOGIN / 1000};
-                        log.warn("TGT refresh thread time adjusted from : {} 
to : {} since "
-                            + "the former is sooner than the minimum refresh 
interval ("
-                            + "{} seconds) from now.", logPayload);
+                        log.warn()
+                            .attr("from", until)
+                            .attr("to", newuntil)
+                            .attr("minIntervalSeconds", 
MIN_TIME_BEFORE_RELOGIN / 1000)
+                            .log("TGT refresh thread time adjusted since the 
former is sooner"
+                                    + " than the minimum refresh interval from 
now.");
                     }
                     nextRefresh = Math.max(nextRefresh, now + 
MIN_TIME_BEFORE_RELOGIN);
                 }
                 nextRefreshDate = new Date(nextRefresh);
                 if (nextRefresh > expiry) {
-                    Object[] logPayload = {nextRefreshDate, expiryDate};
-                    log.error(
-                        "next refresh: {} is later than expiry {}." + " This 
may indicate a clock skew problem."
-                        + "Check that this host and the KDC's " + "hosts' 
clocks are in sync. Exiting refresh thread.",
-                        logPayload);
+                    log.error()
+                        .attr("nextRefresh", nextRefreshDate)
+                        .attr("expiry", expiryDate)
+                        .log("next refresh is later than expiry. This may 
indicate a clock"
+                                + " skew problem. Check that this host and the 
KDC's hosts'"
+                                + " clocks are in sync. Exiting refresh 
thread.");
                     return;
                 }
             }
@@ -155,7 +160,7 @@ public class TGTRefreshThread extends Thread {
                 log.info("refreshing now because expiry is before next 
scheduled refresh time.");
             } else if (now < nextRefresh) {
                 Date until = new Date(nextRefresh);
-                log.info("TGT refresh sleeping until: {}", until.toString());
+                log.info().attr("until", until.toString()).log("TGT refresh 
sleeping");
                 try {
                     Thread.sleep(nextRefresh - now);
                 } catch (InterruptedException ie) {
@@ -164,10 +169,12 @@ public class TGTRefreshThread extends Thread {
                     break;
                 }
             } else {
-                log.error("nextRefresh:{} is in the past: exiting refresh 
thread. Check"
-                    + " clock sync between this host and KDC - (KDC's clock is 
likely ahead of this host)."
-                    + " Manual intervention will be required for this client 
to successfully authenticate."
-                    + " Exiting refresh thread.", nextRefreshDate);
+                log.error()
+                    .attr("nextRefresh", nextRefreshDate)
+                    .log("nextRefresh is in the past: exiting refresh thread. 
Check clock"
+                            + " sync between this host and KDC - (KDC's clock 
is likely ahead"
+                            + " of this host). Manual intervention will be 
required for this"
+                            + " client to successfully authenticate.");
                 break;
             }
             if (container.isUsingTicketCache()) {
@@ -177,7 +184,7 @@ public class TGTRefreshThread extends Thread {
                 int retry = 1;
                 while (retry >= 0) {
                     try {
-                        log.info("running ticket cache refresh command: {} 
{}", cmd, kinitArgs);
+                        log.info().attr("cmd", cmd).attr("args", 
kinitArgs).log("running ticket cache refresh command");
 
                         ProcessBuilder processBuilder = new ProcessBuilder();
                         processBuilder.command("bash", "-c", cmd, kinitArgs);
@@ -194,9 +201,12 @@ public class TGTRefreshThread extends Thread {
                                 return;
                             }
                         } else {
-                            Object[] logPayload = {cmd, kinitArgs, 
e.toString(), e};
-                            log.warn("Could not renew TGT due to problem 
running shell command: '{}"
-                                + " {}'; exception was:{}. Exiting refresh 
thread.", logPayload);
+                            log.warn()
+                                .attr("cmd", cmd)
+                                .attr("args", kinitArgs)
+                                .exception(e)
+                                .log("Could not renew TGT due to problem 
running shell"
+                                        + " command. Exiting refresh thread.");
                             return;
                         }
                     }
@@ -216,16 +226,19 @@ public class TGTRefreshThread extends Thread {
                                 Thread.sleep(10 * 1000);
                             } catch (InterruptedException e) {
                                 Thread.currentThread().interrupt();
-                                log.error("Interrupted during login retry 
after LoginException:", le);
+                                log.error().exception(le).log("Interrupted 
during login retry after LoginException");
                                 throw le;
                             }
                         } else {
-                            log.error("Could not refresh TGT for principal: 
{}.", container.getPrincipal(), le);
+                            log.error()
+                            .attr("principal", container.getPrincipal())
+                            .exception(le)
+                            .log("Could not refresh TGT for principal");
                         }
                     }
                 }
             } catch (LoginException le) {
-                log.error("Failed to refresh TGT: refresh thread exiting 
now.", le);
+                log.error().exception(le).log("Failed to refresh TGT: refresh 
thread exiting now.");
                 break;
             }
         }
@@ -244,7 +257,7 @@ public class TGTRefreshThread extends Thread {
         if (!hasSufficientTimeElapsed()) {
             return;
         }
-        log.info("Initiating logout for {}", container.getPrincipal());
+        log.info().attr("principal", container.getPrincipal()).log("Initiating 
logout");
         synchronized (this) {
             //clear up the kerberos state. But the tokens are not cleared! As 
per
             //the Java kerberos login module code, only the kerberos 
credentials
@@ -253,7 +266,7 @@ public class TGTRefreshThread extends Thread {
             //login and also update the subject field of this instance to
             //have the new credentials (pass it to the LoginContext 
constructor)
             login = new LoginContext(container.getLoginContextName(), 
container.getSubject());
-            log.info("Initiating re-login for {}", container.getPrincipal());
+            log.info().attr("principal", 
container.getPrincipal()).log("Initiating re-login");
             login.login();
             container.setLoginContext(login);
         }
@@ -262,8 +275,10 @@ public class TGTRefreshThread extends Thread {
     private boolean hasSufficientTimeElapsed() {
         long now = System.currentTimeMillis();
         if (now - getLastLogin() < MIN_TIME_BEFORE_RELOGIN) {
-            log.warn("Not attempting to re-login since the last re-login was "
-                + "attempted less than {} seconds before.", 
MIN_TIME_BEFORE_RELOGIN / 1000);
+            log.warn()
+                .attr("minIntervalSeconds", MIN_TIME_BEFORE_RELOGIN / 1000)
+                .log("Not attempting to re-login since the last re-login was"
+                        + " attempted less than the minimum interval.");
             return false;
         }
         // register most recent relogin attempt
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java
index 03873da8f33..498b5d76bc1 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java
@@ -24,15 +24,11 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.BooleanSupplier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Implementation of AsyncDualMemoryLimiter with separate limits for heap and 
direct memory.
  */
 public class AsyncDualMemoryLimiterImpl implements AsyncDualMemoryLimiter, 
AutoCloseable {
-    private static final Logger log = 
LoggerFactory.getLogger(AsyncDualMemoryLimiterImpl.class);
-
     private final ScheduledExecutorService executor;
     private final boolean shutdownExecutor;
     private final AsyncSemaphoreImpl heapLimiter;
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java
index eb786e474fa..5b4b885044e 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java
@@ -33,15 +33,11 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.function.BooleanSupplier;
 import java.util.function.LongConsumer;
 import org.apache.pulsar.common.util.Runnables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Implementation of AsyncSemaphore with timeout and queue size limits.
  */
 public class AsyncSemaphoreImpl implements AsyncSemaphore, AutoCloseable {
-    private static final Logger log = 
LoggerFactory.getLogger(AsyncSemaphoreImpl.class);
-
     private final AtomicLong availablePermits;
     private final Queue<PendingRequest> queue;
     private final long maxPermits;
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmDefaultGCMetricsLogger.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmDefaultGCMetricsLogger.java
index db840c487f3..357e1eecf18 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmDefaultGCMetricsLogger.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmDefaultGCMetricsLogger.java
@@ -24,15 +24,13 @@ import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import lombok.CustomLog;
 import lombok.SneakyThrows;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+@CustomLog
 @SuppressWarnings({"checkstyle:JavadocType"})
 public class JvmDefaultGCMetricsLogger implements JvmGCMetricsLogger {
 
-    private static final Logger log = 
LoggerFactory.getLogger(JvmDefaultGCMetricsLogger.class);
-
     private volatile long accumulatedFullGcCount = 0;
     private volatile long currentFullGcCount = 0;
     private volatile long accumulatedFullGcTime = 0;
@@ -58,7 +56,7 @@ public class JvmDefaultGCMetricsLogger implements 
JvmGCMetricsLogger {
             getTotalSafepointTimeHandle.invoke(runtime);
             getSafepointCountHandle.invoke(runtime);
         } catch (Throwable e) {
-            log.warn("Failed to get Runtime bean", e);
+            log.warn().exception(e).log("Failed to get Runtime bean");
         }
     }
 
@@ -129,7 +127,7 @@ public class JvmDefaultGCMetricsLogger implements 
JvmGCMetricsLogger {
             accumulatedFullGcCount = newSafePointCount;
 
         } catch (Exception e) {
-            log.error("Failed to collect GC stats: {}", e.getMessage());
+            log.error().exceptionMessage(e).log("Failed to collect GC stats");
         }
     }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmG1GCMetricsLogger.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmG1GCMetricsLogger.java
index 203fe59eaf2..4b7c53e9675 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmG1GCMetricsLogger.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmG1GCMetricsLogger.java
@@ -22,12 +22,12 @@ import java.lang.management.ManagementFactory;
 import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.CustomLog;
 
 /**
  * Logger for the JVM G1 GC metrics.
  */
+@CustomLog
 public class JvmG1GCMetricsLogger implements JvmGCMetricsLogger {
 
     private volatile long accumulatedYoungGcCount = 0;
@@ -83,10 +83,9 @@ public class JvmG1GCMetricsLogger implements 
JvmGCMetricsLogger {
             accumulatedOldGcCount = newValueOldGcCount;
             accumulatedOldGcTime = newValueOldGcTime;
         } catch (Exception e) {
-            log.error("Failed to collect GC stats: {}", e.getMessage());
+            log.error().exceptionMessage(e).log("Failed to collect GC stats");
         }
 
     }
 
-    private static final Logger log = 
LoggerFactory.getLogger(JvmG1GCMetricsLogger.class);
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java
index 8a8da0bb1ac..d3288154bd1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java
@@ -36,17 +36,16 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import lombok.CustomLog;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.util.DirectMemoryUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This class is responsible for providing JVM metrics.
  */
+@CustomLog
 public class JvmMetrics {
 
-    private static final Logger log = 
LoggerFactory.getLogger(JvmMetrics.class);
     private final JvmGCMetricsLogger gcLogger;
 
     private final String componentName;
@@ -66,8 +65,10 @@ public class JvmMetrics {
                 gcLoggerImpl = (JvmGCMetricsLogger) 
Class.forName(gcLoggerImplClassName)
                         .getDeclaredConstructor().newInstance();
             } catch (Exception e) {
-                log.error("Failed to initialize jvmGCMetricsLogger {} due to 
{}", jvmGCMetricsLoggerClassName,
-                        e.getMessage(), e);
+                log.error()
+                        .attr("className", jvmGCMetricsLoggerClassName)
+                        .exception(e)
+                        .log("Failed to initialize jvmGCMetricsLogger");
             }
         }
         return new JvmMetrics(executor, componentName,
@@ -141,9 +142,7 @@ public class JvmMetrics {
         if (usedDirectMemory != -1L) {
             return usedDirectMemory;
         }
-        if (log.isDebugEnabled()) {
-            log.debug("Failed to get netty-direct-memory used count.");
-        }
+        log.debug("Failed to get netty-direct-memory used count.");
 
         List<BufferPoolMXBean> pools = 
ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
         for (BufferPoolMXBean pool : pools) {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/tls/TlsHostnameVerifier.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/tls/TlsHostnameVerifier.java
index d5f51be08b8..2bbb9cb34f5 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/tls/TlsHostnameVerifier.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/tls/TlsHostnameVerifier.java
@@ -45,9 +45,9 @@ import javax.net.ssl.SSLException;
 import javax.net.ssl.SSLPeerUnverifiedException;
 import javax.net.ssl.SSLSession;
 import javax.security.auth.x500.X500Principal;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
 
-@Slf4j
+@CustomLog
 public class TlsHostnameVerifier implements HostnameVerifier {
 
     enum HostNameType {
@@ -79,9 +79,7 @@ public class TlsHostnameVerifier implements HostnameVerifier {
             verify(host, x509);
             return true;
         } catch (final SSLException ex) {
-            if (log.isDebugEnabled()) {
-                log.debug(ex.getMessage(), ex);
-            }
+            log.debug().exception(ex).log(ex.getMessage());
             return false;
         }
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicsPatternFactory.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicsPatternFactory.java
index 50434543971..836bd379d8e 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicsPatternFactory.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicsPatternFactory.java
@@ -19,15 +19,15 @@
 package org.apache.pulsar.common.topics;
 
 import java.util.regex.Pattern;
+import lombok.CustomLog;
 import lombok.experimental.UtilityClass;
-import lombok.extern.slf4j.Slf4j;
 
 /**
  * Factory class for creating instances of TopicsPattern based on different 
regex implementations.
  * It supports JDK regex, RE2J regex, and a fallback mechanism for RE2J with 
JDK.
  */
+@CustomLog
 @UtilityClass
-@Slf4j
 public class TopicsPatternFactory {
     /**
      * Creates a TopicsPattern from a JDK Pattern.
@@ -89,10 +89,10 @@ public class TopicsPatternFactory {
                 try {
                     return new RE2JTopicsPattern(inputPattern, 
regexWithoutTopicDomainScheme);
                 } catch (com.google.re2j.PatternSyntaxException e) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Failed to compile regex pattern '{}' with 
RE2J, fallback to JDK",
-                                regexWithoutTopicDomainScheme, e);
-                    }
+                    log.debug()
+                            .attr("pattern", regexWithoutTopicDomainScheme)
+                            .exception(e)
+                            .log("Failed to compile regex pattern with RE2J, fallback to JDK");
                     // Fallback to JDK implementation if RE2J fails
                     return new JDKTopicsPattern(inputPattern, 
regexWithoutTopicDomainScheme);
                 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java
index 6cff0a7baec..d8d7be142bc 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java
@@ -24,12 +24,12 @@ import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLClassLoader;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
 
 /**
  * Helper methods wrt Classloading.
  */
-@Slf4j
+@CustomLog
 public class ClassLoaderUtils {
     /**
      * Load a jar.
@@ -83,7 +83,7 @@ public class ClassLoaderUtils {
             try {
                 ((Closeable) classLoader).close();
             } catch (IOException e) {
-                log.error("Error closing classloader {}", classLoader, e);
+                log.error().attr("classLoader", classLoader).exception(e).log("Error closing classloader");
             }
         }
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java
index fb9e72435e3..1e8828e1f93 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java
@@ -23,14 +23,14 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.attribute.FileTime;
+import lombok.CustomLog;
 import lombok.Getter;
 import lombok.ToString;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Class working with file's modified time.
  */
+@CustomLog
 @ToString
 public class FileModifiedTimeUpdater {
     @Getter
@@ -49,7 +49,7 @@ public class FileModifiedTimeUpdater {
             try {
                 return Files.getLastModifiedTime(p);
             } catch (IOException e) {
-                LOG.error("Unable to fetch lastModified time for file {}: ", 
fileName, e);
+                log.error().attr("file", fileName).exception(e).log("Unable to fetch lastModified time for file");
             }
         }
         return null;
@@ -64,5 +64,4 @@ public class FileModifiedTimeUpdater {
         return false;
     }
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(FileModifiedTimeUpdater.class);
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/GracefulExecutorServicesTerminationHandler.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/GracefulExecutorServicesTerminationHandler.java
index ca247fbd85a..319f9aa6fca 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/GracefulExecutorServicesTerminationHandler.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/GracefulExecutorServicesTerminationHandler.java
@@ -26,7 +26,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
 
 /**
  * Waits for termination of {@link ExecutorService}s that have been shutdown.
@@ -35,7 +35,7 @@ import lombok.extern.slf4j.Slf4j;
  *
  * Designed to be used via the API in {@link GracefulExecutorServicesShutdown}
  */
-@Slf4j
+@CustomLog
 class GracefulExecutorServicesTerminationHandler {
     private static final long SHUTDOWN_THREAD_COMPLETION_TIMEOUT_NANOS = 
Duration.ofMillis(100L).toNanos();
     private final List<ExecutorService> executors;
@@ -50,7 +50,7 @@ class GracefulExecutorServicesTerminationHandler {
         this.terminationTimeout = terminationTimeout;
         this.executors = Collections.unmodifiableList(new 
ArrayList<>(executorServices));
         this.future = new CompletableFuture<>();
-        log.info("Starting termination handler for {} executors.", 
executors.size());
+        log.info().attr("executorCount", executors.size()).log("Starting termination handler");
         for (ExecutorService executor : executors) {
             if (!executor.isShutdown()) {
                 throw new IllegalStateException(
@@ -68,7 +68,8 @@ class GracefulExecutorServicesTerminationHandler {
                 Thread shutdownWaitingThread = new Thread(this::awaitShutdown, 
getClass().getSimpleName());
                 shutdownWaitingThread.setDaemon(false);
                 shutdownWaitingThread.setUncaughtExceptionHandler((thread, 
exception) -> {
-                  log.error("Uncaught exception in shutdown thread {}", 
thread, exception);
+                  log.error().attr("thread", thread).exception(exception)
+                          .log("Uncaught exception in shutdown thread");
                 });
                 shutdownWaitingThread.start();
                 FutureUtil.whenCancelledOrTimedOut(future, () -> {
@@ -98,7 +99,7 @@ class GracefulExecutorServicesTerminationHandler {
             terminateExecutors();
             markShutdownCompleted();
         } catch (Exception e) {
-            log.error("Error in termination handler", e);
+            log.error().exception(e).log("Error in termination handler");
             future.completeExceptionally(e);
         } finally {
             shutdownThreadCompletedLatch.countDown();
@@ -135,17 +136,17 @@ class GracefulExecutorServicesTerminationHandler {
     private void terminateExecutors() {
         for (ExecutorService executor : executors) {
             if (!executor.isTerminated()) {
-                log.info("Shutting down forcefully executor {}", executor);
+                log.info().attr("executor", executor).log("Shutting down forcefully executor");
                 executor.shutdownNow();
             }
         }
         if (!Thread.currentThread().isInterrupted() && 
!awaitTermination(terminationTimeout)) {
             for (ExecutorService executor : executors) {
                 if (!executor.isTerminated()) {
-                    log.warn("Executor {} didn't shutdown after waiting for 
termination.", executor);
+                    log.warn().attr("executor", executor).log("Executor didn't shutdown after waiting for termination");
                     for (Runnable runnable : executor.shutdownNow()) {
-                        log.info("Execution in progress for runnable instance 
of {}: {}", runnable.getClass(),
-                                runnable);
+                        log.info().attr("runnableClass", runnable.getClass()).attr("runnable", runnable)
+                                .log("Execution in progress for runnable");
                     }
                 }
             }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyManagerProxy.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyManagerProxy.java
index 6b6b0492b45..4e2ac76b7cb 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyManagerProxy.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyManagerProxy.java
@@ -41,13 +41,13 @@ import java.util.stream.Collectors;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.X509ExtendedKeyManager;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
 
 /**
  * This class wraps {@link X509ExtendedKeyManager} and gives opportunity to 
refresh key-manager with refreshed certs
  * without changing {@link SslContext}.
  */
-@Slf4j
+@CustomLog
 public class KeyManagerProxy extends X509ExtendedKeyManager {
 
     private static final char[] KEYSTORE_PASSWORD = "secret".toCharArray();
@@ -61,13 +61,13 @@ public class KeyManagerProxy extends X509ExtendedKeyManager 
{
         try {
             updateKeyManager();
         } catch (CertificateException e) {
-            log.warn("Failed to load cert {}", certFile, e);
+            log.warn().attr("certFile", certFile).exception(e).log("Failed to load cert");
             throw new IllegalArgumentException(e);
         } catch (KeyStoreException e) {
-            log.warn("Failed to load key {}", keyFile, e);
+            log.warn().attr("keyFile", keyFile).exception(e).log("Failed to load key");
             throw new IllegalArgumentException(e);
         } catch (NoSuchAlgorithmException | UnrecoverableKeyException e) {
-            log.warn("Failed to update key Manager", e);
+            log.warn().exception(e).log("Failed to update key Manager");
             throw new IllegalArgumentException(e);
         }
         executor.scheduleWithFixedDelay(() -> updateKeyManagerSafely(), 
refreshDurationSec, refreshDurationSec,
@@ -76,12 +76,12 @@ public class KeyManagerProxy extends X509ExtendedKeyManager 
{
 
     private void updateKeyManagerSafely() {
         try {
-            if (log.isDebugEnabled()) {
-                log.debug("refreshing key manager for {} {}", 
certFile.getFileName(), keyFile.getFileName());
-            }
+            log.debug().attr("certFile", certFile.getFileName()).attr("keyFile", keyFile.getFileName())
+                    .log("refreshing key manager");
             updateKeyManager();
         } catch (Exception e) {
-            log.warn("Failed to update key Manager for {}, {}", 
certFile.getFileName(), keyFile.getFileName(), e);
+            log.warn().attr("certFile", certFile.getFileName()).attr("keyFile", keyFile.getFileName())
+                    .exception(e).log("Failed to update key Manager");
         }
     }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
index 33bc09725f6..0c7b51781c5 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
@@ -31,7 +31,7 @@ import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;
 import java.util.concurrent.atomic.AtomicReference;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
 import org.apache.pulsar.client.admin.internal.data.AuthPoliciesImpl;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.FunctionState;
@@ -113,7 +113,7 @@ import 
org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
 import org.apache.pulsar.policies.data.loadbalancer.LoadReportDeserializer;
 
 @SuppressWarnings("checkstyle:JavadocType")
-@Slf4j
+@CustomLog
 public class ObjectMapperFactory {
     public static class MapperReference {
         private final ObjectMapper objectMapper;
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
index 17199664f12..f37c6107555 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
@@ -28,7 +28,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import lombok.Builder;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
 
 /**
  * A Rate Limiter that distributes permits at a configurable rate. Each {@link 
#acquire()} blocks if necessary until a
@@ -51,7 +51,7 @@ import lombok.extern.slf4j.Slf4j;
  * <li><b>Faster: </b>RateLimiter is light-weight and faster than 
Guava-RateLimiter</li>
  * </ul>
  */
-@Slf4j
+@CustomLog
 public class RateLimiter implements AutoCloseable{
     private final ScheduledExecutorService executorService;
     private long rateTime;
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Runnables.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Runnables.java
index dde952a45bd..7733430a679 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Runnables.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Runnables.java
@@ -19,11 +19,10 @@
 package org.apache.pulsar.common.util;
 
 import java.util.concurrent.TimeUnit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.CustomLog;
 
+@CustomLog
 public final class Runnables {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(Runnables.class);
 
     private Runnables() {}
 
@@ -53,7 +52,7 @@ public final class Runnables {
             try {
                 runnable.run();
             } catch (Throwable t) {
-                LOGGER.error("Unexpected throwable caught", t);
+                log.error().exception(t).log("Unexpected throwable caught");
             }
         }
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
index 386b1475636..36632715a8f 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
@@ -66,7 +66,7 @@ import javax.net.ssl.SSLException;
 import javax.net.ssl.SSLParameters;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.classification.InterfaceAudience;
 import org.apache.pulsar.common.tls.TlsHostnameVerifier;
@@ -74,7 +74,7 @@ import org.apache.pulsar.common.tls.TlsHostnameVerifier;
 /**
  * Helper class for the security domain.
  */
-@Slf4j
+@CustomLog
 public class SecurityUtility {
 
     public static final Provider BC_PROVIDER = getProvider();
@@ -106,9 +106,7 @@ public class SecurityUtility {
             Provider provider = Security.getProvider(BC) != null
                     ? Security.getProvider(BC)
                     : Security.getProvider(BC_FIPS);
-            if (log.isDebugEnabled()) {
-                log.debug("Already instantiated Bouncy Castle provider {}", 
provider.getName());
-            }
+            log.debug().attr("provider", provider.getName()).log("Already instantiated Bouncy Castle provider");
             return provider;
         }
 
@@ -116,7 +114,8 @@ public class SecurityUtility {
         try {
             return getBCProviderFromClassPath();
         } catch (Exception e) {
-            log.warn("Not able to get Bouncy Castle provider for both FIPS and 
Non-FIPS from class path:", e);
+            log.warn().exception(e)
+                    .log("Not able to get Bouncy Castle provider for both FIPS and Non-FIPS from class path");
             throw new RuntimeException(e);
         }
     }
@@ -131,11 +130,11 @@ public class SecurityUtility {
             if (e instanceof ClassNotFoundException) {
                 log.debug("Conscrypt isn't available in the classpath. Using 
JDK default security provider.");
             } else if (e.getCause() instanceof UnsatisfiedLinkError) {
-                log.debug("Conscrypt isn't available for {} {}. Using JDK 
default security provider.",
-                        System.getProperty("os.name"), 
System.getProperty("os.arch"));
+                log.debug().attr("os", System.getProperty("os.name")).attr("arch", System.getProperty("os.arch"))
+                        .log("Conscrypt isn't available. Using JDK default security provider");
             } else {
-                log.debug("Conscrypt isn't available. Using JDK default 
security provider."
-                        + " Cause : {}, Reason : {}", e.getCause(), 
e.getMessage());
+                log.debug().attr("cause", e.getCause()).attr("reason", e.getMessage())
+                        .log("Conscrypt isn't available. Using JDK default security provider");
             }
             return null;
         }
@@ -144,7 +143,8 @@ public class SecurityUtility {
         try {
             provider = (Provider) 
Class.forName(CONSCRYPT_PROVIDER_CLASS).getDeclaredConstructor().newInstance();
         } catch (ReflectiveOperationException e) {
-            log.debug("Unable to get security provider for class {}", 
CONSCRYPT_PROVIDER_CLASS, e);
+            log.debug().attr("class", CONSCRYPT_PROVIDER_CLASS).exception(e)
+                    .log("Unable to get security provider");
             return null;
         }
 
@@ -176,13 +176,12 @@ public class SecurityUtility {
                                     new 
Class<?>[]{Class.forName("org.conscrypt.ConscryptHostnameVerifier")});
             setDefaultHostnameVerifierMethod.invoke(null, 
wrappedHostnameVerifier);
         } catch (Exception e) {
-            log.warn("Unable to set default hostname verifier for Conscrypt", 
e);
+            log.warn().exception(e).log("Unable to set default hostname verifier for Conscrypt");
         }
 
         Security.addProvider(provider);
-        if (log.isDebugEnabled()) {
-            log.debug("Added security provider '{}' from class {}", 
provider.getName(), CONSCRYPT_PROVIDER_CLASS);
-        }
+        log.debug().attr("provider", provider.getName()).attr("class", CONSCRYPT_PROVIDER_CLASS)
+                .log("Added security provider");
         return provider;
     }
 
@@ -196,17 +195,16 @@ public class SecurityUtility {
             // prefer non FIPS, for backward compatibility concern.
             clazz = Class.forName(BC_NON_FIPS_PROVIDER_CLASS);
         } catch (ClassNotFoundException cnf) {
-            log.warn("Not able to get Bouncy Castle provider: {}, try to get 
FIPS provider {}",
-                    BC_NON_FIPS_PROVIDER_CLASS, BC_FIPS_PROVIDER_CLASS);
+            log.warn().attr("nonFipsClass", BC_NON_FIPS_PROVIDER_CLASS).attr("fipsClass", BC_FIPS_PROVIDER_CLASS)
+                    .log("Not able to get Bouncy Castle provider, try to get FIPS provider");
             // attempt to use the FIPS provider.
             clazz = Class.forName(BC_FIPS_PROVIDER_CLASS);
         }
 
         Provider provider = (Provider) 
clazz.getDeclaredConstructor().newInstance();
         Security.addProvider(provider);
-        if (log.isDebugEnabled()) {
-            log.debug("Found and Instantiated Bouncy Castle provider in 
classpath {}", provider.getName());
-        }
+        log.debug().attr("provider", provider.getName())
+                .log("Found and Instantiated Bouncy Castle provider in classpath");
         return provider;
     }
 
@@ -439,7 +437,8 @@ public class SecurityUtility {
                     }
                 }
             } catch (ReflectiveOperationException e) {
-                log.warn("Unable to set hostname verifier for Conscrypt 
TrustManager implementation", e);
+                log.warn().exception(e)
+                        .log("Unable to set hostname verifier for Conscrypt TrustManager implementation");
             }
         }
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ShutdownUtil.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ShutdownUtil.java
index d3a98a6df72..a5a21a9fc0e 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ShutdownUtil.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ShutdownUtil.java
@@ -20,9 +20,9 @@ package org.apache.pulsar.common.util;
 
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
 
-@Slf4j
+@CustomLog
 public class ShutdownUtil {
     private static final Method log4j2ShutdownMethod;
 
@@ -34,7 +34,7 @@ public class ShutdownUtil {
                     .getMethod("shutdown");
         } catch (ClassNotFoundException | NoSuchMethodException e) {
             // ignore when Log4j2 isn't found, log at debug level
-            log.debug("Cannot find 
org.apache.logging.log4j.LogManager.shutdown method", e);
+            log.debug().exception(e).log("Cannot find org.apache.logging.log4j.LogManager.shutdown method");
         }
         log4j2ShutdownMethod = shutdownMethod;
     }
@@ -58,8 +58,9 @@ public class ShutdownUtil {
         }
         try {
             if (status != 0 && logging) {
-                log.warn("Triggering immediate shutdown of current process 
with status {}", status,
-                        new Exception("Stacktrace for immediate shutdown"));
+                log.warn().attr("status", status)
+                        .exception(new Exception("Stacktrace for immediate shutdown"))
+                        .log("Triggering immediate shutdown of current process");
             }
             shutdownLogging();
         } finally {
@@ -74,7 +75,8 @@ public class ShutdownUtil {
                 // use reflection to call 
org.apache.logging.log4j.LogManager.shutdown()
                 log4j2ShutdownMethod.invoke(null);
             } catch (IllegalAccessException | InvocationTargetException e) {
-                log.error("Unable to call 
org.apache.logging.log4j.LogManager.shutdown using reflection.", e);
+                log.error().exception(e)
+                        .log("Unable to call org.apache.logging.log4j.LogManager.shutdown using reflection");
             }
         }
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SingleThreadNonConcurrentFixedRateScheduler.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SingleThreadNonConcurrentFixedRateScheduler.java
index 1ddb2101f3a..557b3fd4ad4 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SingleThreadNonConcurrentFixedRateScheduler.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SingleThreadNonConcurrentFixedRateScheduler.java
@@ -31,9 +31,9 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
 
-@Slf4j
+@CustomLog
 public class SingleThreadNonConcurrentFixedRateScheduler extends 
ScheduledThreadPoolExecutor
         implements ScheduledExecutorService {
 
@@ -66,7 +66,7 @@ public class SingleThreadNonConcurrentFixedRateScheduler 
extends ScheduledThread
             try {
                 task.run();
             } catch (Throwable t) {
-                log.warn("Unexpected throwable from task {}: {}", 
task.getClass(), t.getMessage(), t);
+                log.warn().attr("taskClass", task.getClass()).exception(t).log("Unexpected throwable from task");
             }
         }
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/TrustManagerProxy.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/TrustManagerProxy.java
index 8ec3dce6fce..d913987a9e0 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/TrustManagerProxy.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/TrustManagerProxy.java
@@ -32,13 +32,13 @@ import java.util.concurrent.TimeUnit;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.TrustManagerFactory;
 import javax.net.ssl.X509ExtendedTrustManager;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
 
 /**
  * This class wraps {@link X509ExtendedTrustManager} and gives opportunity to 
refresh Trust-manager with refreshed certs
  * without changing {@link SslContext}.
  */
-@Slf4j
+@CustomLog
 public class TrustManagerProxy extends X509ExtendedTrustManager {
 
     private volatile X509ExtendedTrustManager trustManager;
@@ -49,10 +49,10 @@ public class TrustManagerProxy extends 
X509ExtendedTrustManager {
         try {
             updateTrustManager();
         } catch (KeyManagementException | IOException | CertificateException 
e) {
-            log.warn("Failed to load cert {}, {}", certFile, e.getMessage());
+            log.warn().attr("certFile", certFile).exceptionMessage(e).log("Failed to load cert");
             throw new IllegalArgumentException(e);
         } catch (NoSuchAlgorithmException | KeyStoreException e) {
-            log.warn("Failed to init trust-store", e);
+            log.warn().exception(e).log("Failed to init trust-store");
             throw new IllegalArgumentException(e);
         }
         executor.scheduleWithFixedDelay(() -> updateTrustManagerSafely(), 
refreshDurationSec, refreshDurationSec,
@@ -63,7 +63,7 @@ public class TrustManagerProxy extends 
X509ExtendedTrustManager {
         try {
             updateTrustManager();
         } catch (Exception e) {
-            log.warn("Failed to init trust-store {}", certFile.getFileName(), e);
+            log.warn().attr("certFile", certFile.getFileName()).exception(e).log("Failed to init trust-store");
         }
     }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java
index a70857bdf3b..e3e8f583edb 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java
@@ -36,14 +36,14 @@ import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.TrustManagerFactory;
+import lombok.CustomLog;
 import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.util.SecurityUtility;
 
 /**
  * KeyStoreSSLContext that mainly wrap a SSLContext to provide SSL context for 
both webservice and netty.
  */
-@Slf4j
+@CustomLog
 public class KeyStoreSSLContext {
     public static final String DEFAULT_KEYSTORE_TYPE = "JKS";
     public static final String DEFAULT_SSL_PROTOCOL = "TLS";
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/SSLContextValidatorEngine.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/SSLContextValidatorEngine.java
index a01412599bf..a69a18c21d9 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/SSLContextValidatorEngine.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/SSLContextValidatorEngine.java
@@ -24,12 +24,12 @@ import java.util.Arrays;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLEngineResult;
 import javax.net.ssl.SSLException;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
 
 /**
  * SSLContextValidatorEngine to validate 2 SSlContext.
  */
-@Slf4j
+@CustomLog
 public class SSLContextValidatorEngine {
     @FunctionalInterface
     public interface SSLEngineProvider {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
index 8749add6458..c137b319aed 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
@@ -34,10 +34,10 @@ import java.net.InetSocketAddress;
 import java.security.Security;
 import java.util.List;
 import java.util.Optional;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
 import org.apache.commons.lang3.reflect.FieldUtils;
 
-@Slf4j
+@CustomLog
 public class DnsResolverUtil {
 
     private static final String CACHE_POLICY_PROP = "networkaddress.cache.ttl";
@@ -81,7 +81,7 @@ public class DnsResolverUtil {
                     .filter(i -> i >= 0)
                     .orElse(DEFAULT_NEGATIVE_TTL);
         } catch (NumberFormatException e) {
-            log.warn("Cannot get DNS TTL settings", e);
+            log.warn().exception(e).log("Cannot get DNS TTL settings");
         }
         TTL = ttl;
         NEGATIVE_TTL = negativeTtl;
@@ -138,7 +138,8 @@ public class DnsResolverUtil {
                     log.warn("Could not find nameResolver Field in InetSocketAddressResolver instance.");
                 }
             } catch (Throwable t) {
-                log.warn("Failed to extract NameResolver from InetSocketAddressResolver instance. {}", t.getMessage());
+                log.warn().exceptionMessage(t)
+                        .log("Failed to extract NameResolver from InetSocketAddressResolver instance.");
             }
         }
         // fallback to use an adapter if reflection fails
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
index 7d90492e14c..3dbdd288715 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
@@ -42,12 +42,12 @@ import 
io.netty.incubator.channel.uring.IOUringServerSocketChannel;
 import io.netty.incubator.channel.uring.IOUringSocketChannel;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadFactory;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
 import org.apache.bookkeeper.common.util.affinity.CpuAffinity;
 
 
 @SuppressWarnings("checkstyle:JavadocType")
-@Slf4j
+@CustomLog
 public class EventLoopUtil {
 
     private static final String ENABLE_IO_URING = "pulsar.enableUring";
@@ -76,8 +76,8 @@ public class EventLoopUtil {
                         try {
                             CpuAffinity.acquireCore();
                         } catch (Throwable t) {
-                            log.warn("Failed to acquire CPU core for thread {} {}", Thread.currentThread().getName(),
-                                    t.getMessage(), t);
+                            log.warn().attr("thread", Thread.currentThread().getName())
+                                    .exception(t).log("Failed to acquire CPU core for thread");
                         }
                     });
                 }
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressionCodecLZ4JNI.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressionCodecLZ4JNI.java
index daea2f167f5..10013169edd 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressionCodecLZ4JNI.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressionCodecLZ4JNI.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.common.compression;
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
 import net.jpountz.lz4.LZ4Compressor;
 import net.jpountz.lz4.LZ4Factory;
 import net.jpountz.lz4.LZ4SafeDecompressor;
@@ -30,7 +30,7 @@ import 
org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 /**
  * LZ4 Compression.
  */
-@Slf4j
+@CustomLog
 public class CompressionCodecLZ4JNI implements CompressionCodec {
 
     static {
@@ -38,7 +38,7 @@ public class CompressionCodecLZ4JNI implements 
CompressionCodec {
             // Force the attempt to load LZ4 JNI
             net.jpountz.util.Native.load();
         } catch (Throwable th) {
-            log.warn("Failed to load native LZ4 implementation: {}", th.getMessage());
+            log.warn().attr("message", th.getMessage()).log("Failed to load native LZ4 implementation");
         }
     }
 
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressionCodecSnappyJNI.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressionCodecSnappyJNI.java
index ab6d3c08fcb..1f4bb1c0d3a 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressionCodecSnappyJNI.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressionCodecSnappyJNI.java
@@ -22,13 +22,13 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
 import org.xerial.snappy.Snappy;
 
 /**
  * Snappy Compression.
  */
-@Slf4j
+@CustomLog
 public class CompressionCodecSnappyJNI implements CompressionCodec {
 
     @Override
@@ -45,7 +45,7 @@ public class CompressionCodecSnappyJNI implements 
CompressionCodec {
         try {
             compressedLength = Snappy.compress(sourceNio, targetNio);
         } catch (IOException e) {
-            log.error("Failed to compress to Snappy: {}", e.getMessage());
+            log.error().attr("message", e.getMessage()).log("Failed to compress to Snappy");
         }
         target.writerIndex(compressedLength);
         return target;
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/nar/NarUnpackerTest.java 
b/pulsar-common/src/test/java/org/apache/pulsar/common/nar/NarUnpackerTest.java
index 799a7024328..99b4198e213 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/nar/NarUnpackerTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/nar/NarUnpackerTest.java
@@ -30,14 +30,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.SystemUtils;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-@Slf4j
+@CustomLog
 @Test
 public class NarUnpackerTest {
     File sampleZipFile;
@@ -64,14 +64,14 @@ public class NarUnpackerTest {
             try {
                 sampleZipFile.delete();
             } catch (Exception e) {
-                log.warn("Failed to delete file {}", sampleZipFile, e);
+                log.warn().attr("file", sampleZipFile).exception(e).log("Failed to delete file");
             }
         }
         if (extractDirectory != null && extractDirectory.exists()) {
             try {
                 FileUtils.deleteFile(extractDirectory, true);
             } catch (IOException e) {
-                log.warn("Failed to delete directory {}", extractDirectory, e);
+                log.warn().attr("directory", extractDirectory).exception(e).log("Failed to delete directory");
             }
         }
     }
@@ -87,7 +87,7 @@ public class NarUnpackerTest {
                 try {
                     NarUnpacker.doUnpackNar(sampleZipFile, extractDirectory, 
extractCounter::incrementAndGet);
                 } catch (Exception e) {
-                    log.error("Unpacking failed", e);
+                    log.error().exception(e).log("Unpacking failed");
                     exceptionCounter.incrementAndGet();
                 } finally {
                     countDownLatch.countDown();
@@ -112,7 +112,7 @@ public class NarUnpackerTest {
                     System.exit(100);
                 }
             } catch (Exception e) {
-                log.error("Unpacking failed", e);
+                log.error().exception(e).log("Unpacking failed");
                 System.exit(99);
             }
         }
@@ -157,14 +157,14 @@ public class NarUnpackerTest {
                             .start();
                     String output = IOUtils.toString(process.getInputStream(), 
StandardCharsets.UTF_8);
                     int retval = process.waitFor();
-                    log.info("Process retval {} output {}", retval, output);
+                    log.info().attr("retval", retval).attr("output", output).log("Process completed");
                     if (retval == 101) {
                         extractCounter.incrementAndGet();
                     } else if (retval != 100) {
                         exceptionCounter.incrementAndGet();
                     }
                 } catch (Exception e) {
-                    log.error("Unpacking in a separate process failed", e);
+                    log.error().exception(e).log("Unpacking in a separate process failed");
                     exceptionCounter.incrementAndGet();
                 } finally {
                     countDownLatch.countDown();
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/stats/JvmDefaultGCMetricsLoggerTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/stats/JvmDefaultGCMetricsLoggerTest.java
index 895a80cd6cd..966a0835c45 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/stats/JvmDefaultGCMetricsLoggerTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/stats/JvmDefaultGCMetricsLoggerTest.java
@@ -19,17 +19,18 @@
 package org.apache.pulsar.common.stats;
 
 import static org.testng.Assert.assertNotEquals;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
 import org.testng.annotations.Test;
 
-@Slf4j
+@CustomLog
 public class JvmDefaultGCMetricsLoggerTest {
 
     @Test
     public void testInvokeJVMInternals() {
       long safePointCount = JvmDefaultGCMetricsLogger.getSafepointCount();
       long totalSafePointTime = 
JvmDefaultGCMetricsLogger.getTotalSafepointTime();
-      log.info("safePointCount {} totalSafePointTime {}", safePointCount, totalSafePointTime);
+      log.info().attr("safePointCount", safePointCount)
+              .attr("totalSafePointTime", totalSafePointTime).log("safepoint metrics");
       assertNotEquals(safePointCount, -1);
       assertNotEquals(totalSafePointTime, -1);
     }

Reply via email to