Author: tv
Date: Sun Dec 18 18:19:37 2016
New Revision: 1774925

URL: http://svn.apache.org/viewvc?rev=1774925&view=rev
Log:
Clean up API
Use LinkedBlockingQueue instead of home-grown implementation

Modified:
    commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/AbstractCacheEventQueue.java
    commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/CacheEventQueue.java
    commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/PooledCacheEventQueue.java

Modified: commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/AbstractCacheEventQueue.java
URL: http://svn.apache.org/viewvc/commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/AbstractCacheEventQueue.java?rev=1774925&r1=1774924&r2=1774925&view=diff
==============================================================================
--- commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/AbstractCacheEventQueue.java (original)
+++ commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/AbstractCacheEventQueue.java Sun Dec 18 18:19:37 2016
@@ -1,5 +1,8 @@
 package org.apache.commons.jcs.engine;
 
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -25,8 +28,6 @@ import org.apache.commons.jcs.engine.beh
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.io.IOException;
-
 /**
  * An abstract base class to the different implementations
  */
@@ -45,34 +46,32 @@ public abstract class AbstractCacheEvent
      */
     private int waitToDieMillis = DEFAULT_WAIT_TO_DIE_MILLIS;
 
-    // TODO privatise the fields
-
     /**
-     * When the events are pulled off the queue, the tell the listener to handle the specific event
+     * When the events are pulled off the queue, then tell the listener to handle the specific event
      * type. The work is done by the listener.
      */
-    protected ICacheListener<K, V> listener;
+    private ICacheListener<K, V> listener;
 
     /** Id of the listener registered with this queue */
-    protected long listenerId;
+    private long listenerId;
 
     /** The cache region name, if applicable. */
-    protected String cacheName;
+    private String cacheName;
 
     /** Maximum number of failures before we buy the farm. */
-    protected int maxFailure;
+    private int maxFailure;
 
     /** in milliseconds */
-    protected int waitBeforeRetry;
+    private int waitBeforeRetry;
 
-    /** this is true if there is no worker thread. */
-    protected boolean destroyed = true;
+    /** this is true if there is any worker thread. */
+    private final AtomicBoolean alive = new AtomicBoolean(false);
 
     /**
      * This means that the queue is functional. If we reached the max number of failures, the queue
      * is marked as non functional and will never work again.
      */
-    private boolean working = true;
+    private final AtomicBoolean working = new AtomicBoolean(true);
 
     /**
      * Returns the time to wait for events before killing the background thread.
@@ -111,9 +110,9 @@ public abstract class AbstractCacheEvent
      * @return The alive value
      */
     @Override
-    public synchronized boolean isAlive()
+    public boolean isAlive()
     {
-        return !destroyed;
+        return alive.get();
     }
 
     /**
@@ -121,9 +120,9 @@ public abstract class AbstractCacheEvent
      * <p>
      * @param aState
      */
-    public synchronized void setAlive( boolean aState )
+    public void setAlive( boolean aState )
     {
-        destroyed = !aState;
+        alive.set(aState);
     }
 
     /**
@@ -136,6 +135,43 @@ public abstract class AbstractCacheEvent
     }
 
     /**
+     * @return the cacheName
+     */
+    protected String getCacheName()
+    {
+        return cacheName;
+    }
+
+    /**
+     * Initializes the queue.
+     * <p>
+     * @param listener
+     * @param listenerId
+     * @param cacheName
+     * @param maxFailure
+     * @param waitBeforeRetry
+     */
+    protected void initialize( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure,
+                            int waitBeforeRetry)
+    {
+        if ( listener == null )
+        {
+            throw new IllegalArgumentException( "listener must not be null" );
+        }
+
+        this.listener = listener;
+        this.listenerId = listenerId;
+        this.cacheName = cacheName;
+        this.maxFailure = maxFailure <= 0 ? 3 : maxFailure;
+        this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry;
+
+        if ( log.isDebugEnabled() )
+        {
+            log.debug( "Constructed: " + this );
+        }
+    }
+
+    /**
      * This adds a put event to the queue. When it is processed, the element will be put to the
      * listener.
      * <p>
@@ -150,12 +186,9 @@ public abstract class AbstractCacheEvent
         {
             put( new PutEvent( ce ) );
         }
-        else
+        else if ( log.isWarnEnabled() )
         {
-            if ( log.isWarnEnabled() )
-            {
-                log.warn( "Not enqueuing Put Event for [" + this + "] because it's non-functional." );
-            }
+            log.warn( "Not enqueuing Put Event for [" + this + "] because it's non-functional." );
         }
     }
 
@@ -174,12 +207,9 @@ public abstract class AbstractCacheEvent
         {
             put( new RemoveEvent( key ) );
         }
-        else
+        else if ( log.isWarnEnabled() )
         {
-            if ( log.isWarnEnabled() )
-            {
-                log.warn( "Not enqueuing Remove Event for [" + this + "] because it's non-functional." );
-            }
+            log.warn( "Not enqueuing Remove Event for [" + this + "] because it's non-functional." );
         }
     }
 
@@ -197,12 +227,9 @@ public abstract class AbstractCacheEvent
         {
             put( new RemoveAllEvent() );
         }
-        else
+        else if ( log.isWarnEnabled() )
         {
-            if ( log.isWarnEnabled() )
-            {
-                log.warn( "Not enqueuing RemoveAll Event for [" + this + "] because it's non-functional." );
-            }
+            log.warn( "Not enqueuing RemoveAll Event for [" + this + "] because it's non-functional." );
         }
     }
 
@@ -217,12 +244,9 @@ public abstract class AbstractCacheEvent
         {
             put( new DisposeEvent() );
         }
-        else
+        else if ( log.isWarnEnabled() )
         {
-            if ( log.isWarnEnabled() )
-            {
-                log.warn( "Not enqueuing Dispose Event for [" + this + "] because it's non-functional." );
-            }
+            log.warn( "Not enqueuing Dispose Event for [" + this + "] because it's non-functional." );
         }
     }
 
@@ -235,24 +259,12 @@ public abstract class AbstractCacheEvent
 
 
     // /////////////////////////// Inner classes /////////////////////////////
-
-    /** The queue is composed of nodes. */
-    protected static class Node
-    {
-        /** Next node in the singly linked list. */
-        Node next = null;
-
-        /** The payload. */
-        AbstractCacheEventQueue<?, ?>.AbstractCacheEvent event = null;
-    }
-
     /**
      * Retries before declaring failure.
      * <p>
      * @author asmuts
      */
-    protected abstract class AbstractCacheEvent
-        implements Runnable
+    protected abstract class AbstractCacheEvent implements Runnable
     {
         /** Number of failures encountered processing this event. */
         int failures = 0;
@@ -260,8 +272,8 @@ public abstract class AbstractCacheEvent
         /**
          * Main processing method for the AbstractCacheEvent object
          */
-        @SuppressWarnings("synthetic-access")
         @Override
+        @SuppressWarnings("synthetic-access")
         public void run()
         {
             try
@@ -442,7 +454,6 @@ public abstract class AbstractCacheEvent
         {
             return "RemoveAllEvent";
         }
-
     }
 
     /**
@@ -483,7 +494,7 @@ public abstract class AbstractCacheEvent
     @Override
     public boolean isWorking()
     {
-        return working;
+        return working.get();
     }
 
     /**
@@ -494,6 +505,6 @@ public abstract class AbstractCacheEvent
      */
     public void setWorking( boolean b )
     {
-        working = b;
+        working.set(b);
     }
 }

Modified: commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/CacheEventQueue.java
URL: http://svn.apache.org/viewvc/commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/CacheEventQueue.java?rev=1774925&r1=1774924&r2=1774925&view=diff
==============================================================================
--- commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/CacheEventQueue.java (original)
+++ commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/CacheEventQueue.java Sun Dec 18 18:19:37 2016
@@ -1,5 +1,9 @@
 package org.apache.commons.jcs.engine;
 
+import java.util.ArrayList;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -27,8 +31,6 @@ import org.apache.commons.jcs.engine.sta
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.util.ArrayList;
-
 /**
  * An event queue is used to propagate ordered cache events to one and only one target listener.
  * <p>
@@ -48,17 +50,8 @@ public class CacheEventQueue<K, V>
     /** the thread that works the queue. */
     private Thread processorThread;
 
-    /** sync */
-    private final Object queueLock = new Object();
-
-    /** the head of the queue */
-    private Node head = new Node();
-
-    /** the end of the queue */
-    private Node tail = head;
-
-    /** Number of items in the queue */
-    private int size = 0;
+    /** Queue implementation */
+    private LinkedBlockingQueue<AbstractCacheEvent> queue = new LinkedBlockingQueue<AbstractCacheEvent>();
 
     /**
      * Constructs with the specified listener and the cache name.
@@ -84,38 +77,7 @@ public class CacheEventQueue<K, V>
     public CacheEventQueue( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure,
                             int waitBeforeRetry )
     {
-        initialize( listener, listenerId, cacheName, maxFailure, waitBeforeRetry, null );
-    }
-
-    /**
-     * Initializes the queue.
-     * <p>
-     * @param listener
-     * @param listenerId
-     * @param cacheName
-     * @param maxFailure
-     * @param waitBeforeRetry
-     * @param threadPoolName
-     */
-    @Override
-    public void initialize( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure,
-                            int waitBeforeRetry, String threadPoolName )
-    {
-        if ( listener == null )
-        {
-            throw new IllegalArgumentException( "listener must not be null" );
-        }
-
-        this.listener = listener;
-        this.listenerId = listenerId;
-        this.cacheName = cacheName;
-        this.maxFailure = maxFailure <= 0 ? 3 : maxFailure;
-        this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry;
-
-        if ( log.isDebugEnabled() )
-        {
-            log.debug( "Constructed: " + this );
-        }
+        initialize( listener, listenerId, cacheName, maxFailure, waitBeforeRetry );
     }
 
     /**
@@ -133,13 +95,10 @@ public class CacheEventQueue<K, V>
      * Kill the processor thread and indicate that the queue is destroyed and no longer alive, but it
      * can still be working.
      */
-    public void stopProcessing()
+    protected void stopProcessing()
     {
-        synchronized (queueLock)
-        {
-            destroyed = true;
-            processorThread = null;
-        }
+        setAlive(false);
+        processorThread = null;
     }
 
     /**
@@ -150,37 +109,31 @@ public class CacheEventQueue<K, V>
     @Override
     public void destroy()
     {
-        synchronized (queueLock)
+        if ( isAlive() )
         {
-            if ( !destroyed )
-            {
-                destroyed = true;
+            setAlive(false);
 
-                if ( log.isInfoEnabled() )
-                {
-                    log.info( "Destroying queue, stats =  " + getStatistics() );
-                }
-
-                // Synchronize on queue so the thread will not wait forever,
-                // and then interrupt the QueueProcessor
+            if ( log.isInfoEnabled() )
+            {
+                log.info( "Destroying queue, stats =  " + getStatistics() );
+            }
 
-                if ( processorThread != null )
-                {
-                    processorThread.interrupt();
-                    processorThread = null;
-                }
+            if ( processorThread != null )
+            {
+                processorThread.interrupt();
+                processorThread = null;
+            }
 
-                if ( log.isInfoEnabled() )
-                {
-                    log.info( "Cache event queue destroyed: " + this );
-                }
+            if ( log.isInfoEnabled() )
+            {
+                log.info( "Cache event queue destroyed: " + this );
             }
-            else
+        }
+        else
+        {
+            if ( log.isInfoEnabled() )
             {
-                if ( log.isInfoEnabled() )
-                {
-                    log.info( "Destroy was called after queue was destroyed.  Doing nothing.  Stats =  " + getStatistics() );
-                }
+                log.info( "Destroy was called after queue was destroyed. Doing nothing. Stats =  " + getStatistics() );
             }
         }
     }
@@ -193,34 +146,23 @@ public class CacheEventQueue<K, V>
     @Override
     protected void put( AbstractCacheEvent event )
     {
-        Node newNode = new Node();
         if ( log.isDebugEnabled() )
         {
-            log.debug( "Event entering Queue for " + cacheName + ": " + event );
+            log.debug( "Event entering Queue for " + getCacheName() + ": " + event );
         }
 
-        newNode.event = event;
+        queue.offer(event);
 
-        synchronized ( queueLock )
+        if ( isWorking() )
         {
-            size++;
-            tail.next = newNode;
-            tail = newNode;
-            if ( isWorking() )
+            if ( !isAlive() )
             {
-                if ( !isAlive() )
-                {
-                    destroyed = false;
-                    processorThread = new QProcessor( this );
-                    processorThread.start();
-                    if ( log.isInfoEnabled() )
-                    {
-                        log.info( "Cache event queue created: " + this );
-                    }
-                }
-                else
+                setAlive(true);
+                processorThread = new QProcessor();
+                processorThread.start();
+                if ( log.isInfoEnabled() )
                 {
-                    queueLock.notify();
+                    log.info( "Cache event queue created: " + this );
                 }
             }
         }
@@ -234,23 +176,18 @@ public class CacheEventQueue<K, V>
      * @author asmuts
      * @created January 15, 2002
      */
-    private class QProcessor
+    protected class QProcessor
         extends Thread
     {
-        /** The queue to work */
-        CacheEventQueue<K, V> queue;
-
         /**
          * Constructor for the QProcessor object
          * <p>
          * @param aQueue the event queue to take items from.
          */
-        QProcessor( CacheEventQueue<K, V> aQueue )
+        QProcessor()
         {
-            super( "CacheEventQueue.QProcessor-" + aQueue.cacheName );
-
+            super( "CacheEventQueue.QProcessor-" + getCacheName() );
             setDaemon( true );
-            queue = aQueue;
         }
 
         /**
@@ -259,15 +196,22 @@ public class CacheEventQueue<K, V>
          * Waits for a specified time (waitToDieMillis) for something to come in and if no new
          * events come in during that period the run method can exit and the thread is dereferenced.
          */
-        @SuppressWarnings("synthetic-access")
         @Override
         public void run()
         {
-            AbstractCacheEvent event = null;
 
-            while ( queue.isAlive() )
+            while ( isAlive() )
             {
-                event = queue.take();
+                AbstractCacheEvent event = null;
+
+                try
+                {
+                    event = queue.poll(getWaitToDieMillis(), TimeUnit.MILLISECONDS);
+                }
+                catch (InterruptedException e)
+                {
+                    // is ok
+                }
 
                 if ( log.isDebugEnabled() )
                 {
@@ -276,79 +220,18 @@ public class CacheEventQueue<K, V>
 
                 if ( event == null )
                 {
-                    synchronized ( queueLock )
-                    {
-                        try
-                        {
-                            queueLock.wait( queue.getWaitToDieMillis() );
-                        }
-                        catch ( InterruptedException e )
-                        {
-                            log.warn( "Interrupted while waiting for another event to come in before we die." );
-                            return;
-                        }
-                        event = queue.take();
-                        if ( log.isDebugEnabled() )
-                        {
-                            log.debug( "Event from queue after sleep = " + event );
-                        }
-                    }
-                    if ( event == null )
-                    {
-                        queue.stopProcessing();
-                    }
+                    stopProcessing();
                 }
 
-                if ( queue.isWorking() && queue.isAlive() && event != null )
+                if ( event != null && isWorking() && isAlive() )
                 {
                     event.run();
                 }
             }
             if ( log.isDebugEnabled() )
             {
-                log.debug( "QProcessor exiting for " + queue );
-            }
-        }
-    }
-
-    /**
-     * Returns the next cache event from the queue or null if there are no events in the queue.
-     * <p>
-     * We have an empty node at the head and the tail. When we take an item from the queue we move
-     * the next node to the head and then clear the value from that node. This value is returned.
-     * <p>
-     * When the queue is empty the head node is the same as the tail node.
-     * <p>
-     * @return An event to process.
-     */
-    protected AbstractCacheEvent take()
-    {
-        synchronized ( queueLock )
-        {
-            // wait until there is something to read
-            if ( head == tail )
-            {
-                return null;
-            }
-
-            Node node = head.next;
-
-            @SuppressWarnings("unchecked") // No generics for public fields
-            AbstractCacheEvent value = (AbstractCacheEvent) node.event;
-
-            if ( log.isDebugEnabled() )
-            {
-                log.debug( "head.event = " + head.event );
-                log.debug( "node.event = " + node.event );
+                log.debug( "QProcessor exiting for " + getCacheName() );
             }
-
-            // Node becomes the new head (head is always empty)
-
-            node.event = null;
-            head = node;
-
-            size--;
-            return value;
         }
     }
 
@@ -366,30 +249,10 @@ public class CacheEventQueue<K, V>
 
         ArrayList<IStatElement<?>> elems = new ArrayList<IStatElement<?>>();
 
-        elems.add(new StatElement<Boolean>( "Working", Boolean.valueOf(super.isWorking()) ) );
+        elems.add(new StatElement<Boolean>( "Working", Boolean.valueOf(this.isWorking()) ) );
         elems.add(new StatElement<Boolean>( "Alive", Boolean.valueOf(this.isAlive()) ) );
         elems.add(new StatElement<Boolean>( "Empty", Boolean.valueOf(this.isEmpty()) ) );
-
-        int sz = 0;
-        synchronized ( queueLock )
-        {
-            // wait until there is something to read
-            if ( head == tail )
-            {
-                sz = 0;
-            }
-            else
-            {
-                Node n = head;
-                while ( n != null )
-                {
-                    n = n.next;
-                    sz++;
-                }
-            }
-
-            elems.add(new StatElement<Integer>( "Size", Integer.valueOf(sz) ) );
-        }
+        elems.add(new StatElement<Integer>( "Size", Integer.valueOf(this.size()) ) );
 
         stats.setStatElements( elems );
 
@@ -402,7 +265,7 @@ public class CacheEventQueue<K, V>
     @Override
     public boolean isEmpty()
     {
-        return tail == head;
+        return queue.isEmpty();
     }
 
     /**
@@ -413,6 +276,6 @@ public class CacheEventQueue<K, V>
     @Override
     public int size()
     {
-        return size;
+        return queue.size();
     }
 }

Modified: commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/PooledCacheEventQueue.java
URL: http://svn.apache.org/viewvc/commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/PooledCacheEventQueue.java?rev=1774925&r1=1774924&r2=1774925&view=diff
==============================================================================
--- commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/PooledCacheEventQueue.java (original)
+++ commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/PooledCacheEventQueue.java Sun Dec 18 18:19:37 2016
@@ -1,5 +1,9 @@
 package org.apache.commons.jcs.engine;
 
+import java.util.ArrayList;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -28,10 +32,6 @@ import org.apache.commons.jcs.utils.thre
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import java.util.ArrayList;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-
 /**
  * An event queue is used to propagate ordered cache events to one and only one target listener.
  * <p>
@@ -80,29 +80,14 @@ public class PooledCacheEventQueue<K, V>
      * @param waitBeforeRetry
      * @param threadPoolName
      */
-    @Override
-    public void initialize( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure,
+    protected void initialize( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure,
                             int waitBeforeRetry, String threadPoolName )
     {
-        if ( listener == null )
-        {
-            throw new IllegalArgumentException( "listener must not be null" );
-        }
-
-        this.listener = listener;
-        this.listenerId = listenerId;
-        this.cacheName = cacheName;
-        this.maxFailure = maxFailure <= 0 ? 3 : maxFailure;
-        this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry;
+        super.initialize(listener, listenerId, cacheName, maxFailure, waitBeforeRetry);
 
         // this will share the same pool with other event queues by default.
         pool = ThreadPoolManager.getInstance().getPool(
                 (threadPoolName == null) ? "cache_event_queue" : threadPoolName );
-
-        if ( log.isDebugEnabled() )
-        {
-            log.debug( "Initialized: " + this );
-        }
     }
 
     /**
@@ -115,22 +100,14 @@ public class PooledCacheEventQueue<K, V>
     }
 
     /**
-     * Event Q is empty.
-     */
-    public synchronized void stopProcessing()
-    {
-        destroyed = true;
-    }
-
-    /**
      * Destroy the queue. Interrupt all threads.
      */
     @Override
     public synchronized void destroy()
     {
-        if ( !destroyed )
+        if ( isAlive() )
         {
-            destroyed = true;
+            setAlive(false);
             pool.shutdownNow();
             if ( log.isInfoEnabled() )
             {
@@ -151,14 +128,6 @@ public class PooledCacheEventQueue<K, V>
     }
 
     /**
-     * @return Statistics info
-     */
-    public String getStats()
-    {
-        return getStatistics().toString();
-    }
-
-    /**
      * @return IStats
      */
     @Override


Reply via email to