This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 792a76ac762 [improve] PIP-467: Convert pulsar-common module logging
from SLF4J to slog (#25506)
792a76ac762 is described below
commit 792a76ac762109e6ceab5fc25ae2ccb087ec2ab5
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Apr 10 12:32:14 2026 -0700
[improve] PIP-467: Convert pulsar-common module logging from SLF4J to slog
(#25506)
---
distribution/shell/src/assemble/LICENSE.bin.txt | 2 +
pulsar-common/build.gradle.kts | 2 +-
.../client/impl/schema/KeyValueSchemaInfo.java | 4 +-
.../common/allocator/PulsarByteBufAllocator.java | 12 ++-
.../pulsar/common/api/raw/MessageParser.java | 40 ++++++---
.../common/intercept/BrokerEntryMetadataUtils.java | 28 ++++---
.../org/apache/pulsar/common/nar/FileUtils.java | 92 +++++----------------
.../apache/pulsar/common/nar/NarClassLoader.java | 16 ++--
.../org/apache/pulsar/common/nar/NarUnpacker.java | 21 +++--
.../common/policies/data/ClusterDataImpl.java | 4 +-
.../common/policies/data/OffloadPoliciesImpl.java | 6 +-
.../apache/pulsar/common/protocol/Commands.java | 13 +--
.../protocol/OptionalProxyProtocolDecoder.java | 4 +-
.../pulsar/common/protocol/PulsarDecoder.java | 17 ++--
.../pulsar/common/protocol/PulsarHandler.java | 35 ++++----
.../common/sasl/JAASCredentialsContainer.java | 10 +--
.../pulsar/common/sasl/TGTRefreshThread.java | 95 +++++++++++++---------
.../semaphore/AsyncDualMemoryLimiterImpl.java | 4 -
.../common/semaphore/AsyncSemaphoreImpl.java | 4 -
.../common/stats/JvmDefaultGCMetricsLogger.java | 10 +--
.../pulsar/common/stats/JvmG1GCMetricsLogger.java | 7 +-
.../org/apache/pulsar/common/stats/JvmMetrics.java | 15 ++--
.../pulsar/common/tls/TlsHostnameVerifier.java | 8 +-
.../pulsar/common/topics/TopicsPatternFactory.java | 12 +--
.../pulsar/common/util/ClassLoaderUtils.java | 6 +-
.../common/util/FileModifiedTimeUpdater.java | 7 +-
...GracefulExecutorServicesTerminationHandler.java | 19 +++--
.../apache/pulsar/common/util/KeyManagerProxy.java | 18 ++--
.../pulsar/common/util/ObjectMapperFactory.java | 4 +-
.../org/apache/pulsar/common/util/RateLimiter.java | 4 +-
.../org/apache/pulsar/common/util/Runnables.java | 7 +-
.../apache/pulsar/common/util/SecurityUtility.java | 41 +++++-----
.../apache/pulsar/common/util/ShutdownUtil.java | 14 ++--
...ingleThreadNonConcurrentFixedRateScheduler.java | 6 +-
.../pulsar/common/util/TrustManagerProxy.java | 10 +--
.../util/keystoretls/KeyStoreSSLContext.java | 4 +-
.../keystoretls/SSLContextValidatorEngine.java | 4 +-
.../pulsar/common/util/netty/DnsResolverUtil.java | 9 +-
.../pulsar/common/util/netty/EventLoopUtil.java | 8 +-
.../common/compression/CompressionCodecLZ4JNI.java | 6 +-
.../compression/CompressionCodecSnappyJNI.java | 6 +-
.../apache/pulsar/common/nar/NarUnpackerTest.java | 16 ++--
.../stats/JvmDefaultGCMetricsLoggerTest.java | 7 +-
43 files changed, 311 insertions(+), 346 deletions(-)
diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt
b/distribution/shell/src/assemble/LICENSE.bin.txt
index 6aa51417695..6f33219de16 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -391,6 +391,8 @@ The Apache Software License, Version 2.0
- opentelemetry-api-incubator-1.56.0-alpha.jar
- opentelemetry-common-1.56.0.jar
- opentelemetry-context-1.56.0.jar
+ * Slog
+ - slog-0.9.5.jar
* BookKeeper
- bookkeeper-common-allocator-4.17.3.jar
diff --git a/pulsar-common/build.gradle.kts b/pulsar-common/build.gradle.kts
index cdfaaeb16c0..5388569ee13 100644
--- a/pulsar-common/build.gradle.kts
+++ b/pulsar-common/build.gradle.kts
@@ -88,7 +88,7 @@ dependencies {
api(project(":pulsar-client-api"))
api(project(":pulsar-client-admin-api"))
- implementation(libs.slf4j.api)
+ implementation(libs.slog)
implementation(libs.jackson.databind)
implementation(libs.jackson.module.parameter.names)
implementation(libs.jackson.datatype.jsr310)
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
index 4016dfb2a98..aefc9e42324 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java
@@ -23,7 +23,7 @@ import static java.util.Objects.requireNonNull;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.EncodeData;
import org.apache.pulsar.client.api.Schema;
@@ -35,7 +35,7 @@ import org.apache.pulsar.common.schema.SchemaType;
/**
* Util class for processing key/value schema info.
*/
-@Slf4j
+@CustomLog
public final class KeyValueSchemaInfo {
private static final Schema<SchemaInfo> SCHEMA_INFO_WRITER = new
Schema<SchemaInfo>() {
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java
index 4ad0732a62d..087180d125b 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/allocator/PulsarByteBufAllocator.java
@@ -28,8 +28,8 @@ import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Function;
+import lombok.CustomLog;
import lombok.experimental.UtilityClass;
-import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
import org.apache.bookkeeper.common.allocator.LeakDetectionPolicy;
import org.apache.bookkeeper.common.allocator.OutOfMemoryPolicy;
@@ -39,8 +39,8 @@ import org.apache.pulsar.common.util.ShutdownUtil;
/**
* Holder of a ByteBuf allocator.
*/
+@CustomLog
@UtilityClass
-@Slf4j
public class PulsarByteBufAllocator {
public static final String PULSAR_ALLOCATOR_POOLED =
"pulsar.allocator.pooled";
@@ -76,9 +76,7 @@ public class PulsarByteBufAllocator {
System.getProperty(PULSAR_ALLOCATOR_OUT_OF_MEMORY_POLICY,
"FallbackToHeap"));
final LeakDetectionPolicy leakDetectionPolicy =
resolveLeakDetectionPolicyWithHighestLevel(System::getProperty);
- if (log.isDebugEnabled()) {
- log.debug("Is Pooled: {} -- Exit on OOM: {}", isPooled,
isExitOnOutOfMemory);
- }
+ log.debug().attr("isPooled", isPooled).attr("exitOnOOM",
isExitOnOutOfMemory).log("Allocator configuration");
ByteBufAllocatorBuilder builder = ByteBufAllocatorBuilder.create()
.leakDetectionPolicy(leakDetectionPolicy)
@@ -89,12 +87,12 @@ public class PulsarByteBufAllocator {
try {
c.accept(oomException);
} catch (Throwable t) {
- log.warn("Exception during OOM listener: {}",
t.getMessage(), t);
+ log.warn().exception(t).log("Exception during OOM
listener");
}
});
if (isExitOnOutOfMemory) {
- log.info("Exiting JVM process for OOM error: {}",
oomException.getMessage(), oomException);
+ log.info().exception(oomException).log("Exiting JVM
process for OOM error");
ShutdownUtil.triggerImmediateForcefulShutdown();
}
});
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
index 0e9aae4603d..57b21d59a60 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/raw/MessageParser.java
@@ -25,8 +25,8 @@ import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
import java.io.IOException;
+import lombok.CustomLog;
import lombok.experimental.UtilityClass;
-import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodec;
@@ -37,8 +37,8 @@ import org.apache.pulsar.common.protocol.Commands;
/**
* Helper class to work with a raw Pulsar entry payload.
*/
+@CustomLog
@UtilityClass
-@Slf4j
public class MessageParser {
private static final FastThreadLocal<SingleMessageMetadata>
LOCAL_SINGLE_MESSAGE_METADATA = //
@@ -83,8 +83,11 @@ public class MessageParser {
try {
Commands.parseMessageMetadata(payload, msgMetadata);
} catch (Throwable t) {
- log.warn("[{}] Failed to deserialize metadata for message
{}:{} - Ignoring",
- topicName, ledgerId, entryId);
+ log.warn()
+ .attr("topic", topicName)
+ .attr("ledgerId", ledgerId)
+ .attr("entryId", entryId)
+ .log("Failed to deserialize metadata for message -
Ignoring");
return;
}
@@ -128,9 +131,13 @@ public class MessageParser {
int checksum = readChecksum(headersAndPayload);
int computedChecksum = computeChecksum(headersAndPayload);
if (checksum != computedChecksum) {
- log.error(
- "[{}] Checksum mismatch for message at {}:{}. Received
checksum: 0x{}, Computed checksum: 0x{}",
- topic, ledgerId, entryId, Long.toHexString(checksum),
Integer.toHexString(computedChecksum));
+ log.error()
+ .attr("topic", topic)
+ .attr("ledgerId", ledgerId)
+ .attr("entryId", entryId)
+ .attr("receivedChecksum", "0x" +
Long.toHexString(checksum))
+ .attr("computedChecksum", "0x" +
Integer.toHexString(computedChecksum))
+ .log("Checksum mismatch for message");
return false;
}
}
@@ -152,8 +159,12 @@ public class MessageParser {
int payloadSize = payload.readableBytes();
if (payloadSize > maxMessageSize) {
// payload size is itself corrupted since it cannot be bigger than
the MaxMessageSize
- log.error("[{}] Got corrupted payload message size {} at {}:{}",
topic, payloadSize,
- ledgerId, entryId);
+ log.error()
+ .attr("topic", topic)
+ .attr("payloadSize", payloadSize)
+ .attr("ledgerId", ledgerId)
+ .attr("entryId", entryId)
+ .log("Got corrupted payload message size");
return null;
}
@@ -161,8 +172,13 @@ public class MessageParser {
ByteBuf uncompressedPayload = codec.decode(payload,
uncompressedSize);
return uncompressedPayload;
} catch (IOException e) {
- log.error("[{}] Failed to decompress message with {} at {}:{} :
{}", topic,
- msgMetadata.getCompression(), ledgerId, entryId,
e.getMessage(), e);
+ log.error()
+ .attr("topic", topic)
+ .attr("compression", msgMetadata.getCompression())
+ .attr("ledgerId", ledgerId)
+ .attr("entryId", entryId)
+ .exception(e)
+ .log("Failed to decompress message");
return null;
}
}
@@ -187,7 +203,7 @@ public class MessageParser {
ledgerId, entryId, i));
}
} catch (IOException e) {
- log.warn("Unable to obtain messages in batch", e);
+ log.warn().exception(e).log("Unable to obtain messages in batch");
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/BrokerEntryMetadataUtils.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/BrokerEntryMetadataUtils.java
index 00f8e861e44..0c9661d705a 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/BrokerEntryMetadataUtils.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/BrokerEntryMetadataUtils.java
@@ -22,17 +22,15 @@ import java.lang.reflect.InvocationTargetException;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
+import lombok.CustomLog;
import org.apache.pulsar.common.util.ClassLoaderUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* A tool class for loading BrokerEntryMetadataInterceptor classes.
*/
+@CustomLog
public class BrokerEntryMetadataUtils<T> {
- private static final Logger log =
LoggerFactory.getLogger(BrokerEntryMetadataUtils.class);
-
public static Set<BrokerEntryMetadataInterceptor>
loadBrokerEntryMetadataInterceptors(
Set<String> interceptorNames, ClassLoader classLoader) {
Set<BrokerEntryMetadataInterceptor> interceptors = new HashSet<>();
@@ -46,12 +44,17 @@ public class BrokerEntryMetadataUtils<T> {
interceptors.add(clz.getDeclaredConstructor().newInstance());
} catch (InstantiationException | IllegalAccessException
| InvocationTargetException |
NoSuchMethodException e) {
- log.error("Create new BrokerEntryMetadataInterceptor
instance for {} failed.",
- interceptorName, e);
+ log.error()
+ .attr("interceptorName", interceptorName)
+ .exception(e)
+ .log("Create new
BrokerEntryMetadataInterceptor instance failed.");
throw new RuntimeException(e);
}
} catch (ClassNotFoundException e) {
- log.error("Load BrokerEntryMetadataInterceptor class for
{} failed.", interceptorName, e);
+ log.error()
+ .attr("interceptorName", interceptorName)
+ .exception(e)
+ .log("Load BrokerEntryMetadataInterceptor class
failed.");
throw new RuntimeException(e);
}
}
@@ -71,12 +74,17 @@ public class BrokerEntryMetadataUtils<T> {
interceptors.add(clz.getDeclaredConstructor().newInstance());
} catch (InstantiationException | IllegalAccessException
| InvocationTargetException | NoSuchMethodException e)
{
- log.error("Create new instance for {} failed.
Exception is {}",
- interceptorName, e);
+ log.error()
+ .attr("interceptorName", interceptorName)
+ .exception(e)
+ .log("Create new instance failed.");
throw new RuntimeException(e);
}
} catch (ClassNotFoundException e) {
- log.error("Load class for {} failed. Exception is {}",
interceptorName, e);
+ log.error()
+ .attr("interceptorName", interceptorName)
+ .exception(e)
+ .log("Load class failed.");
throw new RuntimeException(e);
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java
index 38665ce4e1b..f231f95ced7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/FileUtils.java
@@ -31,15 +31,14 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
-import lombok.extern.slf4j.Slf4j;
-import org.slf4j.Logger;
+import lombok.CustomLog;
/**
* A utility class containing a few useful static methods to do typical IO
* operations.
*
*/
-@Slf4j
+@CustomLog
public class FileUtils {
public static final long MILLIS_BETWEEN_ATTEMPTS = 50L;
@@ -72,29 +71,7 @@ public class FileUtils {
}
}
- /**
- * Deletes the given file. If the given file exists but could not be
deleted
- * this will be printed as a warning to the given logger
- *
- * @param file to delete
- * @param logger to notify
- * @return true if deleted
- */
- public static boolean deleteFile(final File file, final Logger logger) {
- return FileUtils.deleteFile(file, logger, 1);
- }
-
- /**
- * Deletes the given file. If the given file exists but could not be
deleted
- * this will be printed as a warning to the given logger
- *
- * @param file to delete
- * @param logger to notify
- * @param attempts indicates how many times an attempt to delete should be
- * made
- * @return true if given file no longer exists
- */
- public static boolean deleteFile(final File file, final Logger logger,
final int attempts) {
+ private static boolean deleteFile(final File file, final int attempts) {
if (file == null) {
return false;
}
@@ -108,51 +85,17 @@ public class FileUtils {
FileUtils.sleepQuietly(MILLIS_BETWEEN_ATTEMPTS);
}
}
- if (!isGone && logger != null) {
- logger.warn("File appears to exist but unable to delete
file: " + file.getAbsolutePath());
+ if (!isGone) {
+ log.warn().attr("file", file.getAbsolutePath())
+ .log("File appears to exist but unable to delete
file");
}
}
} catch (final Throwable t) {
- if (logger != null) {
- logger.warn("Unable to delete file: '" +
file.getAbsolutePath() + "' due to " + t);
- }
+ log.warn().attr("file",
file.getAbsolutePath()).exception(t).log("Unable to delete file");
}
return isGone;
}
- /**
- * Deletes all files (not directories..) in the given directory (non
- * recursive) that match the given filename filter. If any file cannot be
- * deleted then this is printed at warn to the given logger.
- *
- * @param directory to delete contents of
- * @param filter if null then no filter is used
- * @param logger to notify
- * @throws IOException if abstract pathname does not denote a directory, or
- * if an I/O error occurs
- */
- public static void deleteFilesInDirectory(final File directory, final
FilenameFilter filter,
- final Logger logger) throws
IOException {
- FileUtils.deleteFilesInDirectory(directory, filter, logger, false);
- }
-
- /**
- * Deletes all files (not directories) in the given directory (recursive)
- * that match the given filename filter. If any file cannot be deleted then
- * this is printed at warn to the given logger.
- *
- * @param directory to delete contents of
- * @param filter if null then no filter is used
- * @param logger to notify
- * @param recurse true if should recurse
- * @throws IOException if abstract pathname does not denote a directory, or
- * if an I/O error occurs
- */
- public static void deleteFilesInDirectory(final File directory, final
FilenameFilter filter, final Logger logger,
- final boolean recurse) throws
IOException {
- FileUtils.deleteFilesInDirectory(directory, filter, logger, recurse,
false);
- }
-
/**
* Deletes all files (not directories) in the given directory (recursive)
* that match the given filename filter. If any file cannot be deleted then
@@ -160,7 +103,6 @@ public class FileUtils {
*
* @param directory to delete contents of
* @param filter if null then no filter is used
- * @param logger to notify
* @param recurse will look for contents of sub directories.
* @param deleteEmptyDirectories default is false; if true will delete
* directories found that are empty
@@ -168,7 +110,7 @@ public class FileUtils {
* if an I/O error occurs
*/
public static void deleteFilesInDirectory(
- final File directory, final FilenameFilter filter, final Logger logger,
+ final File directory, final FilenameFilter filter,
final boolean recurse, final boolean deleteEmptyDirectories) throws
IOException {
// ensure the specified directory is actually a directory and that it
exists
if (null != directory && directory.isDirectory()) {
@@ -180,13 +122,13 @@ public class FileUtils {
for (File ingestFile : ingestFiles) {
boolean process = (filter == null) ? true :
filter.accept(directory, ingestFile.getName());
if (ingestFile.isFile() && process) {
- FileUtils.deleteFile(ingestFile, logger, 3);
+ FileUtils.deleteFile(ingestFile, 3);
}
if (ingestFile.isDirectory() && recurse) {
- FileUtils.deleteFilesInDirectory(ingestFile, filter,
logger, recurse, deleteEmptyDirectories);
+ FileUtils.deleteFilesInDirectory(ingestFile, filter,
recurse, deleteEmptyDirectories);
String[] ingestFileList = ingestFile.list();
if (deleteEmptyDirectories && ingestFileList != null &&
ingestFileList.length == 0) {
- FileUtils.deleteFile(ingestFile, logger, 3);
+ FileUtils.deleteFile(ingestFile, 3);
}
}
}
@@ -212,7 +154,7 @@ public class FileUtils {
FileUtils.deleteFiles(Arrays.asList(list), recurse);
}
//now delete the file itself regardless of whether it is plain file or
a directory
- if (!FileUtils.deleteFile(file, null, 5)) {
+ if (!FileUtils.deleteFile(file, 5)) {
throw new IOException("Unable to delete " +
file.getAbsolutePath());
}
}
@@ -229,14 +171,18 @@ public class FileUtils {
try (ZipFile zipFile = new ZipFile(jarFile);) {
ZipEntry entry = zipFile.getEntry("META-INF/bundled-dependencies");
if (entry == null || !entry.isDirectory()) {
- log.info("Jar file {} does not contain
META-INF/bundled-dependencies, it is not a NAR file", jarFile);
+ log.info().attr("jarFile", jarFile)
+ .log("Jar file does not contain
META-INF/bundled-dependencies,"
+ + " it is not a NAR file");
return false;
} else {
- log.info("Jar file {} contains META-INF/bundled-dependencies,
it may be a NAR file", jarFile);
+ log.info().attr("jarFile", jarFile)
+ .log("Jar file contains META-INF/bundled-dependencies,"
+ + " it may be a NAR file");
return true;
}
} catch (IOException err) {
- log.info("Cannot safely detect if {} is a NAR archive", jarFile,
err);
+ log.info().attr("jarFile", jarFile).exception(err).log("Cannot
safely detect if file is a NAR archive");
return true;
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
index 48c2db725d8..b6bd34e1d9e 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarClassLoader.java
@@ -40,7 +40,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
/**
* <p>
@@ -122,7 +122,7 @@ import lombok.extern.slf4j.Slf4j;
* NAR.
* </p>
*/
-@Slf4j
+@CustomLog
public class NarClassLoader extends URLClassLoader {
private static final FileFilter JAR_FILTER = pathname -> {
@@ -180,9 +180,7 @@ public class NarClassLoader extends URLClassLoader {
}
}
- if (log.isDebugEnabled()) {
- log.debug("Created class loader with paths: {}",
Arrays.toString(getURLs()));
- }
+ log.debug().attr("paths", () ->
Arrays.toString(getURLs())).log("Created class loader with paths");
}
public File getWorkingDirectory() {
@@ -236,7 +234,7 @@ public class NarClassLoader extends URLClassLoader {
try {
addURL(f.toURI().toURL());
} catch (IOException e) {
- log.error("Failed to add entry to classpath: {}", f, e);
+ log.error().attr("entry", f).exception(e).log("Failed to add
entry to classpath");
}
});
}
@@ -246,7 +244,7 @@ public class NarClassLoader extends URLClassLoader {
classPathEntries.add(root);
File dependencies = new File(root, "META-INF/bundled-dependencies");
if (!dependencies.isDirectory()) {
- log.warn("{} does not contain META-INF/bundled-dependencies!",
root);
+ log.warn().attr("root", root).log("does not contain
META-INF/bundled-dependencies!");
}
classPathEntries.add(dependencies);
if (dependencies.isDirectory()) {
@@ -263,7 +261,7 @@ public class NarClassLoader extends URLClassLoader {
protected String findLibrary(final String libname) {
File dependencies = new File(narWorkingDirectory,
"META-INF/bundled-dependencies");
if (!dependencies.isDirectory()) {
- log.warn("{} does not contain META-INF/bundled-dependencies!",
narWorkingDirectory);
+ log.warn().attr("directory", narWorkingDirectory).log("does not
contain META-INF/bundled-dependencies!");
}
final File nativeDir = new File(dependencies, "native");
@@ -290,7 +288,7 @@ public class NarClassLoader extends URLClassLoader {
@Override
protected Class<?> loadClass(String name, boolean resolve) throws
ClassNotFoundException {
if (closed.get()) {
- log.warn("Loading class {} from a closed classloader ({})", name,
this);
+ log.warn().attr("class", name).attr("classloader",
this).log("Loading class from a closed classloader");
}
return super.loadClass(name, resolve);
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
index b30aa37ff16..4da17e31834 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/nar/NarUnpacker.java
@@ -43,12 +43,12 @@ import java.util.Enumeration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
/**
* Helper class to unpack NARs.
*/
-@Slf4j
+@CustomLog
public class NarUnpacker {
private static final ConcurrentHashMap<String, Object>
CURRENT_JVM_FILE_LOCKS = new ConcurrentHashMap<>();
@@ -73,7 +73,7 @@ public class NarUnpacker {
File parentDirectory = new File(baseWorkingDirectory, nar.getName() +
"-unpacked");
if (!parentDirectory.exists()) {
if (parentDirectory.mkdirs()) {
- log.info("Created directory {}", parentDirectory);
+ log.info().attr("directory", parentDirectory).log("Created
directory");
} else if (!parentDirectory.exists()) {
throw new IOException("Cannot create " + parentDirectory);
}
@@ -98,18 +98,23 @@ public class NarUnpacker {
throw new IOException("Cannot create " +
narExtractionTempDirectory);
}
try {
- log.info("Extracting {} to {}", nar,
narExtractionTempDirectory);
+ log.info().attr("nar", nar).attr("destination",
narExtractionTempDirectory).log("Extracting");
if (extractCallback != null) {
extractCallback.run();
}
unpack(nar, narExtractionTempDirectory);
} catch (IOException e) {
- log.error("There was a problem extracting the nar
file. Deleting {} to clean up state.",
- narExtractionTempDirectory, e);
+ log.error()
+ .attr("directory", narExtractionTempDirectory)
+ .exception(e)
+ .log("There was a problem extracting the nar
file. Deleting to clean up state.");
try {
FileUtils.deleteFile(narExtractionTempDirectory,
true);
} catch (IOException e2) {
- log.error("Failed to delete temporary directory
{}", narExtractionTempDirectory, e2);
+ log.error()
+ .attr("directory",
narExtractionTempDirectory)
+ .exception(e2)
+ .log("Failed to delete temporary
directory");
}
throw e;
}
@@ -138,7 +143,7 @@ public class NarUnpacker {
String name = zipEntry.getName();
Path targetFilePath =
workingDirectoryPath.resolve(name).normalize();
if (!targetFilePath.startsWith(workingDirectoryPath)) {
- log.error("Invalid zip file with entry '{}'", name);
+ log.error().attr("entry", name).log("Invalid zip file with
entry");
throw new IOException("Invalid zip file. Aborting
unpacking.");
}
File f = targetFilePath.toFile();
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterDataImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterDataImpl.java
index b887fe0a586..af82da4b790 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterDataImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/ClusterDataImpl.java
@@ -23,9 +23,9 @@ import io.swagger.annotations.ApiModelProperty;
import java.util.LinkedHashSet;
import java.util.Objects;
import lombok.AllArgsConstructor;
+import lombok.CustomLog;
import lombok.Data;
import lombok.NoArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.common.util.DefaultPulsarSslFactory;
@@ -41,7 +41,7 @@ import org.apache.pulsar.common.util.URIPreconditions;
@Data
@AllArgsConstructor
@NoArgsConstructor
-@Slf4j
+@CustomLog
public final class ClusterDataImpl implements ClusterData, Cloneable {
@ApiModelProperty(
name = "serviceUrl",
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
index 4e365b1dafc..f1c687bb97d 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java
@@ -34,15 +34,15 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
+import lombok.CustomLog;
import lombok.Data;
import lombok.NoArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
/**
* Definition of the offload policies.
*/
-@Slf4j
+@CustomLog
@Data
@NoArgsConstructor
public class OffloadPoliciesImpl implements Serializable, OffloadPolicies {
@@ -452,7 +452,7 @@ public class OffloadPoliciesImpl implements Serializable,
OffloadPolicies {
return offloadPolicies;
}
} catch (Exception e) {
- log.error("Failed to merge configuration.", e);
+ log.error().exception(e).log("Failed to merge configuration.");
return null;
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 275b6160061..8c6289b944d 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -40,8 +40,8 @@ import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.ToLongFunction;
+import lombok.CustomLog;
import lombok.experimental.UtilityClass;
-import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.PulsarVersion;
@@ -115,7 +115,7 @@ import
org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSet;
@UtilityClass
-@Slf4j
+@CustomLog
@SuppressWarnings("checkstyle:JavadocType")
public class Commands {
@@ -2064,7 +2064,8 @@ public class Commands {
MessageMetadata metadata =
parseMessageMetadata(metadataAndPayload);
return metadata;
} catch (Throwable t) {
- log.error("[{}] [{}] Failed to parse message metadata",
subscription, consumerId, t);
+ log.error().attr("subscription", subscription).attr("consumerId",
consumerId).exception(t)
+ .log("Failed to parse message metadata");
return null;
} finally {
metadataAndPayload.readerIndex(readerIdx);
@@ -2094,7 +2095,8 @@ public class Commands {
try {
peekMessageMetadata(metadataAndPayload, metadata);
} catch (Throwable t) {
- log.error("[{}] [{}] Failed to parse message metadata",
subscription, consumerId, t);
+ log.error().attr("subscription", subscription).attr("consumerId",
consumerId).exception(t)
+ .log("Failed to parse message metadata");
return null;
}
return metadata;
@@ -2108,7 +2110,8 @@ public class Commands {
MessageMetadata metadata =
parseMessageMetadata(metadataAndPayload);
return resolveStickyKey(metadata);
} catch (Throwable t) {
- log.error("[{}] [{}] Failed to peek sticky key from the message
metadata", topic, subscription, t);
+ log.error().attr("topic", topic).attr("subscription",
subscription).exception(t)
+ .log("Failed to peek sticky key from the message
metadata");
return NONE_KEY;
} finally {
metadataAndPayload.readerIndex(readerIdx);
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/OptionalProxyProtocolDecoder.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/OptionalProxyProtocolDecoder.java
index b4e15f8cd1d..4bc7f256fda 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/OptionalProxyProtocolDecoder.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/OptionalProxyProtocolDecoder.java
@@ -26,13 +26,13 @@ import io.netty.handler.codec.ProtocolDetectionResult;
import io.netty.handler.codec.ProtocolDetectionState;
import io.netty.handler.codec.haproxy.HAProxyMessageDecoder;
import io.netty.handler.codec.haproxy.HAProxyProtocolVersion;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
/**
* Decoder that added whether a new connection is prefixed with the
ProxyProtocol.
* More about the ProxyProtocol see:
http://www.haproxy.org/download/1.8/doc/proxy-protocol.txt.
*/
-@Slf4j
+@CustomLog
public class OptionalProxyProtocolDecoder extends ChannelInboundHandlerAdapter
{
public static final String NAME = "optional-proxy-protocol-decoder";
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index 9017805c75b..595d75e8801 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -26,6 +26,7 @@ import io.netty.channel.ChannelOutboundInvoker;
import io.netty.handler.codec.haproxy.HAProxyMessage;
import io.netty.handler.ssl.SslCloseCompletionEvent;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
+import lombok.CustomLog;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandAckResponse;
@@ -88,8 +89,6 @@ import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.util.netty.NettyChannelUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Basic implementation of the channel handler to process inbound Pulsar data.
@@ -100,6 +99,7 @@ import org.slf4j.LoggerFactory;
* after the method returns.</b> If you need to pass an instance of the
command instance to another thread or retain a
* reference to it after the handle* method completes, you must make a deep
copy of the command instance.
*/
+@CustomLog
public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
// From the proxy protocol. If present, it means the client is connected
via a reverse proxy.
@@ -123,9 +123,7 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
int cmdSize = (int) buffer.readUnsignedInt();
cmd.parseFrom(buffer, cmdSize);
- if (log.isDebugEnabled()) {
- log.debug("[{}] Received cmd {}", ctx.channel(),
cmd.getType());
- }
+ log.debug().attr("channel", ctx.channel()).attr("cmdType",
cmd.getType()).log("Received cmd");
messageReceived(cmd);
switch (cmd.getType()) {
@@ -746,8 +744,6 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
throw new UnsupportedOperationException();
}
- private static final Logger log =
LoggerFactory.getLogger(PulsarDecoder.class);
-
private void writeAndFlush(ChannelOutboundInvoker ctx, ByteBuf cmd) {
NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, cmd);
}
@@ -757,7 +753,8 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
// log handshake failures
SslHandshakeCompletionEvent sslHandshakeCompletionEvent =
(SslHandshakeCompletionEvent) evt;
if (!sslHandshakeCompletionEvent.isSuccess()) {
- log.warn("[{}] TLS handshake failed. {}", ctx.channel(),
sslHandshakeCompletionEvent);
+ log.warn().attr("channel", ctx.channel()).attr("event",
sslHandshakeCompletionEvent)
+ .log("TLS handshake failed");
}
} else if (evt instanceof SslCloseCompletionEvent) {
// handle TLS close_notify event and immediately close the channel
@@ -765,9 +762,7 @@ public abstract class PulsarDecoder extends ChannelInboundHandlerAdapter {
// See https://datatracker.ietf.org/doc/html/rfc8446#section-6.1
for more details
SslCloseCompletionEvent sslCloseCompletionEvent =
(SslCloseCompletionEvent) evt;
if (sslCloseCompletionEvent.isSuccess() &&
ctx.channel().isActive()) {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Received a TLS close_notify, closing the
channel.", ctx.channel());
- }
+ log.debug().attr("channel", ctx.channel()).log("Received a TLS
close_notify, closing the channel");
ctx.close();
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
index e8010ea1a51..eecff3686a7 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarHandler.java
@@ -25,13 +25,12 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.ScheduledFuture;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
+import lombok.CustomLog;
import lombok.Setter;
import org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.common.api.proto.CommandPing;
import org.apache.pulsar.common.api.proto.CommandPong;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Implementation of the channel handler to process inbound Pulsar data.
@@ -39,6 +38,7 @@ import org.slf4j.LoggerFactory;
* Please see {@link org.apache.pulsar.common.protocol.PulsarDecoder} javadoc
for important details about handle* method
* parameter instance lifecycle.
*/
+@CustomLog
public abstract class PulsarHandler extends PulsarDecoder {
@VisibleForTesting
@Setter
@@ -71,9 +71,8 @@ public abstract class PulsarHandler extends PulsarDecoder {
this.remoteAddress = ctx.channel().remoteAddress();
this.ctx = ctx;
- if (log.isDebugEnabled()) {
- log.debug("[{}] Scheduling keep-alive task every {} s",
this.toString(), keepAliveIntervalSeconds);
- }
+ log.debug().attr("handler", this).attr("intervalSeconds",
keepAliveIntervalSeconds)
+ .log("Scheduling keep-alive task");
if (keepAliveIntervalSeconds > 0) {
this.keepAliveTask = ctx.executor()
.scheduleAtFixedRate(catchingAndLoggingThrowables(this::handleKeepAliveTimeout),
@@ -89,14 +88,12 @@ public abstract class PulsarHandler extends PulsarDecoder {
@Override
protected final void handlePing(CommandPing ping) {
// Immediately reply success to ping requests
- if (log.isDebugEnabled()) {
- log.debug("[{}] Replying back to ping message", this.toString());
- }
+ log.debug().attr("handler", this).log("Replying back to ping message");
ctx.writeAndFlush(Commands.newPong())
.addListener(future -> {
if (!future.isSuccess()) {
- log.warn("[{}] Forcing connection to close since
cannot send a pong message.",
- toString(), future.cause());
+ log.warn().attr("handler",
toString()).exception(future.cause())
+ .log("Forcing connection to close since cannot
send a pong message");
ctx.close();
}
});
@@ -112,25 +109,22 @@ public abstract class PulsarHandler extends PulsarDecoder {
}
if (!isHandshakeCompleted()) {
- log.warn("[{}] Pulsar Handshake was not completed within timeout,
closing connection", this.toString());
+ log.warn().attr("handler", this)
+ .log("Pulsar Handshake was not completed within timeout,
closing connection");
ctx.close();
} else if (waitingForPingResponse &&
ctx.channel().config().isAutoRead()) {
// We were waiting for a response and another keep-alive just
completed.
// If auto-read was disabled, it means we stopped reading from the
connection, so we might receive the Ping
// response later and thus not enforce the strict timeout here.
- log.warn("[{}] Forcing connection to close after keep-alive
timeout", this.toString());
+ log.warn().attr("handler", this).log("Forcing connection to close
after keep-alive timeout");
ctx.close();
} else if (getRemoteEndpointProtocolVersion() >=
ProtocolVersion.v1.getValue()) {
// Send keep alive probe to peer only if it supports the ping/pong
commands, added in v1
- if (log.isDebugEnabled()) {
- log.debug("[{}] Sending ping message", this.toString());
- }
+ log.debug().attr("handler", this).log("Sending ping message");
waitingForPingResponse = true;
sendPing();
} else {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Peer doesn't support keep-alive",
this.toString());
- }
+ log.debug().attr("handler", this).log("Peer doesn't support
keep-alive");
}
}
@@ -138,8 +132,8 @@ public abstract class PulsarHandler extends PulsarDecoder {
return ctx.writeAndFlush(Commands.newPing())
.addListener(future -> {
if (!future.isSuccess()) {
- log.warn("[{}] Forcing connection to close since
cannot send a ping message.",
- this.toString(), future.cause());
+ log.warn().attr("handler",
this).exception(future.cause())
+ .log("Forcing connection to close since cannot
send a ping message");
ctx.close();
}
});
@@ -172,5 +166,4 @@ public abstract class PulsarHandler extends PulsarDecoder {
}
}
- private static final Logger log =
LoggerFactory.getLogger(PulsarHandler.class);
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/JAASCredentialsContainer.java b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/JAASCredentialsContainer.java
index 025ab415a80..b23fe82eac5 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/JAASCredentialsContainer.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/JAASCredentialsContainer.java
@@ -28,14 +28,14 @@ import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
+import lombok.CustomLog;
import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
/**
* JAAS Credentials Container.
* This is added for support Kerberos authentication.
*/
-@Slf4j
+@CustomLog
@Getter
public class JAASCredentialsContainer implements Closeable {
private Subject subject;
@@ -63,7 +63,7 @@ public class JAASCredentialsContainer implements Closeable {
+ "Please check your java.security.login.auth.config (="
+ System.getProperty("java.security.login.auth.config")
+ ") for section header: " + this.loginContextName;
- log.error("No JAAS Configuration section header found for Client:
{}", errorMessage);
+ log.error().attr("details", errorMessage).log("No JAAS
Configuration section header found for Client");
throw new LoginException(errorMessage);
}
LoginContext loginContext = new LoginContext(loginContextName,
callbackHandler);
@@ -96,9 +96,7 @@ public class JAASCredentialsContainer implements Closeable {
ticketRefreshThread.join(10000);
} catch (InterruptedException exit) {
Thread.currentThread().interrupt();
- if (log.isDebugEnabled()) {
- log.debug("interrupted while waiting for TGT refresh
thread to stop", exit);
- }
+ log.debug().exception(exit).log("interrupted while waiting for
TGT refresh thread to stop");
}
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java
index 6a0a7448f75..ca9329b979b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/sasl/TGTRefreshThread.java
@@ -25,12 +25,12 @@ import javax.security.auth.kerberos.KerberosPrincipal;
import javax.security.auth.kerberos.KerberosTicket;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
/**
* TGT Refresh Thread. Copied from Apache ZooKeeper TGT refresh logic.
*/
-@Slf4j
+@CustomLog
public class TGTRefreshThread extends Thread {
private static final Random rng = new Random();
@@ -59,8 +59,9 @@ public class TGTRefreshThread extends Thread {
for (KerberosTicket ticket : tickets) {
KerberosPrincipal server = ticket.getServer();
if (server.getName().equals("krbtgt/" + server.getRealm() + "@" +
server.getRealm())) {
- log.info("Client principal is \"" +
ticket.getClient().getName() + "\".");
- log.info("Server principal is \"" +
ticket.getServer().getName() + "\".");
+ log.info().attr("clientPrincipal",
ticket.getClient().getName())
+ .attr("serverPrincipal", ticket.getServer().getName())
+ .log("Found TGT ticket");
return ticket;
}
}
@@ -82,8 +83,8 @@ public class TGTRefreshThread extends Thread {
private long getRefreshTime(KerberosTicket tgt) {
long start = tgt.getStartTime().getTime();
long expires = tgt.getEndTime().getTime();
- log.info("TGT valid starting at: {}",
tgt.getStartTime().toString());
- log.info("TGT expires: {}",
tgt.getEndTime().toString());
+ log.info().attr("startTime", tgt.getStartTime().toString())
+ .attr("endTime", tgt.getEndTime().toString()).log("TGT
validity period");
long proposedRefresh = start
+ (long) ((expires - start) * (TICKET_RENEW_WINDOW +
(TICKET_RENEW_JITTER * rng.nextDouble())));
if (proposedRefresh > expires) {
@@ -106,20 +107,21 @@ public class TGTRefreshThread extends Thread {
if (tgt == null) {
nextRefresh = now + MIN_TIME_BEFORE_RELOGIN;
nextRefreshDate = new Date(nextRefresh);
- log.warn("No TGT found: will try again at {}",
nextRefreshDate);
+ log.warn().attr("nextRetry", nextRefreshDate).log("No TGT
found: will try again");
} else {
nextRefresh = getRefreshTime(tgt);
long expiry = tgt.getEndTime().getTime();
Date expiryDate = new Date(expiry);
if ((container.isUsingTicketCache()) &&
(tgt.getEndTime().equals(tgt.getRenewTill()))) {
- Object[] logPayload = {expiryDate,
container.getPrincipal(), container.getPrincipal()};
- log.error("The TGT cannot be renewed beyond the next
expiry date: {}."
- + "This process will not be able to authenticate new
SASL connections after that "
- + "time (for example, it will not be authenticate a
new connection with a Broker "
- + "). Ask your system administrator to either
increase the "
- + "'renew until' time by doing : 'modprinc
-maxrenewlife {}' within "
- + "kadmin, or instead, to generate a keytab for {}.
Because the TGT's "
- + "expiry cannot be further extended by refreshing,
exiting refresh thread now.", logPayload);
+ log.error()
+ .attr("expiryDate", expiryDate)
+ .attr("principal", container.getPrincipal())
+ .log("The TGT cannot be renewed beyond the next expiry
date."
+ + " This process will not be able to
authenticate new SASL"
+ + " connections after that time. Ask your
system administrator"
+ + " to either increase the 'renew until' time
by doing"
+ + " 'modprinc -maxrenewlife' within kadmin, or
instead,"
+ + " to generate a keytab. Exiting refresh
thread now.");
return;
}
// determine how long to sleep from looking at ticket's expiry.
@@ -134,20 +136,23 @@ public class TGTRefreshThread extends Thread {
// next scheduled refresh is sooner than (now +
MIN_TIME_BEFORE_LOGIN).
Date until = new Date(nextRefresh);
Date newuntil = new Date(now +
MIN_TIME_BEFORE_RELOGIN);
- Object[] logPayload = {until, newuntil,
MIN_TIME_BEFORE_RELOGIN / 1000};
- log.warn("TGT refresh thread time adjusted from : {}
to : {} since "
- + "the former is sooner than the minimum refresh
interval ("
- + "{} seconds) from now.", logPayload);
+ log.warn()
+ .attr("from", until)
+ .attr("to", newuntil)
+ .attr("minIntervalSeconds",
MIN_TIME_BEFORE_RELOGIN / 1000)
+ .log("TGT refresh thread time adjusted since the
former is sooner"
+ + " than the minimum refresh interval from
now.");
}
nextRefresh = Math.max(nextRefresh, now +
MIN_TIME_BEFORE_RELOGIN);
}
nextRefreshDate = new Date(nextRefresh);
if (nextRefresh > expiry) {
- Object[] logPayload = {nextRefreshDate, expiryDate};
- log.error(
- "next refresh: {} is later than expiry {}." + " This
may indicate a clock skew problem."
- + "Check that this host and the KDC's " + "hosts'
clocks are in sync. Exiting refresh thread.",
- logPayload);
+ log.error()
+ .attr("nextRefresh", nextRefreshDate)
+ .attr("expiry", expiryDate)
+ .log("next refresh is later than expiry. This may
indicate a clock"
+ + " skew problem. Check that this host and the
KDC's hosts'"
+ + " clocks are in sync. Exiting refresh
thread.");
return;
}
}
@@ -155,7 +160,7 @@ public class TGTRefreshThread extends Thread {
log.info("refreshing now because expiry is before next
scheduled refresh time.");
} else if (now < nextRefresh) {
Date until = new Date(nextRefresh);
- log.info("TGT refresh sleeping until: {}", until.toString());
+ log.info().attr("until", until.toString()).log("TGT refresh
sleeping");
try {
Thread.sleep(nextRefresh - now);
} catch (InterruptedException ie) {
@@ -164,10 +169,12 @@ public class TGTRefreshThread extends Thread {
break;
}
} else {
- log.error("nextRefresh:{} is in the past: exiting refresh
thread. Check"
- + " clock sync between this host and KDC - (KDC's clock is
likely ahead of this host)."
- + " Manual intervention will be required for this client
to successfully authenticate."
- + " Exiting refresh thread.", nextRefreshDate);
+ log.error()
+ .attr("nextRefresh", nextRefreshDate)
+ .log("nextRefresh is in the past: exiting refresh thread.
Check clock"
+ + " sync between this host and KDC - (KDC's clock
is likely ahead"
+ + " of this host). Manual intervention will be
required for this"
+ + " client to successfully authenticate.");
break;
}
if (container.isUsingTicketCache()) {
@@ -177,7 +184,7 @@ public class TGTRefreshThread extends Thread {
int retry = 1;
while (retry >= 0) {
try {
- log.info("running ticket cache refresh command: {}
{}", cmd, kinitArgs);
+ log.info().attr("cmd", cmd).attr("args",
kinitArgs).log("running ticket cache refresh command");
ProcessBuilder processBuilder = new ProcessBuilder();
processBuilder.command("bash", "-c", cmd, kinitArgs);
@@ -194,9 +201,12 @@ public class TGTRefreshThread extends Thread {
return;
}
} else {
- Object[] logPayload = {cmd, kinitArgs,
e.toString(), e};
- log.warn("Could not renew TGT due to problem
running shell command: '{}"
- + " {}'; exception was:{}. Exiting refresh
thread.", logPayload);
+ log.warn()
+ .attr("cmd", cmd)
+ .attr("args", kinitArgs)
+ .exception(e)
+ .log("Could not renew TGT due to problem
running shell"
+ + " command. Exiting refresh thread.");
return;
}
}
@@ -216,16 +226,19 @@ public class TGTRefreshThread extends Thread {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- log.error("Interrupted during login retry
after LoginException:", le);
+ log.error().exception(le).log("Interrupted
during login retry after LoginException");
throw le;
}
} else {
- log.error("Could not refresh TGT for principal:
{}.", container.getPrincipal(), le);
+ log.error()
+ .attr("principal", container.getPrincipal())
+ .exception(le)
+ .log("Could not refresh TGT for principal");
}
}
}
} catch (LoginException le) {
- log.error("Failed to refresh TGT: refresh thread exiting
now.", le);
+ log.error().exception(le).log("Failed to refresh TGT: refresh
thread exiting now.");
break;
}
}
@@ -244,7 +257,7 @@ public class TGTRefreshThread extends Thread {
if (!hasSufficientTimeElapsed()) {
return;
}
- log.info("Initiating logout for {}", container.getPrincipal());
+ log.info().attr("principal", container.getPrincipal()).log("Initiating
logout");
synchronized (this) {
//clear up the kerberos state. But the tokens are not cleared! As
per
//the Java kerberos login module code, only the kerberos
credentials
@@ -253,7 +266,7 @@ public class TGTRefreshThread extends Thread {
//login and also update the subject field of this instance to
//have the new credentials (pass it to the LoginContext
constructor)
login = new LoginContext(container.getLoginContextName(),
container.getSubject());
- log.info("Initiating re-login for {}", container.getPrincipal());
+ log.info().attr("principal",
container.getPrincipal()).log("Initiating re-login");
login.login();
container.setLoginContext(login);
}
@@ -262,8 +275,10 @@ public class TGTRefreshThread extends Thread {
private boolean hasSufficientTimeElapsed() {
long now = System.currentTimeMillis();
if (now - getLastLogin() < MIN_TIME_BEFORE_RELOGIN) {
- log.warn("Not attempting to re-login since the last re-login was "
- + "attempted less than {} seconds before.",
MIN_TIME_BEFORE_RELOGIN / 1000);
+ log.warn()
+ .attr("minIntervalSeconds", MIN_TIME_BEFORE_RELOGIN / 1000)
+ .log("Not attempting to re-login since the last re-login was"
+ + " attempted less than the minimum interval.");
return false;
}
// register most recent relogin attempt
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java
index 03873da8f33..498b5d76bc1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java
@@ -24,15 +24,11 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BooleanSupplier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Implementation of AsyncDualMemoryLimiter with separate limits for heap and
direct memory.
*/
public class AsyncDualMemoryLimiterImpl implements AsyncDualMemoryLimiter,
AutoCloseable {
- private static final Logger log =
LoggerFactory.getLogger(AsyncDualMemoryLimiterImpl.class);
-
private final ScheduledExecutorService executor;
private final boolean shutdownExecutor;
private final AsyncSemaphoreImpl heapLimiter;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java
index eb786e474fa..5b4b885044e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java
@@ -33,15 +33,11 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
import org.apache.pulsar.common.util.Runnables;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Implementation of AsyncSemaphore with timeout and queue size limits.
*/
public class AsyncSemaphoreImpl implements AsyncSemaphore, AutoCloseable {
- private static final Logger log =
LoggerFactory.getLogger(AsyncSemaphoreImpl.class);
-
private final AtomicLong availablePermits;
private final Queue<PendingRequest> queue;
private final long maxPermits;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmDefaultGCMetricsLogger.java b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmDefaultGCMetricsLogger.java
index db840c487f3..357e1eecf18 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmDefaultGCMetricsLogger.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmDefaultGCMetricsLogger.java
@@ -24,15 +24,13 @@ import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import lombok.CustomLog;
import lombok.SneakyThrows;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+@CustomLog
@SuppressWarnings({"checkstyle:JavadocType"})
public class JvmDefaultGCMetricsLogger implements JvmGCMetricsLogger {
- private static final Logger log =
LoggerFactory.getLogger(JvmDefaultGCMetricsLogger.class);
-
private volatile long accumulatedFullGcCount = 0;
private volatile long currentFullGcCount = 0;
private volatile long accumulatedFullGcTime = 0;
@@ -58,7 +56,7 @@ public class JvmDefaultGCMetricsLogger implements JvmGCMetricsLogger {
getTotalSafepointTimeHandle.invoke(runtime);
getSafepointCountHandle.invoke(runtime);
} catch (Throwable e) {
- log.warn("Failed to get Runtime bean", e);
+ log.warn().exception(e).log("Failed to get Runtime bean");
}
}
@@ -129,7 +127,7 @@ public class JvmDefaultGCMetricsLogger implements JvmGCMetricsLogger {
accumulatedFullGcCount = newSafePointCount;
} catch (Exception e) {
- log.error("Failed to collect GC stats: {}", e.getMessage());
+ log.error().exceptionMessage(e).log("Failed to collect GC stats");
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmG1GCMetricsLogger.java b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmG1GCMetricsLogger.java
index 203fe59eaf2..4b7c53e9675 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmG1GCMetricsLogger.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmG1GCMetricsLogger.java
@@ -22,12 +22,12 @@ import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.CustomLog;
/**
* Logger for the JVM G1 GC metrics.
*/
+@CustomLog
public class JvmG1GCMetricsLogger implements JvmGCMetricsLogger {
private volatile long accumulatedYoungGcCount = 0;
@@ -83,10 +83,9 @@ public class JvmG1GCMetricsLogger implements JvmGCMetricsLogger {
accumulatedOldGcCount = newValueOldGcCount;
accumulatedOldGcTime = newValueOldGcTime;
} catch (Exception e) {
- log.error("Failed to collect GC stats: {}", e.getMessage());
+ log.error().exceptionMessage(e).log("Failed to collect GC stats");
}
}
- private static final Logger log =
LoggerFactory.getLogger(JvmG1GCMetricsLogger.class);
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java
index 8a8da0bb1ac..d3288154bd1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/stats/JvmMetrics.java
@@ -36,17 +36,16 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import lombok.CustomLog;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.util.DirectMemoryUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* This class is responsible for providing JVM metrics.
*/
+@CustomLog
public class JvmMetrics {
- private static final Logger log =
LoggerFactory.getLogger(JvmMetrics.class);
private final JvmGCMetricsLogger gcLogger;
private final String componentName;
@@ -66,8 +65,10 @@ public class JvmMetrics {
gcLoggerImpl = (JvmGCMetricsLogger)
Class.forName(gcLoggerImplClassName)
.getDeclaredConstructor().newInstance();
} catch (Exception e) {
- log.error("Failed to initialize jvmGCMetricsLogger {} due to
{}", jvmGCMetricsLoggerClassName,
- e.getMessage(), e);
+ log.error()
+ .attr("className", jvmGCMetricsLoggerClassName)
+ .exception(e)
+ .log("Failed to initialize jvmGCMetricsLogger");
}
}
return new JvmMetrics(executor, componentName,
@@ -141,9 +142,7 @@ public class JvmMetrics {
if (usedDirectMemory != -1L) {
return usedDirectMemory;
}
- if (log.isDebugEnabled()) {
- log.debug("Failed to get netty-direct-memory used count.");
- }
+ log.debug("Failed to get netty-direct-memory used count.");
List<BufferPoolMXBean> pools =
ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
for (BufferPoolMXBean pool : pools) {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/tls/TlsHostnameVerifier.java b/pulsar-common/src/main/java/org/apache/pulsar/common/tls/TlsHostnameVerifier.java
index d5f51be08b8..2bbb9cb34f5 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/tls/TlsHostnameVerifier.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/tls/TlsHostnameVerifier.java
@@ -45,9 +45,9 @@ import javax.net.ssl.SSLException;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import javax.security.auth.x500.X500Principal;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
-@Slf4j
+@CustomLog
public class TlsHostnameVerifier implements HostnameVerifier {
enum HostNameType {
@@ -79,9 +79,7 @@ public class TlsHostnameVerifier implements HostnameVerifier {
verify(host, x509);
return true;
} catch (final SSLException ex) {
- if (log.isDebugEnabled()) {
- log.debug(ex.getMessage(), ex);
- }
+ log.debug().exception(ex).log(ex.getMessage());
return false;
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicsPatternFactory.java b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicsPatternFactory.java
index 50434543971..836bd379d8e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicsPatternFactory.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/topics/TopicsPatternFactory.java
@@ -19,15 +19,15 @@
package org.apache.pulsar.common.topics;
import java.util.regex.Pattern;
+import lombok.CustomLog;
import lombok.experimental.UtilityClass;
-import lombok.extern.slf4j.Slf4j;
/**
* Factory class for creating instances of TopicsPattern based on different
regex implementations.
* It supports JDK regex, RE2J regex, and a fallback mechanism for RE2J with
JDK.
*/
+@CustomLog
@UtilityClass
-@Slf4j
public class TopicsPatternFactory {
/**
* Creates a TopicsPattern from a JDK Pattern.
@@ -89,10 +89,10 @@ public class TopicsPatternFactory {
try {
return new RE2JTopicsPattern(inputPattern,
regexWithoutTopicDomainScheme);
} catch (com.google.re2j.PatternSyntaxException e) {
- if (log.isDebugEnabled()) {
- log.debug("Failed to compile regex pattern '{}' with
RE2J, fallback to JDK",
- regexWithoutTopicDomainScheme, e);
- }
+ log.debug()
+ .attr("pattern", regexWithoutTopicDomainScheme)
+ .exception(e)
+ .log("Failed to compile regex pattern with RE2J,
fallback to JDK");
// Fallback to JDK implementation if RE2J fails
return new JDKTopicsPattern(inputPattern,
regexWithoutTopicDomainScheme);
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java
index 6cff0a7baec..d8d7be142bc 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java
@@ -24,12 +24,12 @@ import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
/**
* Helper methods wrt Classloading.
*/
-@Slf4j
+@CustomLog
public class ClassLoaderUtils {
/**
* Load a jar.
@@ -83,7 +83,7 @@ public class ClassLoaderUtils {
try {
((Closeable) classLoader).close();
} catch (IOException e) {
- log.error("Error closing classloader {}", classLoader, e);
+ log.error().attr("classLoader",
classLoader).exception(e).log("Error closing classloader");
}
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java
index fb9e72435e3..1e8828e1f93 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FileModifiedTimeUpdater.java
@@ -23,14 +23,14 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileTime;
+import lombok.CustomLog;
import lombok.Getter;
import lombok.ToString;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Class working with file's modified time.
*/
+@CustomLog
@ToString
public class FileModifiedTimeUpdater {
@Getter
@@ -49,7 +49,7 @@ public class FileModifiedTimeUpdater {
try {
return Files.getLastModifiedTime(p);
} catch (IOException e) {
- LOG.error("Unable to fetch lastModified time for file {}: ",
fileName, e);
+ log.error().attr("file", fileName).exception(e).log("Unable to
fetch lastModified time for file");
}
}
return null;
@@ -64,5 +64,4 @@ public class FileModifiedTimeUpdater {
return false;
}
- private static final Logger LOG =
LoggerFactory.getLogger(FileModifiedTimeUpdater.class);
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/GracefulExecutorServicesTerminationHandler.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/GracefulExecutorServicesTerminationHandler.java
index ca247fbd85a..319f9aa6fca 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/GracefulExecutorServicesTerminationHandler.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/GracefulExecutorServicesTerminationHandler.java
@@ -26,7 +26,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
/**
* Waits for termination of {@link ExecutorService}s that have been shutdown.
@@ -35,7 +35,7 @@ import lombok.extern.slf4j.Slf4j;
*
* Designed to be used via the API in {@link GracefulExecutorServicesShutdown}
*/
-@Slf4j
+@CustomLog
class GracefulExecutorServicesTerminationHandler {
private static final long SHUTDOWN_THREAD_COMPLETION_TIMEOUT_NANOS =
Duration.ofMillis(100L).toNanos();
private final List<ExecutorService> executors;
@@ -50,7 +50,7 @@ class GracefulExecutorServicesTerminationHandler {
this.terminationTimeout = terminationTimeout;
this.executors = Collections.unmodifiableList(new
ArrayList<>(executorServices));
this.future = new CompletableFuture<>();
- log.info("Starting termination handler for {} executors.",
executors.size());
+ log.info().attr("executorCount", executors.size()).log("Starting
termination handler");
for (ExecutorService executor : executors) {
if (!executor.isShutdown()) {
throw new IllegalStateException(
@@ -68,7 +68,8 @@ class GracefulExecutorServicesTerminationHandler {
Thread shutdownWaitingThread = new Thread(this::awaitShutdown,
getClass().getSimpleName());
shutdownWaitingThread.setDaemon(false);
shutdownWaitingThread.setUncaughtExceptionHandler((thread,
exception) -> {
- log.error("Uncaught exception in shutdown thread {}",
thread, exception);
+ log.error().attr("thread", thread).exception(exception)
+ .log("Uncaught exception in shutdown thread");
});
shutdownWaitingThread.start();
FutureUtil.whenCancelledOrTimedOut(future, () -> {
@@ -98,7 +99,7 @@ class GracefulExecutorServicesTerminationHandler {
terminateExecutors();
markShutdownCompleted();
} catch (Exception e) {
- log.error("Error in termination handler", e);
+ log.error().exception(e).log("Error in termination handler");
future.completeExceptionally(e);
} finally {
shutdownThreadCompletedLatch.countDown();
@@ -135,17 +136,17 @@ class GracefulExecutorServicesTerminationHandler {
private void terminateExecutors() {
for (ExecutorService executor : executors) {
if (!executor.isTerminated()) {
- log.info("Shutting down forcefully executor {}", executor);
+ log.info().attr("executor", executor).log("Shutting down
forcefully executor");
executor.shutdownNow();
}
}
if (!Thread.currentThread().isInterrupted() &&
!awaitTermination(terminationTimeout)) {
for (ExecutorService executor : executors) {
if (!executor.isTerminated()) {
- log.warn("Executor {} didn't shutdown after waiting for
termination.", executor);
+ log.warn().attr("executor", executor).log("Executor didn't
shutdown after waiting for termination");
for (Runnable runnable : executor.shutdownNow()) {
- log.info("Execution in progress for runnable instance
of {}: {}", runnable.getClass(),
- runnable);
+ log.info().attr("runnableClass",
runnable.getClass()).attr("runnable", runnable)
+ .log("Execution in progress for runnable");
}
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyManagerProxy.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyManagerProxy.java
index 6b6b0492b45..4e2ac76b7cb 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyManagerProxy.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/KeyManagerProxy.java
@@ -41,13 +41,13 @@ import java.util.stream.Collectors;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.X509ExtendedKeyManager;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
/**
* This class wraps {@link X509ExtendedKeyManager} and gives opportunity to
refresh key-manager with refreshed certs
* without changing {@link SslContext}.
*/
-@Slf4j
+@CustomLog
public class KeyManagerProxy extends X509ExtendedKeyManager {
private static final char[] KEYSTORE_PASSWORD = "secret".toCharArray();
@@ -61,13 +61,13 @@ public class KeyManagerProxy extends X509ExtendedKeyManager
{
try {
updateKeyManager();
} catch (CertificateException e) {
- log.warn("Failed to load cert {}", certFile, e);
+ log.warn().attr("certFile", certFile).exception(e).log("Failed to
load cert");
throw new IllegalArgumentException(e);
} catch (KeyStoreException e) {
- log.warn("Failed to load key {}", keyFile, e);
+ log.warn().attr("keyFile", keyFile).exception(e).log("Failed to
load key");
throw new IllegalArgumentException(e);
} catch (NoSuchAlgorithmException | UnrecoverableKeyException e) {
- log.warn("Failed to update key Manager", e);
+ log.warn().exception(e).log("Failed to update key Manager");
throw new IllegalArgumentException(e);
}
executor.scheduleWithFixedDelay(() -> updateKeyManagerSafely(),
refreshDurationSec, refreshDurationSec,
@@ -76,12 +76,12 @@ public class KeyManagerProxy extends X509ExtendedKeyManager
{
private void updateKeyManagerSafely() {
try {
- if (log.isDebugEnabled()) {
- log.debug("refreshing key manager for {} {}",
certFile.getFileName(), keyFile.getFileName());
- }
+ log.debug().attr("certFile",
certFile.getFileName()).attr("keyFile", keyFile.getFileName())
+ .log("refreshing key manager");
updateKeyManager();
} catch (Exception e) {
- log.warn("Failed to update key Manager for {}, {}",
certFile.getFileName(), keyFile.getFileName(), e);
+ log.warn().attr("certFile",
certFile.getFileName()).attr("keyFile", keyFile.getFileName())
+ .exception(e).log("Failed to update key Manager");
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
index 33bc09725f6..0c7b51781c5 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ObjectMapperFactory.java
@@ -31,7 +31,7 @@ import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;
import java.util.concurrent.atomic.AtomicReference;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
import org.apache.pulsar.client.admin.internal.data.AuthPoliciesImpl;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionState;
@@ -113,7 +113,7 @@ import
org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.apache.pulsar.policies.data.loadbalancer.LoadReportDeserializer;
@SuppressWarnings("checkstyle:JavadocType")
-@Slf4j
+@CustomLog
public class ObjectMapperFactory {
public static class MapperReference {
private final ObjectMapper objectMapper;
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
index 17199664f12..f37c6107555 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
@@ -28,7 +28,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.Builder;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
/**
* A Rate Limiter that distributes permits at a configurable rate. Each {@link
#acquire()} blocks if necessary until a
@@ -51,7 +51,7 @@ import lombok.extern.slf4j.Slf4j;
* <li><b>Faster: </b>RateLimiter is light-weight and faster than
Guava-RateLimiter</li>
* </ul>
*/
-@Slf4j
+@CustomLog
public class RateLimiter implements AutoCloseable{
private final ScheduledExecutorService executorService;
private long rateTime;
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Runnables.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Runnables.java
index dde952a45bd..7733430a679 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Runnables.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Runnables.java
@@ -19,11 +19,10 @@
package org.apache.pulsar.common.util;
import java.util.concurrent.TimeUnit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.CustomLog;
+@CustomLog
public final class Runnables {
- private static final Logger LOGGER =
LoggerFactory.getLogger(Runnables.class);
private Runnables() {}
@@ -53,7 +52,7 @@ public final class Runnables {
try {
runnable.run();
} catch (Throwable t) {
- LOGGER.error("Unexpected throwable caught", t);
+ log.error().exception(t).log("Unexpected throwable caught");
}
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
index 386b1475636..36632715a8f 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SecurityUtility.java
@@ -66,7 +66,7 @@ import javax.net.ssl.SSLException;
import javax.net.ssl.SSLParameters;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.tls.TlsHostnameVerifier;
@@ -74,7 +74,7 @@ import org.apache.pulsar.common.tls.TlsHostnameVerifier;
/**
* Helper class for the security domain.
*/
-@Slf4j
+@CustomLog
public class SecurityUtility {
public static final Provider BC_PROVIDER = getProvider();
@@ -106,9 +106,7 @@ public class SecurityUtility {
Provider provider = Security.getProvider(BC) != null
? Security.getProvider(BC)
: Security.getProvider(BC_FIPS);
- if (log.isDebugEnabled()) {
- log.debug("Already instantiated Bouncy Castle provider {}",
provider.getName());
- }
+ log.debug().attr("provider", provider.getName()).log("Already
instantiated Bouncy Castle provider");
return provider;
}
@@ -116,7 +114,8 @@ public class SecurityUtility {
try {
return getBCProviderFromClassPath();
} catch (Exception e) {
- log.warn("Not able to get Bouncy Castle provider for both FIPS and
Non-FIPS from class path:", e);
+ log.warn().exception(e)
+ .log("Not able to get Bouncy Castle provider for both FIPS
and Non-FIPS from class path");
throw new RuntimeException(e);
}
}
@@ -131,11 +130,11 @@ public class SecurityUtility {
if (e instanceof ClassNotFoundException) {
log.debug("Conscrypt isn't available in the classpath. Using
JDK default security provider.");
} else if (e.getCause() instanceof UnsatisfiedLinkError) {
- log.debug("Conscrypt isn't available for {} {}. Using JDK
default security provider.",
- System.getProperty("os.name"),
System.getProperty("os.arch"));
+ log.debug().attr("os",
System.getProperty("os.name")).attr("arch", System.getProperty("os.arch"))
+ .log("Conscrypt isn't available. Using JDK default
security provider");
} else {
- log.debug("Conscrypt isn't available. Using JDK default
security provider."
- + " Cause : {}, Reason : {}", e.getCause(),
e.getMessage());
+ log.debug().attr("cause", e.getCause()).attr("reason",
e.getMessage())
+ .log("Conscrypt isn't available. Using JDK default
security provider");
}
return null;
}
@@ -144,7 +143,8 @@ public class SecurityUtility {
try {
provider = (Provider)
Class.forName(CONSCRYPT_PROVIDER_CLASS).getDeclaredConstructor().newInstance();
} catch (ReflectiveOperationException e) {
- log.debug("Unable to get security provider for class {}",
CONSCRYPT_PROVIDER_CLASS, e);
+ log.debug().attr("class", CONSCRYPT_PROVIDER_CLASS).exception(e)
+ .log("Unable to get security provider");
return null;
}
@@ -176,13 +176,12 @@ public class SecurityUtility {
new
Class<?>[]{Class.forName("org.conscrypt.ConscryptHostnameVerifier")});
setDefaultHostnameVerifierMethod.invoke(null,
wrappedHostnameVerifier);
} catch (Exception e) {
- log.warn("Unable to set default hostname verifier for Conscrypt",
e);
+ log.warn().exception(e).log("Unable to set default hostname
verifier for Conscrypt");
}
Security.addProvider(provider);
- if (log.isDebugEnabled()) {
- log.debug("Added security provider '{}' from class {}",
provider.getName(), CONSCRYPT_PROVIDER_CLASS);
- }
+ log.debug().attr("provider", provider.getName()).attr("class",
CONSCRYPT_PROVIDER_CLASS)
+ .log("Added security provider");
return provider;
}
@@ -196,17 +195,16 @@ public class SecurityUtility {
// prefer non FIPS, for backward compatibility concern.
clazz = Class.forName(BC_NON_FIPS_PROVIDER_CLASS);
} catch (ClassNotFoundException cnf) {
- log.warn("Not able to get Bouncy Castle provider: {}, try to get
FIPS provider {}",
- BC_NON_FIPS_PROVIDER_CLASS, BC_FIPS_PROVIDER_CLASS);
+ log.warn().attr("nonFipsClass",
BC_NON_FIPS_PROVIDER_CLASS).attr("fipsClass", BC_FIPS_PROVIDER_CLASS)
+ .log("Not able to get Bouncy Castle provider, try to get
FIPS provider");
// attempt to use the FIPS provider.
clazz = Class.forName(BC_FIPS_PROVIDER_CLASS);
}
Provider provider = (Provider)
clazz.getDeclaredConstructor().newInstance();
Security.addProvider(provider);
- if (log.isDebugEnabled()) {
- log.debug("Found and Instantiated Bouncy Castle provider in
classpath {}", provider.getName());
- }
+ log.debug().attr("provider", provider.getName())
+ .log("Found and Instantiated Bouncy Castle provider in
classpath");
return provider;
}
@@ -439,7 +437,8 @@ public class SecurityUtility {
}
}
} catch (ReflectiveOperationException e) {
- log.warn("Unable to set hostname verifier for Conscrypt
TrustManager implementation", e);
+ log.warn().exception(e)
+ .log("Unable to set hostname verifier for Conscrypt
TrustManager implementation");
}
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ShutdownUtil.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ShutdownUtil.java
index d3a98a6df72..a5a21a9fc0e 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ShutdownUtil.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ShutdownUtil.java
@@ -20,9 +20,9 @@ package org.apache.pulsar.common.util;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
-@Slf4j
+@CustomLog
public class ShutdownUtil {
private static final Method log4j2ShutdownMethod;
@@ -34,7 +34,7 @@ public class ShutdownUtil {
.getMethod("shutdown");
} catch (ClassNotFoundException | NoSuchMethodException e) {
// ignore when Log4j2 isn't found, log at debug level
- log.debug("Cannot find
org.apache.logging.log4j.LogManager.shutdown method", e);
+ log.debug().exception(e).log("Cannot find
org.apache.logging.log4j.LogManager.shutdown method");
}
log4j2ShutdownMethod = shutdownMethod;
}
@@ -58,8 +58,9 @@ public class ShutdownUtil {
}
try {
if (status != 0 && logging) {
- log.warn("Triggering immediate shutdown of current process
with status {}", status,
- new Exception("Stacktrace for immediate shutdown"));
+ log.warn().attr("status", status)
+ .exception(new Exception("Stacktrace for immediate
shutdown"))
+ .log("Triggering immediate shutdown of current
process");
}
shutdownLogging();
} finally {
@@ -74,7 +75,8 @@ public class ShutdownUtil {
// use reflection to call
org.apache.logging.log4j.LogManager.shutdown()
log4j2ShutdownMethod.invoke(null);
} catch (IllegalAccessException | InvocationTargetException e) {
- log.error("Unable to call
org.apache.logging.log4j.LogManager.shutdown using reflection.", e);
+ log.error().exception(e)
+ .log("Unable to call
org.apache.logging.log4j.LogManager.shutdown using reflection");
}
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SingleThreadNonConcurrentFixedRateScheduler.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SingleThreadNonConcurrentFixedRateScheduler.java
index 1ddb2101f3a..557b3fd4ad4 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SingleThreadNonConcurrentFixedRateScheduler.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SingleThreadNonConcurrentFixedRateScheduler.java
@@ -31,9 +31,9 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
-@Slf4j
+@CustomLog
public class SingleThreadNonConcurrentFixedRateScheduler extends
ScheduledThreadPoolExecutor
implements ScheduledExecutorService {
@@ -66,7 +66,7 @@ public class SingleThreadNonConcurrentFixedRateScheduler
extends ScheduledThread
try {
task.run();
} catch (Throwable t) {
- log.warn("Unexpected throwable from task {}: {}",
task.getClass(), t.getMessage(), t);
+ log.warn().attr("taskClass",
task.getClass()).exception(t).log("Unexpected throwable from task");
}
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/TrustManagerProxy.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/TrustManagerProxy.java
index 8ec3dce6fce..d913987a9e0 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/TrustManagerProxy.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/TrustManagerProxy.java
@@ -32,13 +32,13 @@ import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509ExtendedTrustManager;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
/**
* This class wraps {@link X509ExtendedTrustManager} and gives opportunity to
refresh Trust-manager with refreshed certs
* without changing {@link SslContext}.
*/
-@Slf4j
+@CustomLog
public class TrustManagerProxy extends X509ExtendedTrustManager {
private volatile X509ExtendedTrustManager trustManager;
@@ -49,10 +49,10 @@ public class TrustManagerProxy extends
X509ExtendedTrustManager {
try {
updateTrustManager();
} catch (KeyManagementException | IOException | CertificateException
e) {
- log.warn("Failed to load cert {}, {}", certFile, e.getMessage());
+ log.warn().attr("certFile",
certFile).exceptionMessage(e).log("Failed to load cert");
throw new IllegalArgumentException(e);
} catch (NoSuchAlgorithmException | KeyStoreException e) {
- log.warn("Failed to init trust-store", e);
+ log.warn().exception(e).log("Failed to init trust-store");
throw new IllegalArgumentException(e);
}
executor.scheduleWithFixedDelay(() -> updateTrustManagerSafely(),
refreshDurationSec, refreshDurationSec,
@@ -63,7 +63,7 @@ public class TrustManagerProxy extends
X509ExtendedTrustManager {
try {
updateTrustManager();
} catch (Exception e) {
- log.warn("Failed to init trust-store {}", certFile.getFileName(),
e);
+ log.warn().attr("certFile",
certFile.getFileName()).exception(e).log("Failed to init trust-store");
}
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java
index a70857bdf3b..e3e8f583edb 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/KeyStoreSSLContext.java
@@ -36,14 +36,14 @@ import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
+import lombok.CustomLog;
import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.util.SecurityUtility;
/**
* KeyStoreSSLContext that mainly wrap a SSLContext to provide SSL context for
both webservice and netty.
*/
-@Slf4j
+@CustomLog
public class KeyStoreSSLContext {
public static final String DEFAULT_KEYSTORE_TYPE = "JKS";
public static final String DEFAULT_SSL_PROTOCOL = "TLS";
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/SSLContextValidatorEngine.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/SSLContextValidatorEngine.java
index a01412599bf..a69a18c21d9 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/SSLContextValidatorEngine.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/keystoretls/SSLContextValidatorEngine.java
@@ -24,12 +24,12 @@ import java.util.Arrays;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLException;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
/**
* SSLContextValidatorEngine to validate 2 SSlContext.
*/
-@Slf4j
+@CustomLog
public class SSLContextValidatorEngine {
@FunctionalInterface
public interface SSLEngineProvider {
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
index 8749add6458..c137b319aed 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
@@ -34,10 +34,10 @@ import java.net.InetSocketAddress;
import java.security.Security;
import java.util.List;
import java.util.Optional;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
import org.apache.commons.lang3.reflect.FieldUtils;
-@Slf4j
+@CustomLog
public class DnsResolverUtil {
private static final String CACHE_POLICY_PROP = "networkaddress.cache.ttl";
@@ -81,7 +81,7 @@ public class DnsResolverUtil {
.filter(i -> i >= 0)
.orElse(DEFAULT_NEGATIVE_TTL);
} catch (NumberFormatException e) {
- log.warn("Cannot get DNS TTL settings", e);
+ log.warn().exception(e).log("Cannot get DNS TTL settings");
}
TTL = ttl;
NEGATIVE_TTL = negativeTtl;
@@ -138,7 +138,8 @@ public class DnsResolverUtil {
log.warn("Could not find nameResolver Field in
InetSocketAddressResolver instance.");
}
} catch (Throwable t) {
- log.warn("Failed to extract NameResolver from
InetSocketAddressResolver instance. {}", t.getMessage());
+ log.warn().exceptionMessage(t)
+ .log("Failed to extract NameResolver from
InetSocketAddressResolver instance.");
}
}
// fallback to use an adapter if reflection fails
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
index 7d90492e14c..3dbdd288715 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
@@ -42,12 +42,12 @@ import
io.netty.incubator.channel.uring.IOUringServerSocketChannel;
import io.netty.incubator.channel.uring.IOUringSocketChannel;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadFactory;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
import org.apache.bookkeeper.common.util.affinity.CpuAffinity;
@SuppressWarnings("checkstyle:JavadocType")
-@Slf4j
+@CustomLog
public class EventLoopUtil {
private static final String ENABLE_IO_URING = "pulsar.enableUring";
@@ -76,8 +76,8 @@ public class EventLoopUtil {
try {
CpuAffinity.acquireCore();
} catch (Throwable t) {
- log.warn("Failed to acquire CPU core for thread {}
{}", Thread.currentThread().getName(),
- t.getMessage(), t);
+ log.warn().attr("thread",
Thread.currentThread().getName())
+ .exception(t).log("Failed to acquire CPU
core for thread");
}
});
}
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressionCodecLZ4JNI.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressionCodecLZ4JNI.java
index daea2f167f5..10013169edd 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressionCodecLZ4JNI.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressionCodecLZ4JNI.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.common.compression;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.nio.ByteBuffer;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4SafeDecompressor;
@@ -30,7 +30,7 @@ import
org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
/**
* LZ4 Compression.
*/
-@Slf4j
+@CustomLog
public class CompressionCodecLZ4JNI implements CompressionCodec {
static {
@@ -38,7 +38,7 @@ public class CompressionCodecLZ4JNI implements
CompressionCodec {
// Force the attempt to load LZ4 JNI
net.jpountz.util.Native.load();
} catch (Throwable th) {
- log.warn("Failed to load native LZ4 implementation: {}",
th.getMessage());
+ log.warn().attr("message", th.getMessage()).log("Failed to load
native LZ4 implementation");
}
}
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressionCodecSnappyJNI.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressionCodecSnappyJNI.java
index ab6d3c08fcb..1f4bb1c0d3a 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressionCodecSnappyJNI.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressionCodecSnappyJNI.java
@@ -22,13 +22,13 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.io.IOException;
import java.nio.ByteBuffer;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
import org.xerial.snappy.Snappy;
/**
* Snappy Compression.
*/
-@Slf4j
+@CustomLog
public class CompressionCodecSnappyJNI implements CompressionCodec {
@Override
@@ -45,7 +45,7 @@ public class CompressionCodecSnappyJNI implements
CompressionCodec {
try {
compressedLength = Snappy.compress(sourceNio, targetNio);
} catch (IOException e) {
- log.error("Failed to compress to Snappy: {}", e.getMessage());
+ log.error().attr("message", e.getMessage()).log("Failed to
compress to Snappy");
}
target.writerIndex(compressedLength);
return target;
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/nar/NarUnpackerTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/nar/NarUnpackerTest.java
index 799a7024328..99b4198e213 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/nar/NarUnpackerTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/nar/NarUnpackerTest.java
@@ -30,14 +30,14 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.SystemUtils;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-@Slf4j
+@CustomLog
@Test
public class NarUnpackerTest {
File sampleZipFile;
@@ -64,14 +64,14 @@ public class NarUnpackerTest {
try {
sampleZipFile.delete();
} catch (Exception e) {
- log.warn("Failed to delete file {}", sampleZipFile, e);
+ log.warn().attr("file",
sampleZipFile).exception(e).log("Failed to delete file");
}
}
if (extractDirectory != null && extractDirectory.exists()) {
try {
FileUtils.deleteFile(extractDirectory, true);
} catch (IOException e) {
- log.warn("Failed to delete directory {}", extractDirectory, e);
+ log.warn().attr("directory",
extractDirectory).exception(e).log("Failed to delete directory");
}
}
}
@@ -87,7 +87,7 @@ public class NarUnpackerTest {
try {
NarUnpacker.doUnpackNar(sampleZipFile, extractDirectory,
extractCounter::incrementAndGet);
} catch (Exception e) {
- log.error("Unpacking failed", e);
+ log.error().exception(e).log("Unpacking failed");
exceptionCounter.incrementAndGet();
} finally {
countDownLatch.countDown();
@@ -112,7 +112,7 @@ public class NarUnpackerTest {
System.exit(100);
}
} catch (Exception e) {
- log.error("Unpacking failed", e);
+ log.error().exception(e).log("Unpacking failed");
System.exit(99);
}
}
@@ -157,14 +157,14 @@ public class NarUnpackerTest {
.start();
String output = IOUtils.toString(process.getInputStream(),
StandardCharsets.UTF_8);
int retval = process.waitFor();
- log.info("Process retval {} output {}", retval, output);
+ log.info().attr("retval", retval).attr("output",
output).log("Process completed");
if (retval == 101) {
extractCounter.incrementAndGet();
} else if (retval != 100) {
exceptionCounter.incrementAndGet();
}
} catch (Exception e) {
- log.error("Unpacking in a separate process failed", e);
+ log.error().exception(e).log("Unpacking in a separate
process failed");
exceptionCounter.incrementAndGet();
} finally {
countDownLatch.countDown();
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/stats/JvmDefaultGCMetricsLoggerTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/stats/JvmDefaultGCMetricsLoggerTest.java
index 895a80cd6cd..966a0835c45 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/stats/JvmDefaultGCMetricsLoggerTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/stats/JvmDefaultGCMetricsLoggerTest.java
@@ -19,17 +19,18 @@
package org.apache.pulsar.common.stats;
import static org.testng.Assert.assertNotEquals;
-import lombok.extern.slf4j.Slf4j;
+import lombok.CustomLog;
import org.testng.annotations.Test;
-@Slf4j
+@CustomLog
public class JvmDefaultGCMetricsLoggerTest {
@Test
public void testInvokeJVMInternals() {
long safePointCount = JvmDefaultGCMetricsLogger.getSafepointCount();
long totalSafePointTime =
JvmDefaultGCMetricsLogger.getTotalSafepointTime();
- log.info("safePointCount {} totalSafePointTime {}", safePointCount,
totalSafePointTime);
+ log.info().attr("safePointCount", safePointCount)
+ .attr("totalSafePointTime", totalSafePointTime).log("safepoint
metrics");
assertNotEquals(safePointCount, -1);
assertNotEquals(totalSafePointTime, -1);
}