Copilot commented on code in PR #25507:
URL: https://github.com/apache/pulsar/pull/25507#discussion_r3070578923


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtils.java:
##########
@@ -115,7 +116,8 @@ public static Descriptors.Descriptor deserialize(byte[] schemaDataBytes) {
             for (int i = 1; i < paths.length; i++) {
                 descriptor = descriptor.findNestedTypeByName(paths[i]);
             }
-            logger.debug("deserialize '{}' to descriptor: '{}'.", schemaDataBytes, descriptor.getFullName());
+            log.debug().attr("bytes", schemaDataBytes)
+                    .attr("descriptor", descriptor.getFullName()).log("deserialized to descriptor");
         } catch (Exception e) {
             e.printStackTrace();
             throw new SchemaSerializationException(e);

Review Comment:
   `printStackTrace()` is used here as well; please replace with structured 
logging (`log.error().exception(e)...`) instead of writing directly to stderr.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java:
##########
@@ -66,10 +64,12 @@ public Message<T> onArrival(Consumer<T> consumer, Message<T> message) {
                 interceptorMessage = interceptors.get(i).onArrival(consumer, interceptorMessage);
             } catch (Throwable e) {
                 if (consumer != null) {
-                    log.warn("Error executing interceptor beforeConsume callback topic: {} consumerName: {}",
-                            consumer.getTopic(), consumer.getConsumerName(), e);
+                    log.warn().attr("topic", consumer.getTopic())
+                            .attr("consumername", consumer.getConsumerName())

Review Comment:
   Attribute name "consumername" is inconsistent with the rest of the client 
logs (which use "consumerName"). This breaks standardized attribute querying; 
please use the same camelCase key here.
   ```suggestion
                               .attr("consumerName", consumer.getConsumerName())
   ```
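
One way to keep attribute keys from drifting is to define them once and reuse them. The following is a minimal sketch only: `LogAttrKeys` and `consumerAttrs` are hypothetical names introduced for illustration and are not part of the Pulsar client or of the structured logger used in this PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical holder for shared log attribute keys (not an existing Pulsar class).
final class LogAttrKeys {
    static final String TOPIC = "topic";
    static final String CONSUMER_NAME = "consumerName";

    private LogAttrKeys() {
    }

    // Builds the attributes for a consumer-related log event using the shared keys,
    // so every call site emits the same camelCase spelling.
    static Map<String, Object> consumerAttrs(String topic, String consumerName) {
        Map<String, Object> attrs = new LinkedHashMap<>();
        attrs.put(TOPIC, topic);
        attrs.put(CONSUMER_NAME, consumerName);
        return attrs;
    }
}
```

With constants like these, a call site would pass `LogAttrKeys.CONSUMER_NAME` rather than a string literal, so a lowercase variant cannot be introduced by accident.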



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java:
##########
@@ -150,20 +152,25 @@ private void init(ConsumerConfigurationData<?> conf) {
                 int prefetchQueueSize = consumerImpl.incomingMessages.size();
                 if ((currentNumMsgsReceived | currentNumBytesReceived | currentNumReceiveFailed | currentNumAcksSent
                         | currentNumAcksFailed | prefetchQueueSize) != 0) {
-                    log.info(
-                            "[{}] [{}] [{}] Prefetched messages: {} --- "
-                                    + "Consume throughput received: {} msgs/s --- {} Mbit/s --- "
-                                    + "Ack sent rate: {} ack/s --- " + "Failed messages: {} --- batch messages: {} ---"
-                                    + "Failed acks: {}",
-                            consumerImpl.getTopic(), consumerImpl.getSubscription(), consumerImpl.consumerName,
-                            prefetchQueueSize, THROUGHPUT_FORMAT.format(receivedMsgsRate),
-                            THROUGHPUT_FORMAT.format(receivedBytesRate * 8 / 1024 / 1024),
-                            THROUGHPUT_FORMAT.format(currentNumAcksSent / elapsed), currentNumReceiveFailed,
-                            currentNumBatchReceiveFailed, currentNumAcksFailed);
+                    log.info().attr("topic", consumerImpl.getTopic())
+                            .attr("subscription", consumerImpl.getSubscription())
+                            .attr("consumerName", consumerImpl.consumerName)
+                            .attr("prefetchedMessages", prefetchQueueSize)
+                            .attr("receivedMsgRate", THROUGHPUT_FORMAT.format(receivedMsgsRate))
+                            .attr("receivedBytesRateMbps",
+                                    THROUGHPUT_FORMAT.format(receivedBytesRate * 8 / 1024 / 1024))
+                            .attr("ackSentRate", THROUGHPUT_FORMAT.format(currentNumAcksSent / elapsed))
+                            .attr("failedMessages", currentNumReceiveFailed)
+                            .attr("failedBatchMessages", currentNumBatchReceiveFailed)
+                            .attr("failedAcks", currentNumAcksFailed)
+                            .log("Consumer stats");
                 }
             } catch (Exception e) {
-                log.error("[{}] [{}] [{}]: {}", consumerImpl.getTopic(), consumerImpl.subscription
-                        , consumerImpl.consumerName, e.getMessage());
+                log.error().attr("topic", consumerImpl.getTopic())
+                        .attr("subscription", consumerImpl.subscription)
+                        .attr("consumerName", consumerImpl.consumerName)
+                        .exceptionMessage(e)
+                        .log("operation");

Review Comment:
   This error log message is too generic ("operation") and only logs the 
exception message. Please use a specific message (e.g., updating consumer 
stats) and include the exception (`.exception(e)`) so the stack trace is 
available when needed.
   ```suggestion
                           .exception(e)
                           .log("Failed to update consumer stats");
   ```
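
To illustrate why attaching the exception matters, here is a small sketch that uses `java.util.logging` as a stand-in for the project's structured logger (an assumption; the fluent `log.error().exception(e)` API is not reproduced here). `ExceptionLoggingSketch` and `CapturingHandler` are hypothetical names. A record logged with only `e.getMessage()` carries no throwable, so no stack trace can be rendered later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical demo class; java.util.logging stands in for the structured logger.
final class ExceptionLoggingSketch {

    // Collects log records in memory so they can be inspected afterwards.
    static final class CapturingHandler extends Handler {
        final List<LogRecord> records = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            records.add(record);
        }

        @Override
        public void flush() {
        }

        @Override
        public void close() {
        }
    }

    // Logs the same failure twice: once with the message only, once with the throwable.
    static List<LogRecord> logBothWays(Exception e) {
        Logger logger = Logger.getAnonymousLogger();
        logger.setUseParentHandlers(false);
        CapturingHandler handler = new CapturingHandler();
        logger.addHandler(handler);

        // Message only: the stack trace is lost.
        logger.log(Level.SEVERE, "operation: " + e.getMessage());
        // Specific message plus the throwable: the stack trace stays available.
        logger.log(Level.SEVERE, "Failed to update consumer stats", e);
        return handler.records;
    }
}
```

In the first record `getThrown()` is `null`; in the second it returns the original exception, which is what a formatter needs in order to print the stack trace.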



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/ProtobufNativeSchemaUtils.java:
##########
@@ -58,7 +58,8 @@ public static byte[] serialize(Descriptors.Descriptor descriptor) {
                     .fileDescriptorSet(fileDescriptorSet)
                     .rootFileDescriptorName(rootFileDescriptorName).rootMessageTypeName(rootMessageTypeName).build();
             schemaDataBytes = ObjectMapperFactory.getMapperWithIncludeAlways().writer().writeValueAsBytes(schemaData);
-            logger.debug("descriptor '{}' serialized to '{}'.", descriptor.getFullName(), schemaDataBytes);
+            log.debug().attr("descriptor", descriptor.getFullName())
+                    .attr("bytes", schemaDataBytes).log("descriptor serialized");
         } catch (Exception e) {
             e.printStackTrace();
             throw new SchemaSerializationException(e);

Review Comment:
   The catch block uses `printStackTrace()`, which bypasses the configured 
logging and writes directly to stderr. Please replace it with the structured 
logger (e.g., `log.error().exception(e)...`) and keep the thrown 
`SchemaSerializationException`.
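
The intended catch-block shape can be sketched as follows. This is a minimal, self-contained illustration that uses `java.util.logging` in place of the project's structured logger; `SerializeSketch` and `SketchSerializationException` are hypothetical stand-ins for the real utility class and `SchemaSerializationException`.

```java
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in for the schema utility; only the catch-block pattern matters.
final class SerializeSketch {
    private static final Logger LOG = Logger.getLogger(SerializeSketch.class.getName());

    // Hypothetical exception type standing in for SchemaSerializationException.
    static final class SketchSerializationException extends RuntimeException {
        SketchSerializationException(Throwable cause) {
            super(cause);
        }
    }

    static byte[] serialize(String descriptorName) {
        try {
            if (descriptorName == null) {
                throw new IllegalArgumentException("descriptor name is null");
            }
            return descriptorName.getBytes(StandardCharsets.UTF_8);
        } catch (Exception e) {
            // Route the failure through the logger, with the throwable attached,
            // instead of calling e.printStackTrace(); then rethrow as before.
            LOG.log(Level.SEVERE, "Failed to serialize descriptor", e);
            throw new SketchSerializationException(e);
        }
    }
}
```

The caller still receives the wrapped exception, and the failure now goes through whatever handlers and levels the logging configuration defines.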



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java:
##########
@@ -388,10 +387,10 @@ private void readAllExistingMessages(Reader<T> reader, CompletableFuture<Void> f
                        // Reached the end
                        long endTime = System.nanoTime();
                        long durationMillis = TimeUnit.NANOSECONDS.toMillis(endTime - startTime);
-                       log.info("Started table view for topic {} - Replayed {} messages in {} seconds",
-                               reader.getTopic(),
-                               messagesRead,
-                               durationMillis / 1000.0);
+                       log.info().attr("topic", reader.getTopic())
+                               .attr("replayed", messagesRead)
+                               .attr("0", durationMillis / 1000.0)
+                               .log("Started table view for topic - Replayed messages in seconds");

Review Comment:
   This log event uses the attribute key "0", which looks accidental and makes 
the duration hard to query or understand. Please rename it to a descriptive key 
(e.g., `durationSeconds`, since the logged value is `durationMillis / 1000.0`) 
and adjust the log message accordingly.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java:
##########
@@ -100,10 +100,12 @@ public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
                 interceptorMessage = interceptors.get(i).beforeConsume(consumer, interceptorMessage);
             } catch (Throwable e) {
                 if (consumer != null) {
-                    log.warn("Error executing interceptor beforeConsume callback topic: {} consumerName: {}",
-                            consumer.getTopic(), consumer.getConsumerName(), e);
+                    log.warn().attr("topic", consumer.getTopic())
+                            .attr("consumername", consumer.getConsumerName())
+                            .exception(e)
+                            .log("Error executing interceptor beforeConsume callback topic: consumerName");

Review Comment:
   Attribute name "consumername" is inconsistent with the rest of the client 
logs (which use "consumerName"). Please use the same camelCase key here as well.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java:
##########
@@ -118,7 +119,10 @@ private void init(ProducerConfigurationData conf) {
             try {
                 updateStats();
             } catch (Exception e) {
-                log.error("[{}] [{}]: {}", producer.getTopic(), producer.getProducerName(), e.getMessage());
+                log.error().attr("topic", producer.getTopic())
+                        .attr("producerName", producer.getProducerName())
+                        .exceptionMessage(e)
+                        .log("operation");

Review Comment:
   This error log message is too generic ("operation") and only logs the 
exception message. Please use a specific message describing what failed (e.g., 
updating producer stats) and include the exception (`.exception(e)`) so the 
stack trace is available when needed.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java:
##########
@@ -288,42 +288,37 @@ public CompletableFuture<ClientCnx> getConnection(InetSocketAddress logicalAddre
     }
 
     private CompletableFuture<ClientCnx> createConnection(Key key) {
-        if (log.isDebugEnabled()) {
-            log.debug("Connection for {} not found in cache", key.logicalAddress);
-        }
+            log.debug().attr("logicalAddress", key.logicalAddress).log("Connection for not found in cache");

Review Comment:
   The debug message text is grammatically incorrect ("Connection for not found 
in cache") and loses what was actually looked up. Please reword it to something 
actionable (e.g., include the logicalAddress and say the connection was not 
found in cache).
   ```suggestion
               log.debug().attr("logicalAddress", key.logicalAddress)
                       .log("Connection was not found in cache for logical address");
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
