Piotr Kliczewski has uploaded a new change for review. Change subject: events: make event processing pool size configurable ......................................................................
events: make event processing pool size configurable Change-Id: Ib4e71d1767180bb22fc8129ccec9ed967cf4a8ad Signed-off-by: pkliczewski <piotr.kliczew...@gmail.com> --- M backend/manager/modules/common/src/main/java/org/ovirt/engine/core/common/config/ConfigValues.java M backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/ResourceManager.java M backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/TransportFactory.java M backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcUtils.java M packaging/dbscripts/upgrade/pre_upgrade/0000_config.sql 5 files changed, 20 insertions(+), 8 deletions(-) git pull ssh://gerrit.ovirt.org:29418/ovirt-engine refs/changes/93/39093/1 diff --git a/backend/manager/modules/common/src/main/java/org/ovirt/engine/core/common/config/ConfigValues.java b/backend/manager/modules/common/src/main/java/org/ovirt/engine/core/common/config/ConfigValues.java index 892f47b..30dd9df 100644 --- a/backend/manager/modules/common/src/main/java/org/ovirt/engine/core/common/config/ConfigValues.java +++ b/backend/manager/modules/common/src/main/java/org/ovirt/engine/core/common/config/ConfigValues.java @@ -404,6 +404,9 @@ @TypeConverterAttribute(String.class) @DefaultValueAttribute("jms.queue.irsreponses") IrsResponseQueueName, + @TypeConverterAttribute(Integer.class) + @DefaultValueAttribute("10") + EventProcessingPoolSize, @Reloadable @TypeConverterAttribute(String.class) @DefaultValueAttribute("oVirt") diff --git a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/ResourceManager.java b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/ResourceManager.java index 0bedfe0..f5775ea 100644 --- a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/ResourceManager.java +++ b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/ResourceManager.java @@ -34,6 +34,8 @@ import org.ovirt.engine.core.common.businessentities.network.NetworkStatistics; import org.ovirt.engine.core.common.businessentities.network.VmNetworkInterface; import org.ovirt.engine.core.common.businessentities.network.VmNetworkStatistics; +import org.ovirt.engine.core.common.config.Config; +import org.ovirt.engine.core.common.config.ConfigValues; import org.ovirt.engine.core.common.interfaces.FutureVDSCall; import org.ovirt.engine.core.common.qualifiers.VmDeleted; import org.ovirt.engine.core.common.vdscommands.FutureVDSCommandType; @@ -72,6 +74,7 @@ private static final String VDSCommandPrefix = "VDSCommand"; private static final Logger log = LoggerFactory.getLogger(ResourceManager.class); + private static final int PARALLELISM = Config.<Integer> getValue(ConfigValues.EventProcessingPoolSize); @Inject private AuditLogDirector auditLogDirector; @@ -497,6 +500,6 @@ } public void subscribe(EventSubscriber subscriber) { - ReactorFactory.getWorker().getPublisher().subscribe(subscriber); + ReactorFactory.getWorker(PARALLELISM).getPublisher().subscribe(subscriber); } } diff --git a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/TransportFactory.java b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/TransportFactory.java index f496f33..f7dac73 100644 --- a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/TransportFactory.java +++ b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/TransportFactory.java @@ -25,6 +25,7 @@ port, connectionTimeOut, clientTimeOut, clientRetries, heartbeat, Config.<Boolean> getValue(ConfigValues.EncryptHostCommunication), Config.<String> getValue(ConfigValues.VdsmSSLProtocol), + Config.<Integer> getValue(ConfigValues.EventProcessingPoolSize), Config.<String> getValue(ConfigValues.IrsRequestQueueName), Config.<String> getValue(ConfigValues.IrsResponseQueueName))); } else if (VdsProtocol.XML == vdsProtocol){ @@ -55,6 +56,7 @@ port, connectionTimeOut, clientTimeOut, clientRetries, heartbeat, Config.<Boolean> getValue(ConfigValues.EncryptHostCommunication), Config.<String> getValue(ConfigValues.VdsmSSLProtocol), + Config.<Integer> getValue(ConfigValues.EventProcessingPoolSize), Config.<String> getValue(ConfigValues.VdsRequestQueueName), Config.<String> getValue(ConfigValues.VdsResponseQueueName)), returnValue.getSecond()); diff --git a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcUtils.java b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcUtils.java index 57a7cec..fc1bf33 100644 --- a/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcUtils.java +++ b/backend/manager/modules/vdsbroker/src/main/java/org/ovirt/engine/core/vdsbroker/jsonrpc/JsonRpcUtils.java @@ -26,6 +26,7 @@ int heartbeat, boolean isSecure, String protocol, + int parallelism, String requestQueue, String responseQueue) { ClientPolicy connectionPolicy = @@ -36,7 +37,7 @@ requestQueue, responseQueue); ClientPolicy clientPolicy = new ClientPolicy(clientTimeout, connectionRetry, heartbeat, IOException.class); - return createClient(hostname, port, connectionPolicy, clientPolicy, isSecure, ReactorType.STOMP, protocol); + return createClient(hostname, port, connectionPolicy, clientPolicy, isSecure, ReactorType.STOMP, protocol, parallelism); } public static JsonRpcClient createClient(String hostname, @@ -47,11 +48,12 @@ int heartbeat, boolean isSecure, ReactorType type, - String protocol) { + String protocol, + int parallelism) { ClientPolicy connectionPolicy = new ClientPolicy(connectionTimeout, connectionRetry, heartbeat, IOException.class); ClientPolicy clientPolicy = new ClientPolicy(clientTimeout, connectionRetry, heartbeat, IOException.class); - return createClient(hostname, port, connectionPolicy, clientPolicy, isSecure, type, protocol); + return createClient(hostname, port, connectionPolicy, clientPolicy, isSecure, type, protocol, parallelism); } private static JsonRpcClient createClient(String hostname, @@ -60,14 +62,15 @@ ClientPolicy clientPolicy, boolean isSecure, ReactorType type, - String protocol) { + String protocol, + int parallelism) { ManagerProvider provider = null; if (isSecure) { provider = new EngineManagerProvider(protocol); } try { final Reactor reactor = ReactorFactory.getReactor(provider, type); - return getJsonClient(reactor, hostname, port, connectionPolicy, clientPolicy); + return getJsonClient(reactor, hostname, port, connectionPolicy, clientPolicy, parallelism); } catch (ClientConnectionException e) { log.error("Exception occured during building ssl context or obtaining selector", e); throw new IllegalStateException(e); @@ -75,10 +78,10 @@ } private static JsonRpcClient getJsonClient(Reactor reactor, String hostName, int port, ClientPolicy - connectionPolicy, ClientPolicy clientPolicy) throws ClientConnectionException { + connectionPolicy, ClientPolicy clientPolicy, int parallelism) throws ClientConnectionException { final ReactorClient client = reactor.createClient(hostName, port); client.setClientPolicy(connectionPolicy); - ResponseWorker worker = ReactorFactory.getWorker(); + ResponseWorker worker = ReactorFactory.getWorker(parallelism); JsonRpcClient jsonClient = worker.register(client); jsonClient.setRetryPolicy(clientPolicy); return jsonClient; diff --git a/packaging/dbscripts/upgrade/pre_upgrade/0000_config.sql b/packaging/dbscripts/upgrade/pre_upgrade/0000_config.sql index 85e8d62..96876e4 100644 --- a/packaging/dbscripts/upgrade/pre_upgrade/0000_config.sql +++ b/packaging/dbscripts/upgrade/pre_upgrade/0000_config.sql @@ -500,6 +500,7 @@ select fn_db_add_config_value('VdsResponseQueueName','jms.topic.vdsm_responses','general'); select fn_db_add_config_value('IrsRequestQueueName','jms.topic.vdsm_irs_requests','general'); select fn_db_add_config_value('IrsResponseQueueName','jms.topic.vdsm_irs_responses','general'); +select fn_db_add_config_value('EventProcessingPoolSize','10','general'); select fn_db_add_config_value('TimeToReduceFailedRunOnVdsInMinutes','30','general'); select fn_db_add_config_value('UnknownTaskPrePollingLapse','60000','general'); select fn_db_add_config_value('UserSessionHardLimit','600','general'); -- To view, visit https://gerrit.ovirt.org/39093 To unsubscribe, visit https://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ib4e71d1767180bb22fc8129ccec9ed967cf4a8ad Gerrit-PatchSet: 1 Gerrit-Project: ovirt-engine Gerrit-Branch: master Gerrit-Owner: Piotr Kliczewski <piotr.kliczew...@gmail.com> _______________________________________________ Engine-patches mailing list Engine-patches@ovirt.org http://lists.ovirt.org/mailman/listinfo/engine-patches