This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch pge in repository https://gitbox.apache.org/repos/asf/camel.git
commit 68caf42a0a88861d6ac988fb90cecf5bcd18df28 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Fri Jun 6 09:42:23 2025 +0200 CAMEL-22146: camel-pgevent - Re-connect in case of lost connection to database. --- .../apache/camel/catalog/components/pgevent.json | 12 +- .../pgevent/PgEventEndpointConfigurer.java | 24 ++++ .../pgevent/PgEventEndpointUriFactory.java | 6 +- .../apache/camel/component/pgevent/pgevent.json | 12 +- .../camel/component/pgevent/PgEventConsumer.java | 136 ++++++++++++++----- .../camel/component/pgevent/PgEventEndpoint.java | 59 +++++++++ .../apache/camel/pgevent/PgEventConsumerTest.java | 127 ------------------ .../apache/camel/pgevent/PgEventHelperTest.java | 65 ---------- .../apache/camel/pgevent/PgEventProducerTest.java | 144 --------------------- .../dsl/PgEventEndpointBuilderFactory.java | 130 +++++++++++++++++++ 10 files changed, 340 insertions(+), 375 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pgevent.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pgevent.json index 9abe4d1d5b2..1f11c8b900e 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pgevent.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pgevent.json @@ -39,9 +39,13 @@ "bridgeErrorHandler": { "index": 4, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming [...] 
"exceptionHandler": { "index": 5, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By def [...] "exchangePattern": { "index": 6, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, - "lazyStartProducer": { "index": 7, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produc [...] - "datasource": { "index": 8, "kind": "parameter", "displayName": "Datasource", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "javax.sql.DataSource", "deprecated": false, "autowired": false, "secret": false, "description": "To connect using the given javax.sql.DataSource instead of using hostname and port." 
}, - "pass": { "index": 9, "kind": "parameter", "displayName": "Pass", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "Password for login" }, - "user": { "index": 10, "kind": "parameter", "displayName": "User", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "defaultValue": "postgres", "description": "Username for login" } + "reconnectDelay": { "index": 7, "kind": "parameter", "displayName": "Reconnect Delay", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "description": "When the consumer unexpected lose connection to the database, then this specifies the interval (millis) between re-connection attempts to establish a new connection." }, + "workerPool": { "index": 8, "kind": "parameter", "displayName": "Worker Pool", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "java.util.concurrent.ExecutorService", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom worker pool for processing the events from the database." }, + "workerPoolCoreSize": { "index": 9, "kind": "parameter", "displayName": "Worker Pool Core Size", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "description": "Number of core threads in the worker pool for processing the events from the database." 
}, + "workerPoolMaxSize": { "index": 10, "kind": "parameter", "displayName": "Worker Pool Max Size", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "description": "Maximum number of threads in the worker pool for processing the events from the database." }, + "lazyStartProducer": { "index": 11, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] + "datasource": { "index": 12, "kind": "parameter", "displayName": "Datasource", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "javax.sql.DataSource", "deprecated": false, "autowired": false, "secret": false, "description": "To connect using the given javax.sql.DataSource instead of using hostname and port." 
}, + "pass": { "index": 13, "kind": "parameter", "displayName": "Pass", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "Password for login" }, + "user": { "index": 14, "kind": "parameter", "displayName": "User", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "defaultValue": "postgres", "description": "Username for login" } } } diff --git a/components/camel-pgevent/src/generated/java/org/apache/camel/component/pgevent/PgEventEndpointConfigurer.java b/components/camel-pgevent/src/generated/java/org/apache/camel/component/pgevent/PgEventEndpointConfigurer.java index 756e75bb17e..e453dfaf8d9 100644 --- a/components/camel-pgevent/src/generated/java/org/apache/camel/component/pgevent/PgEventEndpointConfigurer.java +++ b/components/camel-pgevent/src/generated/java/org/apache/camel/component/pgevent/PgEventEndpointConfigurer.java @@ -33,7 +33,15 @@ public class PgEventEndpointConfigurer extends PropertyConfigurerSupport impleme case "lazystartproducer": case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true; case "pass": target.setPass(property(camelContext, java.lang.String.class, value)); return true; + case "reconnectdelay": + case "reconnectDelay": target.setReconnectDelay(property(camelContext, int.class, value)); return true; case "user": target.setUser(property(camelContext, java.lang.String.class, value)); return true; + case "workerpool": + case "workerPool": target.setWorkerPool(property(camelContext, java.util.concurrent.ExecutorService.class, value)); return true; + case "workerpoolcoresize": + case "workerPoolCoreSize": target.setWorkerPoolCoreSize(property(camelContext, int.class, value)); return true; + case "workerpoolmaxsize": + case 
"workerPoolMaxSize": target.setWorkerPoolMaxSize(property(camelContext, int.class, value)); return true; default: return false; } } @@ -51,7 +59,15 @@ public class PgEventEndpointConfigurer extends PropertyConfigurerSupport impleme case "lazystartproducer": case "lazyStartProducer": return boolean.class; case "pass": return java.lang.String.class; + case "reconnectdelay": + case "reconnectDelay": return int.class; case "user": return java.lang.String.class; + case "workerpool": + case "workerPool": return java.util.concurrent.ExecutorService.class; + case "workerpoolcoresize": + case "workerPoolCoreSize": return int.class; + case "workerpoolmaxsize": + case "workerPoolMaxSize": return int.class; default: return null; } } @@ -70,7 +86,15 @@ public class PgEventEndpointConfigurer extends PropertyConfigurerSupport impleme case "lazystartproducer": case "lazyStartProducer": return target.isLazyStartProducer(); case "pass": return target.getPass(); + case "reconnectdelay": + case "reconnectDelay": return target.getReconnectDelay(); case "user": return target.getUser(); + case "workerpool": + case "workerPool": return target.getWorkerPool(); + case "workerpoolcoresize": + case "workerPoolCoreSize": return target.getWorkerPoolCoreSize(); + case "workerpoolmaxsize": + case "workerPoolMaxSize": return target.getWorkerPoolMaxSize(); default: return null; } } diff --git a/components/camel-pgevent/src/generated/java/org/apache/camel/component/pgevent/PgEventEndpointUriFactory.java b/components/camel-pgevent/src/generated/java/org/apache/camel/component/pgevent/PgEventEndpointUriFactory.java index dde682c9feb..b0d1353eef6 100644 --- a/components/camel-pgevent/src/generated/java/org/apache/camel/component/pgevent/PgEventEndpointUriFactory.java +++ b/components/camel-pgevent/src/generated/java/org/apache/camel/component/pgevent/PgEventEndpointUriFactory.java @@ -23,7 +23,7 @@ public class PgEventEndpointUriFactory extends org.apache.camel.support.componen private static final 
Set<String> SECRET_PROPERTY_NAMES; private static final Set<String> MULTI_VALUE_PREFIXES; static { - Set<String> props = new HashSet<>(11); + Set<String> props = new HashSet<>(15); props.add("bridgeErrorHandler"); props.add("channel"); props.add("database"); @@ -34,7 +34,11 @@ public class PgEventEndpointUriFactory extends org.apache.camel.support.componen props.add("lazyStartProducer"); props.add("pass"); props.add("port"); + props.add("reconnectDelay"); props.add("user"); + props.add("workerPool"); + props.add("workerPoolCoreSize"); + props.add("workerPoolMaxSize"); PROPERTY_NAMES = Collections.unmodifiableSet(props); Set<String> secretProps = new HashSet<>(2); secretProps.add("pass"); diff --git a/components/camel-pgevent/src/generated/resources/META-INF/org/apache/camel/component/pgevent/pgevent.json b/components/camel-pgevent/src/generated/resources/META-INF/org/apache/camel/component/pgevent/pgevent.json index 9abe4d1d5b2..1f11c8b900e 100644 --- a/components/camel-pgevent/src/generated/resources/META-INF/org/apache/camel/component/pgevent/pgevent.json +++ b/components/camel-pgevent/src/generated/resources/META-INF/org/apache/camel/component/pgevent/pgevent.json @@ -39,9 +39,13 @@ "bridgeErrorHandler": { "index": 4, "kind": "parameter", "displayName": "Bridge Error Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions (if possible) occurred while the Camel consumer is trying to pickup incoming [...] 
"exceptionHandler": { "index": 5, "kind": "parameter", "displayName": "Exception Handler", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", "deprecated": false, "autowired": false, "secret": false, "description": "To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By def [...] "exchangePattern": { "index": 6, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." }, - "lazyStartProducer": { "index": 7, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produc [...] - "datasource": { "index": 8, "kind": "parameter", "displayName": "Datasource", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "javax.sql.DataSource", "deprecated": false, "autowired": false, "secret": false, "description": "To connect using the given javax.sql.DataSource instead of using hostname and port." 
}, - "pass": { "index": 9, "kind": "parameter", "displayName": "Pass", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "Password for login" }, - "user": { "index": 10, "kind": "parameter", "displayName": "User", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "defaultValue": "postgres", "description": "Username for login" } + "reconnectDelay": { "index": 7, "kind": "parameter", "displayName": "Reconnect Delay", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, "description": "When the consumer unexpected lose connection to the database, then this specifies the interval (millis) between re-connection attempts to establish a new connection." }, + "workerPool": { "index": 8, "kind": "parameter", "displayName": "Worker Pool", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "object", "javaType": "java.util.concurrent.ExecutorService", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom worker pool for processing the events from the database." }, + "workerPoolCoreSize": { "index": 9, "kind": "parameter", "displayName": "Worker Pool Core Size", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, "description": "Number of core threads in the worker pool for processing the events from the database." 
}, + "workerPoolMaxSize": { "index": 10, "kind": "parameter", "displayName": "Worker Pool Max Size", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, "description": "Maximum number of threads in the worker pool for processing the events from the database." }, + "lazyStartProducer": { "index": 11, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] + "datasource": { "index": 12, "kind": "parameter", "displayName": "Datasource", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "javax.sql.DataSource", "deprecated": false, "autowired": false, "secret": false, "description": "To connect using the given javax.sql.DataSource instead of using hostname and port." 
}, + "pass": { "index": 13, "kind": "parameter", "displayName": "Pass", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "description": "Password for login" }, + "user": { "index": 14, "kind": "parameter", "displayName": "User", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "defaultValue": "postgres", "description": "Username for login" } } } diff --git a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java index e0ce6dfe92e..2b0509fb0b4 100644 --- a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java +++ b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java @@ -17,6 +17,8 @@ package org.apache.camel.component.pgevent; import java.sql.PreparedStatement; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import com.impossibl.postgres.api.jdbc.PGConnection; import com.impossibl.postgres.api.jdbc.PGNotificationListener; @@ -24,68 +26,142 @@ import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.support.DefaultConsumer; +import org.apache.camel.util.backoff.BackOff; +import org.apache.camel.util.backoff.BackOffTimer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * The PgEvent consumer. 
*/ -public class PgEventConsumer extends DefaultConsumer implements PGNotificationListener { +public class PgEventConsumer extends DefaultConsumer { private static final Logger LOG = LoggerFactory.getLogger(PgEventConsumer.class); + private final PgEventListener listener = new PgEventListener(); private final PgEventEndpoint endpoint; private PGConnection dbConnection; + private ScheduledExecutorService reconnectPool; + private ExecutorService workerPool; + private boolean shutdownWorkerPool; + private BackOffTimer timer; public PgEventConsumer(PgEventEndpoint endpoint, Processor processor) { super(endpoint, processor); this.endpoint = endpoint; } + public PgEventListener getPgEventListener() { + return listener; + } + @Override protected void doStart() throws Exception { super.doStart(); - dbConnection = endpoint.initJdbc(); - String sql = String.format("LISTEN %s", endpoint.getChannel()); - try (PreparedStatement statement = dbConnection.prepareStatement(sql)) { - statement.execute(); + if (endpoint.getWorkerPool() != null) { + workerPool = endpoint.getWorkerPool(); + } else { + workerPool = endpoint.createWorkerPool(); + shutdownWorkerPool = true; } - dbConnection.addNotificationListener(endpoint.getChannel(), endpoint.getChannel(), this); + // used for re-connecting to the database + reconnectPool = getEndpoint().getCamelContext().getExecutorServiceManager() + .newSingleThreadScheduledExecutor(this, "Reconnector"); + timer = new BackOffTimer(reconnectPool); + listener.initConnection(); } @Override - public void notification(int processId, String channel, String payload) { - if (LOG.isDebugEnabled()) { - LOG.debug("Notification processId: {}, channel: {}, payload: {}", processId, channel, payload); + protected void doStop() throws Exception { + super.doStop(); + listener.closeConnection(); + getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(reconnectPool); + timer = null; + if (shutdownWorkerPool && workerPool != null) { + LOG.debug("Shutting 
down PgEventConsumer worker threads with timeout {} millis", 10000); + endpoint.getCamelContext().getExecutorServiceManager().shutdownGraceful(workerPool, 10000); + workerPool = null; } + } - Exchange exchange = createExchange(false); - Message msg = exchange.getIn(); - msg.setHeader(PgEventConstants.HEADER_CHANNEL, channel); - msg.setBody(payload); + public class PgEventListener implements PGNotificationListener { - try { - getProcessor().process(exchange); - } catch (Exception e) { - exchange.setException(e); - } - if (exchange.getException() != null) { - String cause = "Unable to process incoming notification from PostgreSQL: processId='" + processId + "', channel='" - + channel + "', payload='" + payload + "'"; - getExceptionHandler().handleException(cause, exchange.getException()); + public void reconnect() { + BackOff bo = BackOff.builder().delay(endpoint.getReconnectDelay()).build(); + timer.schedule(bo, t -> { + LOG.debug("Connecting attempt #" + t.getCurrentAttempts()); + try { + initConnection(); + } catch (Exception e) { + return true; + } + LOG.debug("Connecting successful"); + return false; + }); } - releaseExchange(exchange, false); - } - @Override - protected void doStop() throws Exception { - if (dbConnection != null) { - dbConnection.removeNotificationListener(endpoint.getChannel()); - String sql = String.format("UNLISTEN %s", endpoint.getChannel()); + public void initConnection() throws Exception { + dbConnection = endpoint.initJdbc(); + String sql = String.format("LISTEN %s", endpoint.getChannel()); try (PreparedStatement statement = dbConnection.prepareStatement(sql)) { statement.execute(); } - dbConnection.close(); + dbConnection.addNotificationListener(endpoint.getChannel(), endpoint.getChannel(), listener); + } + + public void closeConnection() throws Exception { + if (dbConnection != null) { + try { + dbConnection.removeNotificationListener(endpoint.getChannel()); + String sql = String.format("UNLISTEN %s", endpoint.getChannel()); + try 
(PreparedStatement statement = dbConnection.prepareStatement(sql)) { + statement.execute(); + } + dbConnection.close(); + } catch (Exception e) { + // ignore + } + } + dbConnection = null; + } + + @Override + public void notification(int processId, String channel, String payload) { + if (LOG.isDebugEnabled()) { + LOG.debug("Notification processId: {}, channel: {}, payload: {}", processId, channel, payload); + } + + Exchange exchange = createExchange(false); + Message msg = exchange.getIn(); + msg.setHeader(PgEventConstants.HEADER_CHANNEL, channel); + msg.setBody(payload); + + // use worker pool to avoid blocking notification thread + if (workerPool != null) { + workerPool.submit(() -> { + try { + getProcessor().process(exchange); + } catch (Exception e) { + exchange.setException(e); + } + if (exchange.getException() != null) { + String cause + = "Unable to process incoming notification from PostgreSQL: processId='" + processId + + "', channel='" + + channel + "', payload='" + payload + "'"; + getExceptionHandler().handleException(cause, exchange.getException()); + } + releaseExchange(exchange, false); + }); + } + } + + @Override + public void closed() { + // connection lost, so we need to re-connect + LOG.warn("Connection to PostgreSQL lost unexpected. 
Re-connecting..."); + reconnect(); } } + } diff --git a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventEndpoint.java b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventEndpoint.java index ca506ffef4b..8e56d15a6b8 100644 --- a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventEndpoint.java +++ b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventEndpoint.java @@ -18,6 +18,7 @@ package org.apache.camel.component.pgevent; import java.sql.DriverManager; import java.util.Map; +import java.util.concurrent.ExecutorService; import javax.sql.DataSource; @@ -70,6 +71,14 @@ public class PgEventEndpoint extends DefaultEndpoint implements EndpointServiceL private String pass; @UriParam(label = "advanced") private DataSource datasource; + @UriParam(label = "consumer,advanced", defaultValue = "5000") + private int reconnectDelay = 5000; + @UriParam(label = "consumer,advanced") + private ExecutorService workerPool; + @UriParam(label = "consumer,advanced", defaultValue = "1") + private int workerPoolCoreSize = 1; + @UriParam(label = "consumer,advanced", defaultValue = "10") + private int workerPoolMaxSize = 10; private final String uri; @@ -186,6 +195,11 @@ public class PgEventEndpoint extends DefaultEndpoint implements EndpointServiceL return consumer; } + ExecutorService createWorkerPool() { + return getCamelContext().getExecutorServiceManager().newThreadPool(this, + "PgEventConsumer[" + channel + "]", workerPoolCoreSize, workerPoolMaxSize); + } + public String getHost() { return host; } @@ -263,4 +277,49 @@ public class PgEventEndpoint extends DefaultEndpoint implements EndpointServiceL public void setDatasource(DataSource datasource) { this.datasource = datasource; } + + public int getReconnectDelay() { + return reconnectDelay; + } + + /** + * When the consumer unexpected lose connection to the database, then this specifies the interval (millis) between 
+ * re-connection attempts to establish a new connection. + */ + public void setReconnectDelay(int reconnectDelay) { + this.reconnectDelay = reconnectDelay; + } + + public ExecutorService getWorkerPool() { + return workerPool; + } + + /** + * To use a custom worker pool for processing the events from the database. + */ + public void setWorkerPool(ExecutorService workerPool) { + this.workerPool = workerPool; + } + + public int getWorkerPoolCoreSize() { + return workerPoolCoreSize; + } + + /** + * Number of core threads in the worker pool for processing the events from the database. + */ + public void setWorkerPoolCoreSize(int workerPoolCoreSize) { + this.workerPoolCoreSize = workerPoolCoreSize; + } + + public int getWorkerPoolMaxSize() { + return workerPoolMaxSize; + } + + /** + * Maximum number of threads in the worker pool for processing the events from the database. + */ + public void setWorkerPoolMaxSize(int workerPoolMaxSize) { + this.workerPoolMaxSize = workerPoolMaxSize; + } } diff --git a/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventConsumerTest.java b/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventConsumerTest.java deleted file mode 100644 index 6fad950412e..00000000000 --- a/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventConsumerTest.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.pgevent; - -import java.sql.PreparedStatement; - -import com.impossibl.postgres.api.jdbc.PGConnection; -import com.impossibl.postgres.jdbc.PGDataSource; -import org.apache.camel.CamelContext; -import org.apache.camel.Exchange; -import org.apache.camel.ExchangeExtension; -import org.apache.camel.ExtendedCamelContext; -import org.apache.camel.Message; -import org.apache.camel.Processor; -import org.apache.camel.component.pgevent.PgEventConsumer; -import org.apache.camel.component.pgevent.PgEventEndpoint; -import org.apache.camel.spi.ExchangeFactory; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.junit.jupiter.MockitoExtension; - -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -@ExtendWith(MockitoExtension.class) -public class PgEventConsumerTest { - - @Test - public void testPgEventConsumerStart() throws Exception { - PGDataSource dataSource = mock(PGDataSource.class); - PGConnection connection = mock(PGConnection.class); - PreparedStatement statement = mock(PreparedStatement.class); - PgEventEndpoint endpoint = mock(PgEventEndpoint.class); - Processor processor = mock(Processor.class); - CamelContext camelContext = mock(CamelContext.class); - ExtendedCamelContext ecc = mock(ExtendedCamelContext.class); - ExchangeFactory ef = mock(ExchangeFactory.class); - - 
when(endpoint.getCamelContext()).thenReturn(camelContext); - when(camelContext.getCamelContextExtension()).thenReturn(ecc); - when(ecc.getExchangeFactory()).thenReturn(ef); - when(ef.newExchangeFactory(any())).thenReturn(ef); - when(endpoint.initJdbc()).thenReturn(connection); - when(connection.prepareStatement("LISTEN camel")).thenReturn(statement); - when(endpoint.getChannel()).thenReturn("camel"); - - PgEventConsumer consumer = new PgEventConsumer(endpoint, processor); - consumer.start(); - - verify(connection).addNotificationListener("camel", "camel", consumer); - assertTrue(consumer.isStarted()); - } - - @Test - public void testPgEventConsumerStop() throws Exception { - PGDataSource dataSource = mock(PGDataSource.class); - PGConnection connection = mock(PGConnection.class); - PreparedStatement statement = mock(PreparedStatement.class); - PgEventEndpoint endpoint = mock(PgEventEndpoint.class); - Processor processor = mock(Processor.class); - CamelContext camelContext = mock(CamelContext.class); - ExtendedCamelContext ecc = mock(ExtendedCamelContext.class); - ExchangeFactory ef = mock(ExchangeFactory.class); - - when(endpoint.getCamelContext()).thenReturn(camelContext); - when(camelContext.getCamelContextExtension()).thenReturn(ecc); - when(ecc.getExchangeFactory()).thenReturn(ef); - when(ef.newExchangeFactory(any())).thenReturn(ef); - when(endpoint.initJdbc()).thenReturn(connection); - when(connection.prepareStatement("LISTEN camel")).thenReturn(statement); - when(connection.prepareStatement("LISTEN camel")).thenReturn(statement); - when(endpoint.getChannel()).thenReturn("camel"); - when(connection.prepareStatement("UNLISTEN camel")).thenReturn(statement); - - PgEventConsumer consumer = new PgEventConsumer(endpoint, processor); - consumer.start(); - consumer.stop(); - - verify(connection).removeNotificationListener("camel"); - verify(connection).close(); - assertTrue(consumer.isStopped()); - } - - @Test - public void testPgEventNotification() throws Exception { 
- PgEventEndpoint endpoint = mock(PgEventEndpoint.class); - Processor processor = mock(Processor.class); - Exchange exchange = mock(Exchange.class); - ExchangeExtension exchangeExtension = mock(ExchangeExtension.class); - Message message = mock(Message.class); - CamelContext camelContext = mock(CamelContext.class); - ExtendedCamelContext ecc = mock(ExtendedCamelContext.class); - ExchangeFactory ef = mock(ExchangeFactory.class); - - when(endpoint.getCamelContext()).thenReturn(camelContext); - when(camelContext.getCamelContextExtension()).thenReturn(ecc); - when(ecc.getExchangeFactory()).thenReturn(ef); - when(ef.newExchangeFactory(any())).thenReturn(ef); - when(ef.create(endpoint, false)).thenReturn(exchange); - when(exchange.getExchangeExtension()).thenReturn(exchangeExtension); - when(exchange.getIn()).thenReturn(message); - - PgEventConsumer consumer = new PgEventConsumer(endpoint, processor); - consumer.notification(1, "camel", "some event"); - - verify(message).setHeader("channel", "camel"); - verify(message).setBody("some event"); - verify(processor).process(exchange); - } -} diff --git a/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventHelperTest.java b/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventHelperTest.java deleted file mode 100644 index 3e1df152970..00000000000 --- a/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventHelperTest.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.pgevent; - -import java.sql.Connection; - -import com.impossibl.postgres.api.jdbc.PGConnection; -import org.apache.camel.component.pgevent.PgEventHelper; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.junit.jupiter.MockitoExtension; - -import static org.junit.jupiter.api.Assertions.assertSame; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -@ExtendWith(MockitoExtension.class) -public class PgEventHelperTest { - - @Test - public void testToPGConnectionWithNullConnection() { - assertThrows(IllegalArgumentException.class, - () -> PgEventHelper.toPGConnection(null)); - } - - @Test - public void testToPGConnectionWithNonWrappedConnection() throws Exception { - Connection originalConnection = mock(PGConnection.class); - PGConnection actualConnection = PgEventHelper.toPGConnection(originalConnection); - assertSame(originalConnection, actualConnection); - } - - @Test - public void testToPGConnectionWithWrappedConnection() throws Exception { - Connection wrapperConnection = mock(Connection.class); - PGConnection unwrappedConnection = mock(PGConnection.class); - when(wrapperConnection.isWrapperFor(PGConnection.class)).thenReturn(true); - when(wrapperConnection.unwrap(PGConnection.class)).thenReturn(unwrappedConnection); - PGConnection actualConnection = PgEventHelper.toPGConnection(wrapperConnection); - assertSame(unwrappedConnection, actualConnection); - } - - @Test - 
public void testToPGConnectionWithInvalidWrappedConnection() throws Exception { - Connection wrapperConnection = mock(Connection.class); - when(wrapperConnection.isWrapperFor(PGConnection.class)).thenReturn(false); - assertThrows(IllegalStateException.class, - () -> PgEventHelper.toPGConnection(wrapperConnection)); - } -} diff --git a/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventProducerTest.java b/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventProducerTest.java deleted file mode 100644 index d5ad387f15a..00000000000 --- a/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventProducerTest.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.camel.pgevent; - -import java.sql.CallableStatement; -import java.sql.PreparedStatement; -import java.sql.SQLException; - -import com.impossibl.postgres.api.jdbc.PGConnection; -import com.impossibl.postgres.jdbc.PGDataSource; -import org.apache.camel.Exchange; -import org.apache.camel.Message; -import org.apache.camel.component.pgevent.InvalidStateException; -import org.apache.camel.component.pgevent.PgEventEndpoint; -import org.apache.camel.component.pgevent.PgEventProducer; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentMatchers; -import org.mockito.junit.jupiter.MockitoExtension; - -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -@ExtendWith(MockitoExtension.class) -public class PgEventProducerTest { - - private PgEventEndpoint endpoint = mock(PgEventEndpoint.class); - private PGDataSource dataSource = mock(PGDataSource.class); - private PGConnection connection = mock(PGConnection.class); - private PreparedStatement statement = mock(PreparedStatement.class); - private Exchange exchange = mock(Exchange.class); - private Message message = mock(Message.class); - - @Test - public void testPgEventProducerStart() throws Exception { - when(endpoint.getDatasource()).thenReturn(dataSource); - when(dataSource.getConnection()).thenReturn(connection); - - PgEventProducer producer = new PgEventProducer(endpoint); - producer.start(); - - assertTrue(producer.isStarted()); - } - - @Test - public void testPgEventProducerStop() throws Exception { - when(endpoint.initJdbc()).thenReturn(connection); - - PgEventProducer producer = new PgEventProducer(endpoint); - producer.start(); - producer.stop(); - - verify(connection).close(); - assertTrue(producer.isStopped()); - } - - @Test - public void 
testPgEventProducerProcessDbThrowsInvalidStateException() throws Exception { - when(endpoint.initJdbc()).thenReturn(connection); - when(connection.isClosed()).thenThrow(new SQLException("DB problem occurred")); - - PgEventProducer producer = new PgEventProducer(endpoint); - producer.start(); - assertThrows(InvalidStateException.class, - () -> producer.process(exchange)); - } - - @Test - public void testPgEventProducerProcessDbConnectionClosed() throws Exception { - PGConnection connectionNew = mock(PGConnection.class); - - when(endpoint.initJdbc()).thenReturn(connection); - when(endpoint.getDatasource()).thenReturn(dataSource); - when(dataSource.getConnection()).thenReturn(connection, connectionNew); - when(connection.isClosed()).thenReturn(true); - when(exchange.getIn()).thenReturn(message); - when(message.getBody(String.class)).thenReturn("pgevent"); - when(endpoint.getChannel()).thenReturn("camel"); - when(connection.prepareStatement(ArgumentMatchers.anyString())).thenReturn(statement); - - PgEventProducer producer = new PgEventProducer(endpoint); - producer.start(); - producer.process(exchange); - - verify(statement).execute(); - } - - @Test - public void testPgEventProducerProcessServerMinimumVersionMatched() throws Exception { - CallableStatement statement = mock(CallableStatement.class); - - when(endpoint.initJdbc()).thenReturn(connection); - when(endpoint.getDatasource()).thenReturn(dataSource); - when(connection.isClosed()).thenReturn(false); - when(dataSource.getConnection()).thenReturn(connection); - when(exchange.getIn()).thenReturn(message); - when(message.getBody(String.class)).thenReturn("pgevent"); - when(endpoint.getChannel()).thenReturn("camel"); - when(connection.isServerMinimumVersion(9, 0)).thenReturn(true); - when(connection.prepareCall(ArgumentMatchers.anyString())).thenReturn(statement); - - PgEventProducer producer = new PgEventProducer(endpoint); - producer.start(); - producer.process(exchange); - - verify(statement).execute(); - } - - 
@Test - public void testPgEventProducerProcessServerMinimumVersionNotMatched() throws Exception { - when(endpoint.initJdbc()).thenReturn(connection); - when(endpoint.getDatasource()).thenReturn(dataSource); - when(connection.isClosed()).thenReturn(false); - when(dataSource.getConnection()).thenReturn(connection); - when(exchange.getIn()).thenReturn(message); - when(message.getBody(String.class)).thenReturn("pgevent"); - when(endpoint.getChannel()).thenReturn("camel"); - when(connection.isServerMinimumVersion(9, 0)).thenReturn(false); - when(connection.prepareStatement("NOTIFY camel, 'pgevent'")).thenReturn(statement); - - PgEventProducer producer = new PgEventProducer(endpoint); - producer.start(); - producer.process(exchange); - - verify(statement).execute(); - } -} diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PgEventEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PgEventEndpointBuilderFactory.java index 878f1ea7127..2fb4a8eb84e 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PgEventEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PgEventEndpointBuilderFactory.java @@ -200,6 +200,136 @@ public interface PgEventEndpointBuilderFactory { doSetProperty("exchangePattern", exchangePattern); return this; } + /** + * When the consumer unexpectedly loses connection to the database, then + * this specifies the interval (millis) between re-connection attempts + * to establish a new connection. + * + * The option is a: <code>int</code> type.
+ * + * Default: 5000 + * Group: consumer (advanced) + * + * @param reconnectDelay the value to set + * @return the dsl builder + */ + default AdvancedPgEventEndpointConsumerBuilder reconnectDelay(int reconnectDelay) { + doSetProperty("reconnectDelay", reconnectDelay); + return this; + } + /** + * When the consumer unexpectedly loses connection to the database, then + * this specifies the interval (millis) between re-connection attempts + * to establish a new connection. + * + * The option will be converted to a <code>int</code> type. + * + * Default: 5000 + * Group: consumer (advanced) + * + * @param reconnectDelay the value to set + * @return the dsl builder + */ + default AdvancedPgEventEndpointConsumerBuilder reconnectDelay(String reconnectDelay) { + doSetProperty("reconnectDelay", reconnectDelay); + return this; + } + /** + * To use a custom worker pool for processing the events from the + * database. + * + * The option is a: <code>java.util.concurrent.ExecutorService</code> + * type. + * + * Group: consumer (advanced) + * + * @param workerPool the value to set + * @return the dsl builder + */ + default AdvancedPgEventEndpointConsumerBuilder workerPool(ExecutorService workerPool) { + doSetProperty("workerPool", workerPool); + return this; + } + /** + * To use a custom worker pool for processing the events from the + * database. + * + * The option will be converted to a + * <code>java.util.concurrent.ExecutorService</code> type. + * + * Group: consumer (advanced) + * + * @param workerPool the value to set + * @return the dsl builder + */ + default AdvancedPgEventEndpointConsumerBuilder workerPool(String workerPool) { + doSetProperty("workerPool", workerPool); + return this; + } + /** + * Number of core threads in the worker pool for processing the events + * from the database. + * + * The option is a: <code>int</code> type.
+ * + * Default: 1 + * Group: consumer (advanced) + * + * @param workerPoolCoreSize the value to set + * @return the dsl builder + */ + default AdvancedPgEventEndpointConsumerBuilder workerPoolCoreSize(int workerPoolCoreSize) { + doSetProperty("workerPoolCoreSize", workerPoolCoreSize); + return this; + } + /** + * Number of core threads in the worker pool for processing the events + * from the database. + * + * The option will be converted to a <code>int</code> type. + * + * Default: 1 + * Group: consumer (advanced) + * + * @param workerPoolCoreSize the value to set + * @return the dsl builder + */ + default AdvancedPgEventEndpointConsumerBuilder workerPoolCoreSize(String workerPoolCoreSize) { + doSetProperty("workerPoolCoreSize", workerPoolCoreSize); + return this; + } + /** + * Maximum number of threads in the worker pool for processing the + * events from the database. + * + * The option is a: <code>int</code> type. + * + * Default: 10 + * Group: consumer (advanced) + * + * @param workerPoolMaxSize the value to set + * @return the dsl builder + */ + default AdvancedPgEventEndpointConsumerBuilder workerPoolMaxSize(int workerPoolMaxSize) { + doSetProperty("workerPoolMaxSize", workerPoolMaxSize); + return this; + } + /** + * Maximum number of threads in the worker pool for processing the + * events from the database. + * + * The option will be converted to a <code>int</code> type. + * + * Default: 10 + * Group: consumer (advanced) + * + * @param workerPoolMaxSize the value to set + * @return the dsl builder + */ + default AdvancedPgEventEndpointConsumerBuilder workerPoolMaxSize(String workerPoolMaxSize) { + doSetProperty("workerPoolMaxSize", workerPoolMaxSize); + return this; + } /** * To connect using the given javax.sql.DataSource instead of using * hostname and port.