Copilot commented on code in PR #25507:
URL: https://github.com/apache/pulsar/pull/25507#discussion_r3070578923
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtils.java:
##########
@@ -115,7 +116,8 @@ public static Descriptors.Descriptor deserialize(byte[] schemaDataBytes) {
for (int i = 1; i < paths.length; i++) {
descriptor = descriptor.findNestedTypeByName(paths[i]);
}
- logger.debug("deserialize '{}' to descriptor: '{}'.", schemaDataBytes, descriptor.getFullName());
+ log.debug().attr("bytes", schemaDataBytes)
+ .attr("descriptor", descriptor.getFullName()).log("deserialized to descriptor");
} catch (Exception e) {
e.printStackTrace();
throw new SchemaSerializationException(e);
Review Comment:
`printStackTrace()` is used here as well; please replace with structured
logging (`log.error().exception(e)...`) instead of writing directly to stderr.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java:
##########
@@ -66,10 +64,12 @@ public Message<T> onArrival(Consumer<T> consumer, Message<T> message) {
interceptorMessage = interceptors.get(i).onArrival(consumer, interceptorMessage);
} catch (Throwable e) {
if (consumer != null) {
- log.warn("Error executing interceptor beforeConsume callback topic: {} consumerName: {}",
- consumer.getTopic(), consumer.getConsumerName(), e);
+ log.warn().attr("topic", consumer.getTopic())
+ .attr("consumername", consumer.getConsumerName())
Review Comment:
Attribute name "consumername" is inconsistent with the rest of the client
logs (which use "consumerName"). This breaks standardized attribute querying;
please use the same camelCase key here.
```suggestion
.attr("consumerName", consumer.getConsumerName())
```
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java:
##########
@@ -150,20 +152,25 @@ private void init(ConsumerConfigurationData<?> conf) {
int prefetchQueueSize = consumerImpl.incomingMessages.size();
if ((currentNumMsgsReceived | currentNumBytesReceived | currentNumReceiveFailed | currentNumAcksSent
| currentNumAcksFailed | prefetchQueueSize) != 0) {
- log.info(
- "[{}] [{}] [{}] Prefetched messages: {} --- "
- + "Consume throughput received: {} msgs/s --- {} Mbit/s --- "
- + "Ack sent rate: {} ack/s --- " + "Failed messages: {} --- batch messages: {} ---"
- + "Failed acks: {}",
- consumerImpl.getTopic(), consumerImpl.getSubscription(), consumerImpl.consumerName,
- prefetchQueueSize, THROUGHPUT_FORMAT.format(receivedMsgsRate),
- THROUGHPUT_FORMAT.format(receivedBytesRate * 8 / 1024 / 1024),
- THROUGHPUT_FORMAT.format(currentNumAcksSent / elapsed), currentNumReceiveFailed,
- currentNumBatchReceiveFailed, currentNumAcksFailed);
+ log.info().attr("topic", consumerImpl.getTopic())
+ .attr("subscription", consumerImpl.getSubscription())
+ .attr("consumerName", consumerImpl.consumerName)
+ .attr("prefetchedMessages", prefetchQueueSize)
+ .attr("receivedMsgRate", THROUGHPUT_FORMAT.format(receivedMsgsRate))
+ .attr("receivedBytesRateMbps",
+ THROUGHPUT_FORMAT.format(receivedBytesRate * 8 / 1024 / 1024))
+ .attr("ackSentRate", THROUGHPUT_FORMAT.format(currentNumAcksSent / elapsed))
+ .attr("failedMessages", currentNumReceiveFailed)
+ .attr("failedBatchMessages", currentNumBatchReceiveFailed)
+ .attr("failedAcks", currentNumAcksFailed)
+ .log("Consumer stats");
}
} catch (Exception e) {
- log.error("[{}] [{}] [{}]: {}", consumerImpl.getTopic(), consumerImpl.subscription
- , consumerImpl.consumerName, e.getMessage());
+ log.error().attr("topic", consumerImpl.getTopic())
+ .attr("subscription", consumerImpl.subscription)
+ .attr("consumerName", consumerImpl.consumerName)
+ .exceptionMessage(e)
+ .log("operation");
Review Comment:
This error log message is too generic ("operation") and only logs the
exception message. Please use a specific message (e.g., updating consumer
stats) and include the exception (`.exception(e)`) so the stack trace is
available when needed.
```suggestion
.exception(e)
.log("Failed to update consumer stats");
```
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtils.java:
##########
@@ -58,7 +58,8 @@ public static byte[] serialize(Descriptors.Descriptor descriptor) {
.fileDescriptorSet(fileDescriptorSet)
.rootFileDescriptorName(rootFileDescriptorName).rootMessageTypeName(rootMessageTypeName).build();
schemaDataBytes = ObjectMapperFactory.getMapperWithIncludeAlways().writer().writeValueAsBytes(schemaData);
- logger.debug("descriptor '{}' serialized to '{}'.", descriptor.getFullName(), schemaDataBytes);
+ log.debug().attr("descriptor", descriptor.getFullName())
+ .attr("bytes", schemaDataBytes).log("descriptor serialized");
} catch (Exception e) {
e.printStackTrace();
throw new SchemaSerializationException(e);
Review Comment:
The catch block uses `printStackTrace()`, which bypasses the configured logging
and writes directly to stderr. Please replace it with the structured logger
(e.g., `log.error().exception(e)...`) and keep the thrown
`SchemaSerializationException`.
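A minimal sketch of the requested pattern, assuming `java.util.logging` as a stand-in for the PR's structured logger (whose exact API is not reproduced here). `SerializeLoggingSketch`, `SketchSerializationException`, and the `serialize(String)` signature are hypothetical names used only for illustration:

```java
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;

class SerializeLoggingSketch {
    // java.util.logging stands in for the project's structured logger (assumption).
    static final Logger LOG = Logger.getLogger(SerializeLoggingSketch.class.getName());

    // Hypothetical stand-in for SchemaSerializationException.
    static class SketchSerializationException extends RuntimeException {
        SketchSerializationException(Throwable cause) {
            super(cause);
        }
    }

    // Hypothetical serialize step: fails on null input so the catch block can be exercised.
    static byte[] serialize(String descriptorName) {
        try {
            if (descriptorName == null) {
                throw new IllegalArgumentException("descriptor name is null");
            }
            return descriptorName.getBytes(StandardCharsets.UTF_8);
        } catch (Exception e) {
            // Log through the configured logger, passing the throwable so the
            // stack trace is recorded, instead of calling e.printStackTrace().
            LOG.log(Level.SEVERE, "Failed to serialize descriptor", e);
            // Keep rethrowing the wrapped exception, as the original code does.
            throw new SketchSerializationException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serialize("demo.Message").length);
    }
}
```

Passing the throwable to the logger keeps the stack trace in the configured log output, and the rethrow preserves the original failure as the cause.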
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java:
##########
@@ -388,10 +387,10 @@ private void readAllExistingMessages(Reader<T> reader, CompletableFuture<Void> f
// Reached the end
long endTime = System.nanoTime();
long durationMillis = TimeUnit.NANOSECONDS.toMillis(endTime - startTime);
- log.info("Started table view for topic {} - Replayed {} messages in {} seconds",
- reader.getTopic(),
- messagesRead,
- durationMillis / 1000.0);
+ log.info().attr("topic", reader.getTopic())
+ .attr("replayed", messagesRead)
+ .attr("0", durationMillis / 1000.0)
+ .log("Started table view for topic - Replayed messages in seconds");
Review Comment:
This log event uses an attribute key "0", which looks accidental and makes
the duration hard to query/understand. Please rename it to a descriptive key
(e.g., durationSeconds/durationMillis) and adjust the log message accordingly.
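As an illustration of the naming, here is a small sketch that builds the same three attributes with a descriptive duration key. It uses a plain `Map` rather than the PR's logger, and `ReplayDurationSketch`, `replayAttributes`, and the `durationSeconds` key are hypothetical choices, not names taken from the codebase:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

class ReplayDurationSketch {
    // Builds the attributes for the "table view started" event (hypothetical helper).
    static Map<String, Object> replayAttributes(String topic, long messagesRead,
                                                long startNanos, long endNanos) {
        long durationMillis = TimeUnit.NANOSECONDS.toMillis(endNanos - startNanos);
        Map<String, Object> attrs = new LinkedHashMap<>();
        attrs.put("topic", topic);
        attrs.put("replayed", messagesRead);
        // The key states both the quantity and the unit, so it can be queried directly.
        attrs.put("durationSeconds", durationMillis / 1000.0);
        return attrs;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        System.out.println(replayAttributes("my-topic", 42L, start, System.nanoTime()));
    }
}
```

With the unit carried by the key, the message text can shrink to something like "Started table view" without losing information.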
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java:
##########
@@ -100,10 +100,12 @@ public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
interceptorMessage = interceptors.get(i).beforeConsume(consumer, interceptorMessage);
} catch (Throwable e) {
if (consumer != null) {
- log.warn("Error executing interceptor beforeConsume callback topic: {} consumerName: {}",
- consumer.getTopic(), consumer.getConsumerName(), e);
+ log.warn().attr("topic", consumer.getTopic())
+ .attr("consumername", consumer.getConsumerName())
+ .exception(e)
+ .log("Error executing interceptor beforeConsume callback topic: consumerName");
Review Comment:
Attribute name "consumername" is inconsistent with the rest of the client
logs (which use "consumerName"). Please use the same camelCase key here as well.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java:
##########
@@ -118,7 +119,10 @@ private void init(ProducerConfigurationData conf) {
try {
updateStats();
} catch (Exception e) {
- log.error("[{}] [{}]: {}", producer.getTopic(), producer.getProducerName(), e.getMessage());
+ log.error().attr("topic", producer.getTopic())
+ .attr("producerName", producer.getProducerName())
+ .exceptionMessage(e)
+ .log("operation");
Review Comment:
This error log message is too generic ("operation") and only logs the
exception message. Please use a specific message describing what failed (e.g.,
updating producer stats) and include the exception (`.exception(e)`) so the
stack trace is available when needed.
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java:
##########
@@ -288,42 +288,37 @@ public CompletableFuture<ClientCnx> getConnection(InetSocketAddress logicalAddre
}
private CompletableFuture<ClientCnx> createConnection(Key key) {
- if (log.isDebugEnabled()) {
- log.debug("Connection for {} not found in cache", key.logicalAddress);
- }
+ log.debug().attr("logicalAddress", key.logicalAddress).log("Connection for not found in cache");
Review Comment:
The debug message text is grammatically incorrect ("Connection for not found
in cache") and loses what was actually looked up. Please reword it to something
actionable (e.g., include the logicalAddress and say the connection was not
found in cache).
```suggestion
log.debug().attr("logicalAddress", key.logicalAddress)
.log("Connection was not found in cache for logical address");
```