Hello,

I want to contribute a custom SocketFactory allowing to analyze the
utilization of acceptConnection attribute of a Connector. In a properly
configured production system, there should be rare situations where
connections wait for a worker thread to be handled. Our client complained on
high latency of web requests, but the measurement on servlet did not show
high latency. So we wanted to know the number of connections which wait in
socket backlog and were not accepted yet.
I solved this problem by writing a custom SocketFactory, which accepts
connections immediately and puts it in my queue until a call to accept()
will take them. So the number of waiting connections can be monitored via
JMX.

To activate this factory, the declaration of the corresponding Connector in
server.xml should be changed like in the following example.

 <Connector port="8080" maxHttpHeaderSize="8192"
               maxThreads="10" minSpareThreads="5" maxSpareThreads="7"
               enableLookups="false" redirectPort="8443" acceptCount="10"
               connectionTimeout="2000" disableUploadTimeout="true"

               socketFactory="
org.apache.tomcat.util.net.BacklogMeasuringServerSocketFactory"/>


No changes in existing classes are required.

Please review the code in the attachment.


Andrew Skiba.
package org.apache.tomcat.util.net;

import java.io.*;
import java.net.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanRegistration;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.commons.modeler.Registry;

/**
 * Default server socket factory. Doesn't do much except give us
 * plain ol' server sockets.
 *
 * @author [EMAIL PROTECTED]
 * @author Harish Prabandham
 */

// Default implementation of server sockets.

//
// WARNING: Some of the APIs in this class are used by J2EE. 
// Please talk to [EMAIL PROTECTED] before making any changes.
//
public class BacklogMeasuringServerSocketFactory extends ServerSocketFactory
    implements MBeanRegistration
{
    private static org.apache.commons.logging.Log logger =
        org.apache.commons.logging.LogFactory.getLog(BacklogMeasuringServerSocketFactory.class);
    private static AtomicLong queuesCommonSize = new AtomicLong(0);

    static class ServerSocketProxy extends ServerSocket {
        public static final int DEFAULT_QUEUE_SIZE = 50;

        BlockingQueue<Socket> acceptQueue;
        Thread acceptThread;

        class AcceptRunnable implements Runnable {
            public void run() {
                try {
                    while (!isClosed()) {
                        acceptQueue.put(superAccept());
                        queuesCommonSize.incrementAndGet();
                    }
                } catch (InterruptedException ex) {
                    logger.warn("unexpected exception", ex);
                } catch (IOException ex) {
                    logger.info("stopping accepting connections", ex);
                }
            }
        }

        private Socket superAccept () throws IOException {
            return super.accept();
        }
        void init (int queueSize) {
            acceptQueue = new LinkedBlockingQueue (queueSize);
            acceptThread = new Thread(new AcceptRunnable(), "Accept-"+toString());
            acceptThread.start();                    
        }
        
        public ServerSocketProxy(int port) throws IOException {
            super(port);
            init(DEFAULT_QUEUE_SIZE);
        }
        
        public ServerSocketProxy(int port, int backlog) throws IOException {
            super(port, backlog);
            init(backlog);
        }
        
        public ServerSocketProxy(int port, int backlog, InetAddress bindAddr) throws IOException {
            super(port, backlog, bindAddr);
            init(backlog);
        }

        @Override
        public Socket accept() throws IOException {
            try {
                Socket res = acceptQueue.take();
                queuesCommonSize.decrementAndGet();
                return res;
            } catch (InterruptedException ex) {
                throw new SocketException ("unexpected InterruptedException");
            }
        }
    }
    
    public BacklogMeasuringServerSocketFactory () {
        try {
            Registry.getRegistry(null, null).registerComponent(this,
                    new ObjectName("measure:type=Backlog,obj="+hashCode()), null);
        } catch (Exception ex) {
            logger.error("MBean was not registered", ex);
        }
    }

    public ServerSocket createSocket (int port)
    throws IOException {
        return new ServerSocketProxy(port);
    }

    public ServerSocket createSocket (int port, int backlog)
    throws IOException {
        return new ServerSocketProxy (port, backlog);
    }

    public ServerSocket createSocket (int port, int backlog,
        InetAddress ifAddress)
    throws IOException {
        return new ServerSocketProxy (port, backlog, ifAddress);
    }
 
    public Socket acceptSocket(ServerSocket socket)
 	throws IOException {
        return socket.accept();
    }
 
    public void handshake(Socket sock)
 	throws IOException {
 	// NOOP
    }
    
    public long size () {
        return queuesCommonSize.longValue();
    }

    
    protected String domain;
    protected ObjectName oname;
    protected MBeanServer mserver;

    public ObjectName getObjectName() {
        return oname;
    }

    public String getDomain() {
        return domain;
    }


    public ObjectName preRegister(MBeanServer server, ObjectName name) throws Exception {
        this.oname = name;
        this.mserver = server;
        domain = name.getDomain();
        return name;
    }

    public void postRegister(Boolean registrationDone) {
    }

    public void preDeregister() throws Exception {
    }

    public void postDeregister() {
    }    
 }
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to