This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 91c165c7ebb [improve] PIP-467: Convert Jetty request logs to 
structured slog events (#25543)
91c165c7ebb is described below

commit 91c165c7ebb95413768ee7dc8590826aa1276e14
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Apr 16 15:42:05 2026 -0700

    [improve] PIP-467: Convert Jetty request logs to structured slog events 
(#25543)
---
 .../pulsar/broker/web/JettyRequestLogFactory.java  | 187 +++++++++------------
 .../broker/web/WebServiceOriginalClientIPTest.java |  17 +-
 .../proxy/server/ProxyOriginalClientIPTest.java    |  16 +-
 3 files changed, 99 insertions(+), 121 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/JettyRequestLogFactory.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/JettyRequestLogFactory.java
index 1519147a292..1625e29f819 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/JettyRequestLogFactory.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/web/JettyRequestLogFactory.java
@@ -20,59 +20,38 @@ package org.apache.pulsar.broker.web;
 
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.util.TimeZone;
+import java.security.Principal;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.CustomLog;
+import org.eclipse.jetty.http.HttpHeader;
 import org.eclipse.jetty.io.Connection;
 import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.security.AuthenticationState;
 import org.eclipse.jetty.server.ConnectionMetaData;
 import org.eclipse.jetty.server.Connector;
-import org.eclipse.jetty.server.CustomRequestLog;
 import org.eclipse.jetty.server.ProxyConnectionFactory;
 import org.eclipse.jetty.server.Request;
 import org.eclipse.jetty.server.RequestLog;
 import org.eclipse.jetty.server.Response;
 import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.Slf4jRequestLogWriter;
 import org.eclipse.jetty.util.Attributes;
 import org.eclipse.jetty.util.HostPort;
+import org.eclipse.jetty.util.NanoTime;
 import org.eclipse.jetty.util.component.ContainerLifeCycle;
 
 /**
  * Class to standardize initialization of a Jetty request logger for all 
pulsar components.
+ *
+ * <p>Emits one structured log entry per HTTP request with individual fields 
as slog attributes
+ * (method, uri, status, clientAddr, bytesOut, durationMs, etc.) instead of 
the previous
+ * pre-formatted NCSA combined log line.
  */
+@CustomLog
 public class JettyRequestLogFactory {
 
     /**
-     * The time format to use for request logging. This custom format is 
necessary because the
-     * default option uses GMT for the time zone. Pulsar's request logging has 
historically
-     * used the JVM's default time zone, so this format uses that time zone. 
It is also necessary
-     * because the {@link CustomRequestLog#DEFAULT_DATE_FORMAT} is 
"dd/MMM/yyyy:HH:mm:ss ZZZ" instead
-     * of "dd/MMM/yyyy:HH:mm:ss Z" (the old date format). The key difference 
is that ZZZ will render
-     * the strict offset for the timezone that is unaware of daylight savings 
time while the Z will
-     * render the offset based on daylight savings time.
-     *
-     * As the javadoc for {@link CustomRequestLog} describes, the time code 
can take two arguments to
-     * configure the format and the time zone. They must be in the form: 
"%{format|timeZone}t".
-     */
-    private static final String TIME_FORMAT = String.format(" %%{%s|%s}t ",
-            "dd/MMM/yyyy:HH:mm:ss Z",
-            TimeZone.getDefault().getID());
-
-    /**
-     * This format is essentially the {@link 
CustomRequestLog#EXTENDED_NCSA_FORMAT} with three modifications:
-     *   1. The time zone will be the JVM's default time zone instead of 
always being GMT.
-     *   2. The time zone offset will be daylight savings time aware.
-     *   3. The final value will be the request time (latency) in milliseconds.
-     *
-     * See javadoc for {@link CustomRequestLog} for more information.
-     */
-    private static final String LOG_FORMAT =
-            "%{client}a - %u" + TIME_FORMAT + "\"%r\" %s %O \"%{Referer}i\" 
\"%{User-Agent}i\" %{ms}T";
-
-    /**
-     * Build a new Jetty request logger using the format defined in this class.
+     * Build a new Jetty request logger.
      * @return a request logger
      */
     public static RequestLog createRequestLogger() {
@@ -80,36 +59,39 @@ public class JettyRequestLogFactory {
     }
 
     /**
-     * Build a new Jetty request logger using the format defined in this class.
+     * Build a new Jetty request logger.
      * @param showDetailedAddresses whether to show detailed addresses and 
ports in logs
      * @return a request logger
      */
     public static RequestLog createRequestLogger(boolean 
showDetailedAddresses, Server server) {
         if (!showDetailedAddresses) {
-            return new CustomRequestLog(new Slf4jRequestLogWriter(), 
LOG_FORMAT);
+            return new StructuredRequestLog();
         } else {
             return new OriginalClientIPRequestLog(server);
         }
     }
 
     /**
-     * Logs the original and real remote (client) and local (server) IP 
addresses
-     * when detailed addresses are enabled.
-     * Tracks the real addresses of remote and local using a registered 
Connection.Listener
-     * when detailed addresses are enabled.
-     * This is necessary when Proxy Protocol is used to pass the original 
client IP.
+     * Structured request logger that emits one slog line per request with 
each field as an attribute.
+     */
+    private static class StructuredRequestLog implements RequestLog {
+        @Override
+        public void log(Request request, Response response) {
+            logRequest(request, response, null);
+        }
+    }
+
+    /**
+     * Emits the same structured attributes as {@link StructuredRequestLog} 
and additionally records
+     * the real client and server addresses when Proxy Protocol wraps the 
original endpoint.
      */
-    @CustomLog
     private static class OriginalClientIPRequestLog extends ContainerLifeCycle 
implements RequestLog {
-        private final ThreadLocal<StringBuilder> requestLogStringBuilder = 
ThreadLocal.withInitial(StringBuilder::new);
-        private final CustomRequestLog delegate;
-        private final Slf4jRequestLogWriter delegateLogWriter;
+        private final Connection.Listener 
proxyProtocolOriginalEndpointListener =
+                new ProxyProtocolOriginalEndpointListener();
+        private final ConcurrentHashMap<AddressKey, AddressEntry> 
proxyProtocolRealAddressMapping =
+                new ConcurrentHashMap<>();
 
         OriginalClientIPRequestLog(Server server) {
-            delegate = new CustomRequestLog(this::write, LOG_FORMAT);
-            addBean(delegate);
-            delegateLogWriter = new Slf4jRequestLogWriter();
-            addBean(delegateLogWriter);
             if (server != null) {
                 for (Connector connector : server.getConnectors()) {
                     // adding the listener is only necessary for connectors 
that use ProxyConnectionFactory
@@ -120,63 +102,9 @@ public class JettyRequestLogFactory {
             }
         }
 
-        void write(String requestEntry) {
-            StringBuilder sb = requestLogStringBuilder.get();
-            sb.setLength(0);
-            sb.append(requestEntry);
-        }
-
         @Override
         public void log(Request request, Response response) {
-            delegate.log(request, response);
-            StringBuilder sb = requestLogStringBuilder.get();
-            sb.append(" [R:");
-            String remoteAddr = Request.getRemoteAddr(request);
-            sb.append(remoteAddr);
-            sb.append(':');
-            int remotePort = Request.getRemotePort(request);
-            sb.append(remotePort);
-
-            InetSocketAddress realRemoteAddress =
-                    
lookupRealAddress(unwrap(request.getConnectionMetaData()).getRemoteSocketAddress());
-            if (realRemoteAddress != null) {
-                String realRemoteHost = 
HostPort.normalizeHost(realRemoteAddress.getHostString());
-                int realRemotePort = realRemoteAddress.getPort();
-                if (!realRemoteHost.equals(remoteAddr) || realRemotePort != 
remotePort) {
-                    sb.append(" via ");
-                    sb.append(realRemoteHost);
-                    sb.append(':');
-                    sb.append(realRemotePort);
-                }
-            }
-            sb.append("]->[L:");
-            InetSocketAddress realLocalAddress = 
lookupRealAddress(unwrap(request.getConnectionMetaData())
-                    .getLocalSocketAddress());
-            String localAddr = Request.getLocalAddr(request);
-            int localPort = Request.getLocalPort(request);
-            if (realLocalAddress != null) {
-                String realLocalHost = 
HostPort.normalizeHost(realLocalAddress.getHostString());
-                int realLocalPort = realLocalAddress.getPort();
-                sb.append(realLocalHost);
-                sb.append(':');
-                sb.append(realLocalPort);
-                if (!realLocalHost.equals(localAddr) || realLocalPort != 
localPort) {
-                    sb.append(" dst ");
-                    sb.append(localAddr);
-                    sb.append(':');
-                    sb.append(localPort);
-                }
-            } else {
-                sb.append(localAddr);
-                sb.append(':');
-                sb.append(localPort);
-            }
-            sb.append(']');
-            try {
-                delegateLogWriter.write(sb.toString());
-            } catch (Exception e) {
-                log.warn().exception(e).log("Failed to write request log");
-            }
+            logRequest(request, response, this);
         }
 
         private ConnectionMetaData unwrap(ConnectionMetaData 
connectionMetaData) {
@@ -203,19 +131,11 @@ public class JettyRequestLogFactory {
             }
         }
 
-        private final Connection.Listener 
proxyProtocolOriginalEndpointListener =
-                new ProxyProtocolOriginalEndpointListener();
-
-        private final ConcurrentHashMap<AddressKey, AddressEntry> 
proxyProtocolRealAddressMapping =
-                new ConcurrentHashMap<>();
-
         // Use a record as key since InetSocketAddress hash code changes if 
the address gets resolved
         record AddressKey(String hostString, int port) {
-
         }
 
         record AddressEntry(InetSocketAddress realAddress, AtomicInteger 
referenceCount) {
-
         }
 
         // Tracks the real addresses of remote and local when detailed 
addresses are enabled.
@@ -274,4 +194,53 @@ public class JettyRequestLogFactory {
             }
         }
     }
+
+    private static void logRequest(Request request, Response response, 
OriginalClientIPRequestLog addressTracker) {
+        log.info(e -> {
+            e.attr("method", request.getMethod())
+                    .attr("uri", request.getHttpURI().asString())
+                    .attr("proto", 
request.getConnectionMetaData().getProtocol())
+                    .attr("status", response.getStatus())
+                    .attr("bytesOut", 
Response.getContentBytesWritten(response))
+                    .attr("clientAddr", Request.getRemoteAddr(request))
+                    .attr("clientPort", Request.getRemotePort(request))
+                    .attr("user", authUser(request))
+                    .attr("referer", 
request.getHeaders().get(HttpHeader.REFERER))
+                    .attr("userAgent", 
request.getHeaders().get(HttpHeader.USER_AGENT))
+                    .attr("durationMs", 
NanoTime.millisSince(request.getBeginNanoTime()));
+
+            if (addressTracker != null) {
+                String clientAddr = Request.getRemoteAddr(request);
+                int clientPort = Request.getRemotePort(request);
+                String localAddr = Request.getLocalAddr(request);
+                int localPort = Request.getLocalPort(request);
+                e.attr("localAddr", localAddr).attr("localPort", localPort);
+
+                ConnectionMetaData md = 
addressTracker.unwrap(request.getConnectionMetaData());
+                InetSocketAddress realRemote = 
addressTracker.lookupRealAddress(md.getRemoteSocketAddress());
+                if (realRemote != null) {
+                    String host = 
HostPort.normalizeHost(realRemote.getHostString());
+                    int port = realRemote.getPort();
+                    if (!host.equals(clientAddr) || port != clientPort) {
+                        e.attr("clientAddrReal", host).attr("clientPortReal", 
port);
+                    }
+                }
+                InetSocketAddress realLocal = 
addressTracker.lookupRealAddress(md.getLocalSocketAddress());
+                if (realLocal != null) {
+                    String host = 
HostPort.normalizeHost(realLocal.getHostString());
+                    int port = realLocal.getPort();
+                    if (!host.equals(localAddr) || port != localPort) {
+                        e.attr("localAddrReal", host).attr("localPortReal", 
port);
+                    }
+                }
+            }
+
+            e.log("HTTP request");
+        });
+    }
+
+    private static String authUser(Request request) {
+        Principal principal = AuthenticationState.getUserPrincipal(request);
+        return principal != null ? principal.getName() : null;
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceOriginalClientIPTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceOriginalClientIPTest.java
index b8d60d84361..adedda82160 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceOriginalClientIPTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/WebServiceOriginalClientIPTest.java
@@ -91,7 +91,10 @@ public class WebServiceOriginalClientIPTest extends 
MockedPulsarServiceBaseTest
 
             // Validate that the client IP passed in X-Forwarded-For is logged
             assertTrue(consoleCaptor.getStandardOutput().stream()
-                    .anyMatch(line -> line.contains("RequestLog") && 
line.contains("[R:11.22.33.44:12345 via ")));
+                    .anyMatch(line -> line.contains("HTTP request")
+                            && line.contains("clientAddr=11.22.33.44")
+                            && line.contains("clientPort=12345")
+                            && line.contains("clientAddrReal=")));
         });
     }
 
@@ -110,7 +113,10 @@ public class WebServiceOriginalClientIPTest extends 
MockedPulsarServiceBaseTest
 
             // Validate that the client IP passed in Forwarded is logged
             assertTrue(consoleCaptor.getStandardOutput().stream()
-                    .anyMatch(line -> line.contains("RequestLog") && 
line.contains("[R:11.22.33.44:12345 via ")));
+                    .anyMatch(line -> line.contains("HTTP request")
+                            && line.contains("clientAddr=11.22.33.44")
+                            && line.contains("clientPort=12345")
+                            && line.contains("clientAddrReal=")));
         });
     }
 
@@ -135,8 +141,11 @@ public class WebServiceOriginalClientIPTest extends 
MockedPulsarServiceBaseTest
 
             // Validate that the client IP and destination IP passed in HA 
Proxy protocol is logged
             assertTrue(consoleCaptor.getStandardOutput().stream()
-                    .anyMatch(line -> line.contains("RequestLog") && 
line.contains("[R:99.22.33.44:1234 via ")
-                            && line.contains(" dst 5.4.3.1:4321]")));
+                    .anyMatch(line -> line.contains("HTTP request")
+                            && line.contains("clientAddr=99.22.33.44")
+                            && line.contains("clientPort=1234")
+                            && line.contains("localAddr=5.4.3.1")
+                            && line.contains("localPort=4321")));
         });
     }
 
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyOriginalClientIPTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyOriginalClientIPTest.java
index 5ae2bf26380..d5a396883b3 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyOriginalClientIPTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyOriginalClientIPTest.java
@@ -109,11 +109,11 @@ public class ProxyOriginalClientIPTest extends 
MockedPulsarServiceBaseTest {
 
             // Validate that the client IP passed in X-Forwarded-For is logged
             assertTrue(consoleCaptor.getStandardOutput().stream()
-                    .anyMatch(line -> line.contains("pulsar-external-web-") && 
line.contains("RequestLog")
-                            && line.contains("R:11.22.33.44")), "Expected to 
find client IP in proxy logs");
+                    .anyMatch(line -> line.contains("pulsar-external-web-") && 
line.contains("HTTP request")
+                            && line.contains("clientAddr=11.22.33.44")), 
"Expected to find client IP in proxy logs");
             assertTrue(consoleCaptor.getStandardOutput().stream()
-                    .anyMatch(line -> line.contains("pulsar-web-") && 
line.contains("RequestLog")
-                            && line.contains("R:11.22.33.44")), "Expected to 
find client IP in broker logs");
+                    .anyMatch(line -> line.contains("pulsar-web-") && 
line.contains("HTTP request")
+                            && line.contains("clientAddr=11.22.33.44")), 
"Expected to find client IP in broker logs");
         });
     }
 
@@ -132,11 +132,11 @@ public class ProxyOriginalClientIPTest extends 
MockedPulsarServiceBaseTest {
 
             // Validate that the client IP passed in HA proxy protocol is 
logged
             assertTrue(consoleCaptor.getStandardOutput().stream()
-                    .anyMatch(line -> line.contains("pulsar-external-web-") && 
line.contains("RequestLog")
-                            && line.contains("R:99.22.33.44")), "Expected to 
find client IP in proxy logs");
+                    .anyMatch(line -> line.contains("pulsar-external-web-") && 
line.contains("HTTP request")
+                            && line.contains("clientAddr=99.22.33.44")), 
"Expected to find client IP in proxy logs");
             assertTrue(consoleCaptor.getStandardOutput().stream()
-                    .anyMatch(line -> line.contains("pulsar-web-") && 
line.contains("RequestLog")
-                            && line.contains("R:99.22.33.44")), "Expected to 
find client IP in broker logs");
+                    .anyMatch(line -> line.contains("pulsar-web-") && 
line.contains("HTTP request")
+                            && line.contains("clientAddr=99.22.33.44")), 
"Expected to find client IP in broker logs");
         });
     }
 

Reply via email to