Piotr Kliczewski has uploaded a new change for review.

Change subject: events: make event processing pool size configurable
......................................................................

events: make event processing pool size configurable


Change-Id: Ib4e71d1767180bb22fc8129ccec9ed967cf4a8ad
Signed-off-by: pkliczewski <piotr.kliczew...@gmail.com>
---
M backend/manager/modules/common/src/main/java/org/ovirt/engine/core/common/config/ConfigValues.java
M backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/ResourceManager.java
M backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/TransportFactory.java
M backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcUtils.java
M packaging/dbscripts/upgrade/pre_upgrade/0000_config.sql
5 files changed, 20 insertions(+), 8 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/ovirt-engine refs/changes/93/39093/1

diff --git a/backend/manager/modules/common/src/main/java/org/ovirt/engine/core/common/config/ConfigValues.java b/backend/manager/modules/common/src/main/java/org/ovirt/engine/core/common/config/ConfigValues.java
index 892f47b..30dd9df 100644
--- a/backend/manager/modules/common/src/main/java/org/ovirt/engine/core/common/config/ConfigValues.java
+++ b/backend/manager/modules/common/src/main/java/org/ovirt/engine/core/common/config/ConfigValues.java
@@ -404,6 +404,9 @@
     @TypeConverterAttribute(String.class)
     @DefaultValueAttribute("jms.queue.irsreponses")
     IrsResponseQueueName,
+    @TypeConverterAttribute(Integer.class)
+    @DefaultValueAttribute("10")
+    EventProcessingPoolSize,
     @Reloadable
     @TypeConverterAttribute(String.class)
     @DefaultValueAttribute("oVirt")
diff --git a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/ResourceManager.java b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/ResourceManager.java
index 0bedfe0..f5775ea 100644
--- a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/ResourceManager.java
+++ b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/ResourceManager.java
@@ -34,6 +34,8 @@
 import org.ovirt.engine.core.common.businessentities.network.NetworkStatistics;
 import 
org.ovirt.engine.core.common.businessentities.network.VmNetworkInterface;
 import 
org.ovirt.engine.core.common.businessentities.network.VmNetworkStatistics;
+import org.ovirt.engine.core.common.config.Config;
+import org.ovirt.engine.core.common.config.ConfigValues;
 import org.ovirt.engine.core.common.interfaces.FutureVDSCall;
 import org.ovirt.engine.core.common.qualifiers.VmDeleted;
 import org.ovirt.engine.core.common.vdscommands.FutureVDSCommandType;
@@ -72,6 +74,7 @@
     private static final String VDSCommandPrefix = "VDSCommand";
 
     private static final Logger log = 
LoggerFactory.getLogger(ResourceManager.class);
+    private static final int PARALLELISM = Config.<Integer> 
getValue(ConfigValues.EventProcessingPoolSize);
 
     @Inject
     private AuditLogDirector auditLogDirector;
@@ -497,6 +500,6 @@
     }
 
     public void subscribe(EventSubscriber subscriber) {
-        ReactorFactory.getWorker().getPublisher().subscribe(subscriber);
+        
ReactorFactory.getWorker(PARALLELISM).getPublisher().subscribe(subscriber);
     }
 }
diff --git a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/TransportFactory.java b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/TransportFactory.java
index f496f33..f7dac73 100644
--- a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/TransportFactory.java
+++ b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/TransportFactory.java
@@ -25,6 +25,7 @@
                     port, connectionTimeOut, clientTimeOut, clientRetries, 
heartbeat,
                     Config.<Boolean> 
getValue(ConfigValues.EncryptHostCommunication),
                     Config.<String> getValue(ConfigValues.VdsmSSLProtocol),
+                    Config.<Integer> 
getValue(ConfigValues.EventProcessingPoolSize),
                     Config.<String> getValue(ConfigValues.IrsRequestQueueName),
                     Config.<String> 
getValue(ConfigValues.IrsResponseQueueName)));
         } else if (VdsProtocol.XML == vdsProtocol){
@@ -55,6 +56,7 @@
                     port, connectionTimeOut, clientTimeOut, clientRetries, 
heartbeat,
                     Config.<Boolean> 
getValue(ConfigValues.EncryptHostCommunication),
                     Config.<String> getValue(ConfigValues.VdsmSSLProtocol),
+                    Config.<Integer> 
getValue(ConfigValues.EventProcessingPoolSize),
                     Config.<String> getValue(ConfigValues.VdsRequestQueueName),
                     Config.<String> 
getValue(ConfigValues.VdsResponseQueueName)),
                     returnValue.getSecond());
diff --git a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcUtils.java b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcUtils.java
index 57a7cec..fc1bf33 100644
--- a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcUtils.java
+++ b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcUtils.java
@@ -26,6 +26,7 @@
             int heartbeat,
             boolean isSecure,
             String protocol,
+            int parallelism,
             String requestQueue,
             String responseQueue) {
         ClientPolicy connectionPolicy =
@@ -36,7 +37,7 @@
                         requestQueue,
                         responseQueue);
         ClientPolicy clientPolicy = new ClientPolicy(clientTimeout, 
connectionRetry, heartbeat, IOException.class);
-        return createClient(hostname, port, connectionPolicy, clientPolicy, 
isSecure, ReactorType.STOMP, protocol);
+        return createClient(hostname, port, connectionPolicy, clientPolicy, 
isSecure, ReactorType.STOMP, protocol, parallelism);
     }
 
     public static JsonRpcClient createClient(String hostname,
@@ -47,11 +48,12 @@
             int heartbeat,
             boolean isSecure,
             ReactorType type,
-            String protocol) {
+            String protocol,
+            int parallelism) {
         ClientPolicy connectionPolicy =
                 new ClientPolicy(connectionTimeout, connectionRetry, 
heartbeat, IOException.class);
         ClientPolicy clientPolicy = new ClientPolicy(clientTimeout, 
connectionRetry, heartbeat, IOException.class);
-        return createClient(hostname, port, connectionPolicy, clientPolicy, 
isSecure, type, protocol);
+        return createClient(hostname, port, connectionPolicy, clientPolicy, 
isSecure, type, protocol, parallelism);
     }
 
     private static JsonRpcClient createClient(String hostname,
@@ -60,14 +62,15 @@
             ClientPolicy clientPolicy,
             boolean isSecure,
             ReactorType type,
-            String protocol) {
+            String protocol,
+            int parallelism) {
         ManagerProvider provider = null;
         if (isSecure) {
             provider = new EngineManagerProvider(protocol);
         }
         try {
             final Reactor reactor = ReactorFactory.getReactor(provider, type);
-            return getJsonClient(reactor, hostname, port, connectionPolicy, 
clientPolicy);
+            return getJsonClient(reactor, hostname, port, connectionPolicy, 
clientPolicy, parallelism);
         } catch (ClientConnectionException e) {
             log.error("Exception occured during building ssl context or 
obtaining selector", e);
             throw new IllegalStateException(e);
@@ -75,10 +78,10 @@
     }
 
     private static JsonRpcClient getJsonClient(Reactor reactor, String 
hostName, int port, ClientPolicy
-            connectionPolicy, ClientPolicy clientPolicy) throws 
ClientConnectionException {
+            connectionPolicy, ClientPolicy clientPolicy, int parallelism) 
throws ClientConnectionException {
         final ReactorClient client = reactor.createClient(hostName, port);
         client.setClientPolicy(connectionPolicy);
-        ResponseWorker worker = ReactorFactory.getWorker();
+        ResponseWorker worker = ReactorFactory.getWorker(parallelism);
         JsonRpcClient jsonClient = worker.register(client);
         jsonClient.setRetryPolicy(clientPolicy);
         return jsonClient;
diff --git a/packaging/dbscripts/upgrade/pre_upgrade/0000_config.sql b/packaging/dbscripts/upgrade/pre_upgrade/0000_config.sql
index 85e8d62..96876e4 100644
--- a/packaging/dbscripts/upgrade/pre_upgrade/0000_config.sql
+++ b/packaging/dbscripts/upgrade/pre_upgrade/0000_config.sql
@@ -500,6 +500,7 @@
 select 
fn_db_add_config_value('VdsResponseQueueName','jms.topic.vdsm_responses','general');
 select 
fn_db_add_config_value('IrsRequestQueueName','jms.topic.vdsm_irs_requests','general');
 select 
fn_db_add_config_value('IrsResponseQueueName','jms.topic.vdsm_irs_responses','general');
+select fn_db_add_config_value('EventProcessingPoolSize','10','general');
 select 
fn_db_add_config_value('TimeToReduceFailedRunOnVdsInMinutes','30','general');
 select fn_db_add_config_value('UnknownTaskPrePollingLapse','60000','general');
 select fn_db_add_config_value('UserSessionHardLimit','600','general');


-- 
To view, visit https://gerrit.ovirt.org/39093
To unsubscribe, visit https://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ib4e71d1767180bb22fc8129ccec9ed967cf4a8ad
Gerrit-PatchSet: 1
Gerrit-Project: ovirt-engine
Gerrit-Branch: master
Gerrit-Owner: Piotr Kliczewski <piotr.kliczew...@gmail.com>
_______________________________________________
Engine-patches mailing list
Engine-patches@ovirt.org
http://lists.ovirt.org/mailman/listinfo/engine-patches

Reply via email to