Piotr Kliczewski has uploaded a new change for review.

Change subject: events: use separate queue for events
......................................................................
events: use separate queue for events Change-Id: I2ff65e274d03a8059130fbd9baca1320a6524aa3 Signed-off-by: pkliczewski <piotr.kliczew...@gmail.com> --- M backend/manager/modules/common/src/main/java/org/ovirt/engine/core/common/config/ConfigValues.java M backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/TransportFactory.java M backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/VdsManager.java M backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/irsbroker/IrsProxyData.java M backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcUtils.java M backend/manager/modules/vdsbroker/src/test/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcIntegrationTest.java M packaging/dbscripts/upgrade/pre_upgrade/0000_config.sql 7 files changed, 49 insertions(+), 8 deletions(-) git pull ssh://gerrit.ovirt.org:29418/ovirt-engine refs/changes/96/41796/1 diff --git a/backend/manager/modules/common/src/main/java/org/ovirt/engine/core/common/config/ConfigValues.java b/backend/manager/modules/common/src/main/java/org/ovirt/engine/core/common/config/ConfigValues.java index 5260923..37ffc11 100644 --- a/backend/manager/modules/common/src/main/java/org/ovirt/engine/core/common/config/ConfigValues.java +++ b/backend/manager/modules/common/src/main/java/org/ovirt/engine/core/common/config/ConfigValues.java @@ -403,6 +403,9 @@ @TypeConverterAttribute(String.class) @DefaultValueAttribute("jms.queue.irsreponses") IrsResponseQueueName, + @TypeConverterAttribute(String.class) + @DefaultValueAttribute("jms.queue.events") + EventQueueName, @TypeConverterAttribute(Integer.class) @DefaultValueAttribute("10") EventProcessingPoolSize, diff --git a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/TransportFactory.java b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/TransportFactory.java index f7dac73..259ec43 100644 
--- a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/TransportFactory.java +++ b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/TransportFactory.java @@ -1,10 +1,12 @@ package org.ovirt.engine.core.vdsbroker; import org.apache.commons.httpclient.HttpClient; +import org.ovirt.engine.core.common.FeatureSupported; import org.ovirt.engine.core.common.businessentities.VdsProtocol; import org.ovirt.engine.core.common.config.Config; import org.ovirt.engine.core.common.config.ConfigValues; import org.ovirt.engine.core.common.utils.Pair; +import org.ovirt.engine.core.compat.Version; import org.ovirt.engine.core.vdsbroker.irsbroker.IIrsServer; import org.ovirt.engine.core.vdsbroker.irsbroker.IrsServerConnector; import org.ovirt.engine.core.vdsbroker.irsbroker.IrsServerWrapper; @@ -17,17 +19,22 @@ import org.ovirt.engine.core.vdsbroker.xmlrpc.XmlRpcUtils; public class TransportFactory { - public static IIrsServer createIrsServer(VdsProtocol vdsProtocol, String hostname, int port, int clientTimeOut, + public static IIrsServer createIrsServer(VdsProtocol vdsProtocol, Version version, String hostname, int port, int clientTimeOut, int connectionTimeOut, int clientRetries, int heartbeat) { IIrsServer irsServer = null; if (VdsProtocol.STOMP == vdsProtocol) { + String eventQueue = null; + if (FeatureSupported.vmStatsEvents(version)) { + eventQueue = Config.<String> getValue(ConfigValues.EventQueueName); + } irsServer = new JsonRpcIIrsServer(JsonRpcUtils.createStompClient(hostname, port, connectionTimeOut, clientTimeOut, clientRetries, heartbeat, Config.<Boolean> getValue(ConfigValues.EncryptHostCommunication), Config.<String> getValue(ConfigValues.VdsmSSLProtocol), Config.<Integer> getValue(ConfigValues.EventProcessingPoolSize), Config.<String> getValue(ConfigValues.IrsRequestQueueName), - Config.<String> getValue(ConfigValues.IrsResponseQueueName))); + Config.<String> getValue(ConfigValues.IrsResponseQueueName), + 
eventQueue)); } else if (VdsProtocol.XML == vdsProtocol){ Pair<IrsServerConnector, HttpClient> returnValue = XmlRpcUtils.getConnection(hostname, port, clientTimeOut, connectionTimeOut, @@ -40,7 +47,7 @@ return irsServer; } - public static IVdsServer createVdsServer(VdsProtocol vdsProtocol, String hostname, int port, int clientTimeOut, + public static IVdsServer createVdsServer(VdsProtocol vdsProtocol, Version version, String hostname, int port, int clientTimeOut, int connectionTimeOut, int clientRetries, int heartbeat) { IVdsServer vdsServer = null; Pair<VdsServerConnector, HttpClient> returnValue = @@ -52,13 +59,18 @@ Config.<Boolean> getValue(ConfigValues.EncryptHostCommunication)); if (VdsProtocol.STOMP == vdsProtocol) { + String eventQueue = null; + if (FeatureSupported.vmStatsEvents(version)) { + eventQueue = Config.<String> getValue(ConfigValues.EventQueueName); + } vdsServer = new JsonRpcVdsServer(JsonRpcUtils.createStompClient(hostname, port, connectionTimeOut, clientTimeOut, clientRetries, heartbeat, Config.<Boolean> getValue(ConfigValues.EncryptHostCommunication), Config.<String> getValue(ConfigValues.VdsmSSLProtocol), Config.<Integer> getValue(ConfigValues.EventProcessingPoolSize), Config.<String> getValue(ConfigValues.VdsRequestQueueName), - Config.<String> getValue(ConfigValues.VdsResponseQueueName)), + Config.<String> getValue(ConfigValues.VdsResponseQueueName), + eventQueue), returnValue.getSecond()); } else if (VdsProtocol.XML == vdsProtocol) { vdsServer = new VdsServerWrapper(returnValue.getFirst(), returnValue.getSecond()); diff --git a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/VdsManager.java b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/VdsManager.java index 3e83164..1038029 100644 --- a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/VdsManager.java +++ 
b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/VdsManager.java @@ -200,6 +200,7 @@ int clientRetries = Config.<Integer> getValue(ConfigValues.vdsRetries); vdsProxy = TransportFactory.createVdsServer( cachedVds.getProtocol(), + cachedVds.getVdsGroupCompatibilityVersion(), cachedVds.getHostName(), cachedVds.getPort(), clientTimeOut, diff --git a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/irsbroker/IrsProxyData.java b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/irsbroker/IrsProxyData.java index 3551ab7..1dae7b6 100644 --- a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/irsbroker/IrsProxyData.java +++ b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/irsbroker/IrsProxyData.java @@ -60,6 +60,7 @@ import org.ovirt.engine.core.compat.KeyValuePairCompat; import org.ovirt.engine.core.compat.RefObject; import org.ovirt.engine.core.compat.TransactionScopeOption; +import org.ovirt.engine.core.compat.Version; import org.ovirt.engine.core.dal.dbbroker.DbFacade; import org.ovirt.engine.core.dal.dbbroker.auditloghandling.AuditLogDirector; import org.ovirt.engine.core.dal.dbbroker.auditloghandling.AuditLogableBase; @@ -136,6 +137,16 @@ private void setProtocol(VdsProtocol value) { this.privateProtocol = value; + } + + private Version privateVersion; + + private Version getVersion() { + return this.privateVersion; + } + + private void setVersion(Version version) { + this.privateVersion = version; } public Guid getFencedIrs() { @@ -537,6 +548,7 @@ setmIrsPort(vds.getPort()); privatemCurrentIrsHost = vds.getHostName(); setProtocol(vds.getProtocol()); + setVersion(vds.getVdsGroupCompatibilityVersion()); } public boolean failover() { @@ -598,7 +610,14 @@ int connectionTimeOut = Config.<Integer> getValue(ConfigValues.vdsConnectionTimeout) * 1000; int heartbeat = Config.<Integer> 
getValue(ConfigValues.vdsHeartbeatInSeconds) * 1000; int clientRetries = Config.<Integer> getValue(ConfigValues.vdsRetries); - irsProxy = TransportFactory.createIrsServer(getProtocol(), host, getmIrsPort(), clientTimeOut, connectionTimeOut, clientRetries, heartbeat); + irsProxy = TransportFactory.createIrsServer(getProtocol(), + getVersion(), + host, + getmIrsPort(), + clientTimeOut, + connectionTimeOut, + clientRetries, + heartbeat); runStoragePoolUpEvent(storagePool); } } diff --git a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcUtils.java b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcUtils.java index c125692..265d024 100644 --- a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcUtils.java +++ b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcUtils.java @@ -28,14 +28,17 @@ String protocol, int parallelism, String requestQueue, - String responseQueue) { - ClientPolicy connectionPolicy = + String responseQueue, + String eventQueue) { + StompClientPolicy connectionPolicy = new StompClientPolicy(connectionTimeout, connectionRetry, heartbeat, IOException.class, requestQueue, responseQueue); + connectionPolicy.setEventQueue(eventQueue); + ClientPolicy clientPolicy = new ClientPolicy(clientTimeout, connectionRetry, heartbeat, IOException.class); return createClient(hostname, port, connectionPolicy, clientPolicy, isSecure, ReactorType.STOMP, protocol, parallelism); } diff --git a/backend/manager/modules/vdsbroker/src/test/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcIntegrationTest.java b/backend/manager/modules/vdsbroker/src/test/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcIntegrationTest.java index 6f1c327e..914b3bc 100644 --- a/backend/manager/modules/vdsbroker/src/test/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcIntegrationTest.java +++ 
b/backend/manager/modules/vdsbroker/src/test/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcIntegrationTest.java @@ -26,6 +26,7 @@ private final static int TIMEOUT = 5000; private final static String DEFAULT_REQUEST_QUEUE = "jms.queue.requests"; private final static String DEFAULT_RESPONSE_QUEUE = "jms.queue.reponses"; + private final static String DEFAULT_EVENTS_QUEUE = "jms.queue.events"; @Test public void testGetVdsCapabilities() throws InterruptedException, ExecutionException, ClientConnectionException { @@ -37,7 +38,8 @@ "TLSv1", Runtime.getRuntime().availableProcessors(), DEFAULT_REQUEST_QUEUE, - DEFAULT_RESPONSE_QUEUE); + DEFAULT_RESPONSE_QUEUE, + DEFAULT_EVENTS_QUEUE); final JsonRpcRequest request = new RequestBuilder("Host.getCapabilities").build(); Map<String, Object> map = new FutureMap(client, request); assertTrue(map.isEmpty()); diff --git a/packaging/dbscripts/upgrade/pre_upgrade/0000_config.sql b/packaging/dbscripts/upgrade/pre_upgrade/0000_config.sql index 64fb059..0319f82 100644 --- a/packaging/dbscripts/upgrade/pre_upgrade/0000_config.sql +++ b/packaging/dbscripts/upgrade/pre_upgrade/0000_config.sql @@ -533,6 +533,7 @@ select fn_db_add_config_value('VdsResponseQueueName','jms.topic.vdsm_responses','general'); select fn_db_add_config_value('IrsRequestQueueName','jms.topic.vdsm_irs_requests','general'); select fn_db_add_config_value('IrsResponseQueueName','jms.topic.vdsm_irs_responses','general'); +select fn_db_add_config_value('EventQueueName','jms.queue.events','3.5'); select fn_db_add_config_value('EventProcessingPoolSize','10','general'); select fn_db_add_config_value('TimeToReduceFailedRunOnVdsInMinutes','30','general'); select fn_db_add_config_value('UnknownTaskPrePollingLapse','60000','general'); -- To view, visit https://gerrit.ovirt.org/41796 To unsubscribe, visit https://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I2ff65e274d03a8059130fbd9baca1320a6524aa3 Gerrit-PatchSet: 1 Gerrit-Project: ovirt-engine 
Gerrit-Branch: master
Gerrit-Owner: Piotr Kliczewski <piotr.kliczew...@gmail.com>
_______________________________________________
Engine-patches mailing list
Engine-patches@ovirt.org
http://lists.ovirt.org/mailman/listinfo/engine-patches