Piotr Kliczewski has uploaded a new change for review.

Change subject: events: use separate queue for events
......................................................................

events: use separate queue for events


Change-Id: I2ff65e274d03a8059130fbd9baca1320a6524aa3
Signed-off-by: pkliczewski <piotr.kliczew...@gmail.com>
---
M 
backend/manager/modules/common/src/main/java/org/ovirt/engine/core/common/config/ConfigValues.java
M 
backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/TransportFactory.java
M 
backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/VdsManager.java
M 
backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/irsbroker/IrsProxyData.java
M 
backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcUtils.java
M 
backend/manager/modules/vdsbroker/src/test/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcIntegrationTest.java
M packaging/dbscripts/upgrade/pre_upgrade/0000_config.sql
7 files changed, 49 insertions(+), 8 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/ovirt-engine refs/changes/96/41796/1

diff --git 
a/backend/manager/modules/common/src/main/java/org/ovirt/engine/core/common/config/ConfigValues.java
 
b/backend/manager/modules/common/src/main/java/org/ovirt/engine/core/common/config/ConfigValues.java
index 5260923..37ffc11 100644
--- 
a/backend/manager/modules/common/src/main/java/org/ovirt/engine/core/common/config/ConfigValues.java
+++ 
b/backend/manager/modules/common/src/main/java/org/ovirt/engine/core/common/config/ConfigValues.java
@@ -403,6 +403,9 @@
     @TypeConverterAttribute(String.class)
     @DefaultValueAttribute("jms.queue.irsreponses")
     IrsResponseQueueName,
+    @TypeConverterAttribute(String.class)
+    @DefaultValueAttribute("jms.queue.events")
+    EventQueueName,
     @TypeConverterAttribute(Integer.class)
     @DefaultValueAttribute("10")
     EventProcessingPoolSize,
diff --git 
a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/TransportFactory.java
 
b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/TransportFactory.java
index f7dac73..259ec43 100644
--- 
a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/TransportFactory.java
+++ 
b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/TransportFactory.java
@@ -1,10 +1,12 @@
 package org.ovirt.engine.core.vdsbroker;
 
 import org.apache.commons.httpclient.HttpClient;
+import org.ovirt.engine.core.common.FeatureSupported;
 import org.ovirt.engine.core.common.businessentities.VdsProtocol;
 import org.ovirt.engine.core.common.config.Config;
 import org.ovirt.engine.core.common.config.ConfigValues;
 import org.ovirt.engine.core.common.utils.Pair;
+import org.ovirt.engine.core.compat.Version;
 import org.ovirt.engine.core.vdsbroker.irsbroker.IIrsServer;
 import org.ovirt.engine.core.vdsbroker.irsbroker.IrsServerConnector;
 import org.ovirt.engine.core.vdsbroker.irsbroker.IrsServerWrapper;
@@ -17,17 +19,22 @@
 import org.ovirt.engine.core.vdsbroker.xmlrpc.XmlRpcUtils;
 
 public class TransportFactory {
-    public static IIrsServer createIrsServer(VdsProtocol vdsProtocol, String 
hostname, int port, int clientTimeOut,
+    public static IIrsServer createIrsServer(VdsProtocol vdsProtocol, Version 
version, String hostname, int port, int clientTimeOut,
             int connectionTimeOut, int clientRetries, int heartbeat) {
         IIrsServer irsServer = null;
         if (VdsProtocol.STOMP == vdsProtocol) {
+            String eventQueue = null;
+            if (FeatureSupported.vmStatsEvents(version)) {
+                eventQueue = Config.<String> 
getValue(ConfigValues.EventQueueName);
+            }
             irsServer = new 
JsonRpcIIrsServer(JsonRpcUtils.createStompClient(hostname,
                     port, connectionTimeOut, clientTimeOut, clientRetries, 
heartbeat,
                     Config.<Boolean> 
getValue(ConfigValues.EncryptHostCommunication),
                     Config.<String> getValue(ConfigValues.VdsmSSLProtocol),
                     Config.<Integer> 
getValue(ConfigValues.EventProcessingPoolSize),
                     Config.<String> getValue(ConfigValues.IrsRequestQueueName),
-                    Config.<String> 
getValue(ConfigValues.IrsResponseQueueName)));
+                    Config.<String> 
getValue(ConfigValues.IrsResponseQueueName),
+                    eventQueue));
         } else if (VdsProtocol.XML == vdsProtocol){
             Pair<IrsServerConnector, HttpClient> returnValue =
                     XmlRpcUtils.getConnection(hostname, port, clientTimeOut, 
connectionTimeOut,
@@ -40,7 +47,7 @@
         return irsServer;
     }
 
-    public static IVdsServer createVdsServer(VdsProtocol vdsProtocol, String 
hostname, int port, int clientTimeOut,
+    public static IVdsServer createVdsServer(VdsProtocol vdsProtocol, Version 
version, String hostname, int port, int clientTimeOut,
             int connectionTimeOut, int clientRetries, int heartbeat) {
         IVdsServer vdsServer = null;
         Pair<VdsServerConnector, HttpClient> returnValue =
@@ -52,13 +59,18 @@
                         Config.<Boolean> 
getValue(ConfigValues.EncryptHostCommunication));
 
         if (VdsProtocol.STOMP == vdsProtocol) {
+            String eventQueue = null;
+            if (FeatureSupported.vmStatsEvents(version)) {
+                eventQueue = Config.<String> 
getValue(ConfigValues.EventQueueName);
+            }
             vdsServer = new 
JsonRpcVdsServer(JsonRpcUtils.createStompClient(hostname,
                     port, connectionTimeOut, clientTimeOut, clientRetries, 
heartbeat,
                     Config.<Boolean> 
getValue(ConfigValues.EncryptHostCommunication),
                     Config.<String> getValue(ConfigValues.VdsmSSLProtocol),
                     Config.<Integer> 
getValue(ConfigValues.EventProcessingPoolSize),
                     Config.<String> getValue(ConfigValues.VdsRequestQueueName),
-                    Config.<String> 
getValue(ConfigValues.VdsResponseQueueName)),
+                    Config.<String> 
getValue(ConfigValues.VdsResponseQueueName),
+                    eventQueue),
                     returnValue.getSecond());
         } else if (VdsProtocol.XML == vdsProtocol) {
             vdsServer = new VdsServerWrapper(returnValue.getFirst(), 
returnValue.getSecond());
diff --git 
a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/VdsManager.java
 
b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/VdsManager.java
index 3e83164..1038029 100644
--- 
a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/VdsManager.java
+++ 
b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/VdsManager.java
@@ -200,6 +200,7 @@
         int clientRetries = Config.<Integer> getValue(ConfigValues.vdsRetries);
         vdsProxy = TransportFactory.createVdsServer(
                 cachedVds.getProtocol(),
+                cachedVds.getVdsGroupCompatibilityVersion(),
                 cachedVds.getHostName(),
                 cachedVds.getPort(),
                 clientTimeOut,
diff --git 
a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/irsbroker/IrsProxyData.java
 
b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/irsbroker/IrsProxyData.java
index 3551ab7..1dae7b6 100644
--- 
a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/irsbroker/IrsProxyData.java
+++ 
b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/irsbroker/IrsProxyData.java
@@ -60,6 +60,7 @@
 import org.ovirt.engine.core.compat.KeyValuePairCompat;
 import org.ovirt.engine.core.compat.RefObject;
 import org.ovirt.engine.core.compat.TransactionScopeOption;
+import org.ovirt.engine.core.compat.Version;
 import org.ovirt.engine.core.dal.dbbroker.DbFacade;
 import org.ovirt.engine.core.dal.dbbroker.auditloghandling.AuditLogDirector;
 import org.ovirt.engine.core.dal.dbbroker.auditloghandling.AuditLogableBase;
@@ -136,6 +137,16 @@
 
     private void setProtocol(VdsProtocol value) {
         this.privateProtocol = value;
+    }
+
+    private Version privateVersion;
+
+    private Version getVersion() {
+        return this.privateVersion;
+    }
+
+    private void setVersion(Version version) {
+        this.privateVersion = version;
     }
 
     public Guid getFencedIrs() {
@@ -537,6 +548,7 @@
         setmIrsPort(vds.getPort());
         privatemCurrentIrsHost = vds.getHostName();
         setProtocol(vds.getProtocol());
+        setVersion(vds.getVdsGroupCompatibilityVersion());
     }
 
     public boolean failover() {
@@ -598,7 +610,14 @@
                     int connectionTimeOut = Config.<Integer> 
getValue(ConfigValues.vdsConnectionTimeout) * 1000;
                     int heartbeat = Config.<Integer> 
getValue(ConfigValues.vdsHeartbeatInSeconds) * 1000;
                     int clientRetries = Config.<Integer> 
getValue(ConfigValues.vdsRetries);
-                    irsProxy = TransportFactory.createIrsServer(getProtocol(), 
host, getmIrsPort(), clientTimeOut, connectionTimeOut, clientRetries, 
heartbeat);
+                    irsProxy = TransportFactory.createIrsServer(getProtocol(),
+                                    getVersion(),
+                                    host,
+                                    getmIrsPort(),
+                                    clientTimeOut,
+                                    connectionTimeOut,
+                                    clientRetries,
+                                    heartbeat);
                     runStoragePoolUpEvent(storagePool);
                 }
             }
diff --git 
a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcUtils.java
 
b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcUtils.java
index c125692..265d024 100644
--- 
a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcUtils.java
+++ 
b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcUtils.java
@@ -28,14 +28,17 @@
             String protocol,
             int parallelism,
             String requestQueue,
-            String responseQueue) {
-        ClientPolicy connectionPolicy =
+            String responseQueue,
+            String eventQueue) {
+        StompClientPolicy connectionPolicy =
                 new StompClientPolicy(connectionTimeout,
                         connectionRetry,
                         heartbeat,
                         IOException.class,
                         requestQueue,
                         responseQueue);
+        connectionPolicy.setEventQueue(eventQueue);
+
         ClientPolicy clientPolicy = new ClientPolicy(clientTimeout, 
connectionRetry, heartbeat, IOException.class);
         return createClient(hostname, port, connectionPolicy, clientPolicy, 
isSecure, ReactorType.STOMP, protocol, parallelism);
     }
diff --git 
a/backend/manager/modules/vdsbroker/src/test/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcIntegrationTest.java
 
b/backend/manager/modules/vdsbroker/src/test/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcIntegrationTest.java
index 6f1c327e..914b3bc 100644
--- 
a/backend/manager/modules/vdsbroker/src/test/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcIntegrationTest.java
+++ 
b/backend/manager/modules/vdsbroker/src/test/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcIntegrationTest.java
@@ -26,6 +26,7 @@
     private final static int TIMEOUT = 5000;
     private final static String DEFAULT_REQUEST_QUEUE = "jms.queue.requests";
     private final static String DEFAULT_RESPONSE_QUEUE = "jms.queue.reponses";
+    private final static String DEFAULT_EVENTS_QUEUE = "jms.queue.events";
 
     @Test
     public void testGetVdsCapabilities() throws InterruptedException, 
ExecutionException, ClientConnectionException {
@@ -37,7 +38,8 @@
                 "TLSv1",
                 Runtime.getRuntime().availableProcessors(),
                 DEFAULT_REQUEST_QUEUE,
-                DEFAULT_RESPONSE_QUEUE);
+                DEFAULT_RESPONSE_QUEUE,
+                DEFAULT_EVENTS_QUEUE);
         final JsonRpcRequest request = new 
RequestBuilder("Host.getCapabilities").build();
         Map<String, Object> map = new FutureMap(client, request);
         assertTrue(map.isEmpty());
diff --git a/packaging/dbscripts/upgrade/pre_upgrade/0000_config.sql 
b/packaging/dbscripts/upgrade/pre_upgrade/0000_config.sql
index 64fb059..0319f82 100644
--- a/packaging/dbscripts/upgrade/pre_upgrade/0000_config.sql
+++ b/packaging/dbscripts/upgrade/pre_upgrade/0000_config.sql
@@ -533,6 +533,7 @@
 select 
fn_db_add_config_value('VdsResponseQueueName','jms.topic.vdsm_responses','general');
 select 
fn_db_add_config_value('IrsRequestQueueName','jms.topic.vdsm_irs_requests','general');
 select 
fn_db_add_config_value('IrsResponseQueueName','jms.topic.vdsm_irs_responses','general');
+select fn_db_add_config_value('EventQueueName','jms.queue.events','3.5');
 select fn_db_add_config_value('EventProcessingPoolSize','10','general');
 select 
fn_db_add_config_value('TimeToReduceFailedRunOnVdsInMinutes','30','general');
 select fn_db_add_config_value('UnknownTaskPrePollingLapse','60000','general');


-- 
To view, visit https://gerrit.ovirt.org/41796
To unsubscribe, visit https://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I2ff65e274d03a8059130fbd9baca1320a6524aa3
Gerrit-PatchSet: 1
Gerrit-Project: ovirt-engine
Gerrit-Branch: master
Gerrit-Owner: Piotr Kliczewski <piotr.kliczew...@gmail.com>
_______________________________________________
Engine-patches mailing list
Engine-patches@ovirt.org
http://lists.ovirt.org/mailman/listinfo/engine-patches

Reply via email to