Author: tv
Date: Sun Jan 15 18:01:19 2017
New Revision: 1778931
URL: http://svn.apache.org/viewvc?rev=1778931&view=rev
Log:
Derive CacheEventQueue from the pooled implementation, just use pool of size 1
Modified:
commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/CacheEventQueue.java
commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/PooledCacheEventQueue.java
Modified:
commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/CacheEventQueue.java
URL:
http://svn.apache.org/viewvc/commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/CacheEventQueue.java?rev=1778931&r1=1778930&r2=1778931&view=diff
==============================================================================
---
commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/CacheEventQueue.java
(original)
+++
commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/CacheEventQueue.java
Sun Jan 15 18:01:19 2017
@@ -1,9 +1,5 @@
package org.apache.commons.jcs.engine;
-import java.util.ArrayList;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -23,36 +19,22 @@ import java.util.concurrent.TimeUnit;
* under the License.
*/
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
import org.apache.commons.jcs.engine.behavior.ICacheListener;
-import org.apache.commons.jcs.engine.stats.StatElement;
-import org.apache.commons.jcs.engine.stats.Stats;
-import org.apache.commons.jcs.engine.stats.behavior.IStatElement;
-import org.apache.commons.jcs.engine.stats.behavior.IStats;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.commons.jcs.utils.threadpool.DaemonThreadFactory;
/**
* An event queue is used to propagate ordered cache events to one and only one target listener.
- * <p>
- * This is a modified version of the experimental version. It should lazy
initialize the processor
- * thread, and kill the thread if the queue goes empty for a specified period,
now set to 1 minute.
- * If something comes in after that a new processor thread should be created.
*/
public class CacheEventQueue<K, V>
- extends AbstractCacheEventQueue<K, V>
+ extends PooledCacheEventQueue<K, V>
{
- /** The logger. */
- private static final Log log = LogFactory.getLog( CacheEventQueue.class );
-
/** The type of queue -- there are pooled and single */
private static final QueueType queueType = QueueType.SINGLE;
- /** the thread that works the queue. */
- private Thread processorThread;
-
- /** Queue implementation */
- private LinkedBlockingQueue<AbstractCacheEvent> queue = new
LinkedBlockingQueue<AbstractCacheEvent>();
-
/**
* Constructs with the specified listener and the cache name.
* <p>
@@ -77,205 +59,46 @@ public class CacheEventQueue<K, V>
public CacheEventQueue( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure,
int waitBeforeRetry )
{
- initialize( listener, listenerId, cacheName, maxFailure,
waitBeforeRetry );
- }
-
- /**
- * What type of queue is this.
- * <p>
- * @return queueType
- */
- @Override
- public QueueType getQueueType()
- {
- return queueType;
- }
-
- /**
- * Kill the processor thread and indicate that the queue is destroyed and
no longer alive, but it
- * can still be working.
- */
- protected void stopProcessing()
- {
- setAlive(false);
- processorThread = null;
- }
-
- /**
- * Event Q is empty.
- * <p>
- * Calling destroy interrupts the processor thread.
- */
- @Override
- public void destroy()
- {
- if ( isAlive() )
- {
- setAlive(false);
-
- if ( log.isInfoEnabled() )
- {
- log.info( "Destroying queue, stats = " + getStatistics() );
- }
-
- if ( processorThread != null )
- {
- processorThread.interrupt();
- processorThread = null;
- }
-
- if ( log.isInfoEnabled() )
- {
- log.info( "Cache event queue destroyed: " + this );
- }
- }
- else
- {
- if ( log.isInfoEnabled() )
- {
- log.info( "Destroy was called after queue was destroyed. Doing
nothing. Stats = " + getStatistics() );
- }
- }
- }
-
- /**
- * Adds an event to the queue.
- * <p>
- * @param event
- */
- @Override
- protected void put( AbstractCacheEvent event )
- {
- if ( log.isDebugEnabled() )
- {
- log.debug( "Event entering Queue for " + getCacheName() + ": " +
event );
- }
-
- queue.offer(event);
-
- if ( isWorking() )
- {
- if ( !isAlive() )
- {
- setAlive(true);
- processorThread = new QProcessor();
- processorThread.start();
- if ( log.isInfoEnabled() )
- {
- log.info( "Cache event queue created: " + this );
- }
- }
- }
+ super( listener, listenerId, cacheName, maxFailure, waitBeforeRetry, null );
}
- // /////////////////////////// Inner classes /////////////////////////////
-
/**
- * This is the thread that works the queue.
+ * Initializes the queue.
* <p>
- * @author asmuts
- * @created January 15, 2002
- */
- protected class QProcessor
- extends Thread
- {
- /**
- * Constructor for the QProcessor object
- * <p>
- * @param aQueue the event queue to take items from.
- */
- QProcessor()
- {
- super( "CacheEventQueue.QProcessor-" + getCacheName() );
- setDaemon( true );
- }
-
- /**
- * Main processing method for the QProcessor object.
- * <p>
- * Waits for a specified time (waitToDieMillis) for something to come
in and if no new
- * events come in during that period the run method can exit and the
thread is dereferenced.
- */
- @Override
- public void run()
- {
-
- while ( CacheEventQueue.this.isAlive() )
- {
- AbstractCacheEvent event = null;
-
- try
- {
- event = queue.poll(getWaitToDieMillis(),
TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException e)
- {
- // is ok
- }
-
- if ( log.isDebugEnabled() )
- {
- log.debug( "Event from queue = " + event );
- }
-
- if ( event == null )
- {
- stopProcessing();
- }
-
- if ( event != null && isWorking() &&
CacheEventQueue.this.isAlive() )
- {
- event.run();
- }
- }
- if ( log.isDebugEnabled() )
- {
- log.debug( "QProcessor exiting for " + getCacheName() );
- }
- }
- }
-
- /**
- * This method returns semi-structured data on this queue.
- * <p>
- * @see
org.apache.commons.jcs.engine.behavior.ICacheEventQueue#getStatistics()
- * @return information on the status and history of the queue
+ * @param listener
+ * @param listenerId
+ * @param cacheName
+ * @param maxFailure
+ * @param waitBeforeRetry
+ * @param threadPoolName
*/
@Override
- public IStats getStatistics()
+ protected void initialize( ICacheListener<K, V> listener, long listenerId, String cacheName, int maxFailure,
+ int waitBeforeRetry, String threadPoolName )
{
- IStats stats = new Stats();
- stats.setTypeName( "Cache Event Queue" );
-
- ArrayList<IStatElement<?>> elems = new ArrayList<IStatElement<?>>();
-
- elems.add(new StatElement<Boolean>( "Working",
Boolean.valueOf(this.isWorking()) ) );
- elems.add(new StatElement<Boolean>( "Alive",
Boolean.valueOf(this.isAlive()) ) );
- elems.add(new StatElement<Boolean>( "Empty",
Boolean.valueOf(this.isEmpty()) ) );
- elems.add(new StatElement<Integer>( "Size",
Integer.valueOf(this.size()) ) );
-
- stats.setStatElements( elems );
+ super.initialize(listener, listenerId, cacheName, maxFailure, waitBeforeRetry);
- return stats;
- }
+ // create a default pool with one worker thread to mimic the SINGLE queue behavior
+ LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
- /**
- * @return whether there are any items in the queue.
- */
- @Override
- public boolean isEmpty()
- {
- return queue.isEmpty();
+ pool = new ThreadPoolExecutor(
+ 0,
+ 1,
+ getWaitToDieMillis(),
+ TimeUnit.MILLISECONDS,
+ queue,
+ new DaemonThreadFactory("CacheEventQueue.QProcessor-" + getCacheName()));
+ setAlive(true);
}
/**
- * Returns the number of elements in the queue.
+ * What type of queue is this.
* <p>
- * @return number of items in the queue.
+ * @return queueType
*/
@Override
- public int size()
+ public QueueType getQueueType()
{
- return queue.size();
+ return queueType;
}
}
Modified:
commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/PooledCacheEventQueue.java
URL:
http://svn.apache.org/viewvc/commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/PooledCacheEventQueue.java?rev=1778931&r1=1778930&r2=1778931&view=diff
==============================================================================
---
commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/PooledCacheEventQueue.java
(original)
+++
commons/proper/jcs/trunk/commons-jcs-core/src/main/java/org/apache/commons/jcs/engine/PooledCacheEventQueue.java
Sun Jan 15 18:01:19 2017
@@ -52,7 +52,7 @@ public class PooledCacheEventQueue<K, V>
private static final QueueType queueType = QueueType.POOLED;
/** The Thread Pool to execute events with. */
- private ThreadPoolExecutor pool = null;
+ protected ThreadPoolExecutor pool = null;
/**
* Constructor for the CacheEventQueue object
@@ -88,6 +88,7 @@ public class PooledCacheEventQueue<K, V>
// this will share the same pool with other event queues by default.
pool = ThreadPoolManager.getInstance().getPool(
(threadPoolName == null) ? "cache_event_queue" : threadPoolName );
+ setAlive(true);
}
/**