This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch pge
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 68caf42a0a88861d6ac988fb90cecf5bcd18df28
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Fri Jun 6 09:42:23 2025 +0200

    CAMEL-22146: camel-pgevent - Re-connect in case of lost connection to database.
---
 .../apache/camel/catalog/components/pgevent.json   |  12 +-
 .../pgevent/PgEventEndpointConfigurer.java         |  24 ++++
 .../pgevent/PgEventEndpointUriFactory.java         |   6 +-
 .../apache/camel/component/pgevent/pgevent.json    |  12 +-
 .../camel/component/pgevent/PgEventConsumer.java   | 136 ++++++++++++++-----
 .../camel/component/pgevent/PgEventEndpoint.java   |  59 +++++++++
 .../apache/camel/pgevent/PgEventConsumerTest.java  | 127 ------------------
 .../apache/camel/pgevent/PgEventHelperTest.java    |  65 ----------
 .../apache/camel/pgevent/PgEventProducerTest.java  | 144 ---------------------
 .../dsl/PgEventEndpointBuilderFactory.java         | 130 +++++++++++++++++++
 10 files changed, 340 insertions(+), 375 deletions(-)

diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pgevent.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pgevent.json
index 9abe4d1d5b2..1f11c8b900e 100644
--- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pgevent.json
+++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/pgevent.json
@@ -39,9 +39,13 @@
     "bridgeErrorHandler": { "index": 4, "kind": "parameter", "displayName": 
"Bridge Error Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Allows for bridging the consumer to the 
Camel routing Error Handler, which mean any exceptions (if possible) occurred 
while the Camel consumer is trying to pickup incoming  [...]
     "exceptionHandler": { "index": 5, "kind": "parameter", "displayName": 
"Exception Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", 
"deprecated": false, "autowired": false, "secret": false, "description": "To 
let the consumer use a custom ExceptionHandler. Notice if the option 
bridgeErrorHandler is enabled then this option is not in use. By def [...]
     "exchangePattern": { "index": 6, "kind": "parameter", "displayName": 
"Exchange Pattern", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], 
"deprecated": false, "autowired": false, "secret": false, "description": "Sets 
the exchange pattern when the consumer creates an exchange." },
-    "lazyStartProducer": { "index": 7, "kind": "parameter", "displayName": 
"Lazy Start Producer", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether the producer should be started 
lazy (on the first message). By starting lazy you can use this to allow 
CamelContext and routes to startup in situations where a produc [...]
-    "datasource": { "index": 8, "kind": "parameter", "displayName": 
"Datasource", "group": "advanced", "label": "advanced", "required": false, 
"type": "object", "javaType": "javax.sql.DataSource", "deprecated": false, 
"autowired": false, "secret": false, "description": "To connect using the given 
javax.sql.DataSource instead of using hostname and port." },
-    "pass": { "index": 9, "kind": "parameter", "displayName": "Pass", "group": 
"security", "label": "security", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": true, "description": "Password for login" },
-    "user": { "index": 10, "kind": "parameter", "displayName": "User", 
"group": "security", "label": "security", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": true, "defaultValue": "postgres", "description": "Username for login" 
}
+    "reconnectDelay": { "index": 7, "kind": "parameter", "displayName": 
"Reconnect Delay", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "integer", "javaType": "int", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, 
"description": "When the consumer unexpectedly loses connection to the database, 
then this specifies the interval (millis) between re-connection attempts to 
establish a new connection." },
+    "workerPool": { "index": 8, "kind": "parameter", "displayName": "Worker 
Pool", "group": "consumer (advanced)", "label": "consumer,advanced", 
"required": false, "type": "object", "javaType": 
"java.util.concurrent.ExecutorService", "deprecated": false, "autowired": 
false, "secret": false, "description": "To use a custom worker pool for 
processing the events from the database." },
+    "workerPoolCoreSize": { "index": 9, "kind": "parameter", "displayName": 
"Worker Pool Core Size", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "integer", "javaType": "int", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, 
"description": "Number of core threads in the worker pool for processing the 
events from the database." },
+    "workerPoolMaxSize": { "index": 10, "kind": "parameter", "displayName": 
"Worker Pool Max Size", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "integer", "javaType": "int", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, 
"description": "Maximum number of threads in the worker pool for processing the 
events from the database." },
+    "lazyStartProducer": { "index": 11, "kind": "parameter", "displayName": 
"Lazy Start Producer", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether the producer should be started 
lazy (on the first message). By starting lazy you can use this to allow 
CamelContext and routes to startup in situations where a produ [...]
+    "datasource": { "index": 12, "kind": "parameter", "displayName": 
"Datasource", "group": "advanced", "label": "advanced", "required": false, 
"type": "object", "javaType": "javax.sql.DataSource", "deprecated": false, 
"autowired": false, "secret": false, "description": "To connect using the given 
javax.sql.DataSource instead of using hostname and port." },
+    "pass": { "index": 13, "kind": "parameter", "displayName": "Pass", 
"group": "security", "label": "security", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": true, "description": "Password for login" },
+    "user": { "index": 14, "kind": "parameter", "displayName": "User", 
"group": "security", "label": "security", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": true, "defaultValue": "postgres", "description": "Username for login" 
}
   }
 }
diff --git a/components/camel-pgevent/src/generated/java/org/apache/camel/component/pgevent/PgEventEndpointConfigurer.java b/components/camel-pgevent/src/generated/java/org/apache/camel/component/pgevent/PgEventEndpointConfigurer.java
index 756e75bb17e..e453dfaf8d9 100644
--- a/components/camel-pgevent/src/generated/java/org/apache/camel/component/pgevent/PgEventEndpointConfigurer.java
+++ b/components/camel-pgevent/src/generated/java/org/apache/camel/component/pgevent/PgEventEndpointConfigurer.java
@@ -33,7 +33,15 @@ public class PgEventEndpointConfigurer extends 
PropertyConfigurerSupport impleme
         case "lazystartproducer":
         case "lazyStartProducer": 
target.setLazyStartProducer(property(camelContext, boolean.class, value)); 
return true;
         case "pass": target.setPass(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "reconnectdelay":
+        case "reconnectDelay": target.setReconnectDelay(property(camelContext, 
int.class, value)); return true;
         case "user": target.setUser(property(camelContext, 
java.lang.String.class, value)); return true;
+        case "workerpool":
+        case "workerPool": target.setWorkerPool(property(camelContext, 
java.util.concurrent.ExecutorService.class, value)); return true;
+        case "workerpoolcoresize":
+        case "workerPoolCoreSize": 
target.setWorkerPoolCoreSize(property(camelContext, int.class, value)); return 
true;
+        case "workerpoolmaxsize":
+        case "workerPoolMaxSize": 
target.setWorkerPoolMaxSize(property(camelContext, int.class, value)); return 
true;
         default: return false;
         }
     }
@@ -51,7 +59,15 @@ public class PgEventEndpointConfigurer extends 
PropertyConfigurerSupport impleme
         case "lazystartproducer":
         case "lazyStartProducer": return boolean.class;
         case "pass": return java.lang.String.class;
+        case "reconnectdelay":
+        case "reconnectDelay": return int.class;
         case "user": return java.lang.String.class;
+        case "workerpool":
+        case "workerPool": return java.util.concurrent.ExecutorService.class;
+        case "workerpoolcoresize":
+        case "workerPoolCoreSize": return int.class;
+        case "workerpoolmaxsize":
+        case "workerPoolMaxSize": return int.class;
         default: return null;
         }
     }
@@ -70,7 +86,15 @@ public class PgEventEndpointConfigurer extends 
PropertyConfigurerSupport impleme
         case "lazystartproducer":
         case "lazyStartProducer": return target.isLazyStartProducer();
         case "pass": return target.getPass();
+        case "reconnectdelay":
+        case "reconnectDelay": return target.getReconnectDelay();
         case "user": return target.getUser();
+        case "workerpool":
+        case "workerPool": return target.getWorkerPool();
+        case "workerpoolcoresize":
+        case "workerPoolCoreSize": return target.getWorkerPoolCoreSize();
+        case "workerpoolmaxsize":
+        case "workerPoolMaxSize": return target.getWorkerPoolMaxSize();
         default: return null;
         }
     }
diff --git a/components/camel-pgevent/src/generated/java/org/apache/camel/component/pgevent/PgEventEndpointUriFactory.java b/components/camel-pgevent/src/generated/java/org/apache/camel/component/pgevent/PgEventEndpointUriFactory.java
index dde682c9feb..b0d1353eef6 100644
--- a/components/camel-pgevent/src/generated/java/org/apache/camel/component/pgevent/PgEventEndpointUriFactory.java
+++ b/components/camel-pgevent/src/generated/java/org/apache/camel/component/pgevent/PgEventEndpointUriFactory.java
@@ -23,7 +23,7 @@ public class PgEventEndpointUriFactory extends 
org.apache.camel.support.componen
     private static final Set<String> SECRET_PROPERTY_NAMES;
     private static final Set<String> MULTI_VALUE_PREFIXES;
     static {
-        Set<String> props = new HashSet<>(11);
+        Set<String> props = new HashSet<>(15);
         props.add("bridgeErrorHandler");
         props.add("channel");
         props.add("database");
@@ -34,7 +34,11 @@ public class PgEventEndpointUriFactory extends 
org.apache.camel.support.componen
         props.add("lazyStartProducer");
         props.add("pass");
         props.add("port");
+        props.add("reconnectDelay");
         props.add("user");
+        props.add("workerPool");
+        props.add("workerPoolCoreSize");
+        props.add("workerPoolMaxSize");
         PROPERTY_NAMES = Collections.unmodifiableSet(props);
         Set<String> secretProps = new HashSet<>(2);
         secretProps.add("pass");
diff --git a/components/camel-pgevent/src/generated/resources/META-INF/org/apache/camel/component/pgevent/pgevent.json b/components/camel-pgevent/src/generated/resources/META-INF/org/apache/camel/component/pgevent/pgevent.json
index 9abe4d1d5b2..1f11c8b900e 100644
--- a/components/camel-pgevent/src/generated/resources/META-INF/org/apache/camel/component/pgevent/pgevent.json
+++ b/components/camel-pgevent/src/generated/resources/META-INF/org/apache/camel/component/pgevent/pgevent.json
@@ -39,9 +39,13 @@
     "bridgeErrorHandler": { "index": 4, "kind": "parameter", "displayName": 
"Bridge Error Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Allows for bridging the consumer to the 
Camel routing Error Handler, which mean any exceptions (if possible) occurred 
while the Camel consumer is trying to pickup incoming  [...]
     "exceptionHandler": { "index": 5, "kind": "parameter", "displayName": 
"Exception Handler", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.spi.ExceptionHandler", "optionalPrefix": "consumer.", 
"deprecated": false, "autowired": false, "secret": false, "description": "To 
let the consumer use a custom ExceptionHandler. Notice if the option 
bridgeErrorHandler is enabled then this option is not in use. By def [...]
     "exchangePattern": { "index": 6, "kind": "parameter", "displayName": 
"Exchange Pattern", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "object", "javaType": 
"org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], 
"deprecated": false, "autowired": false, "secret": false, "description": "Sets 
the exchange pattern when the consumer creates an exchange." },
-    "lazyStartProducer": { "index": 7, "kind": "parameter", "displayName": 
"Lazy Start Producer", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether the producer should be started 
lazy (on the first message). By starting lazy you can use this to allow 
CamelContext and routes to startup in situations where a produc [...]
-    "datasource": { "index": 8, "kind": "parameter", "displayName": 
"Datasource", "group": "advanced", "label": "advanced", "required": false, 
"type": "object", "javaType": "javax.sql.DataSource", "deprecated": false, 
"autowired": false, "secret": false, "description": "To connect using the given 
javax.sql.DataSource instead of using hostname and port." },
-    "pass": { "index": 9, "kind": "parameter", "displayName": "Pass", "group": 
"security", "label": "security", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": true, "description": "Password for login" },
-    "user": { "index": 10, "kind": "parameter", "displayName": "User", 
"group": "security", "label": "security", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": true, "defaultValue": "postgres", "description": "Username for login" 
}
+    "reconnectDelay": { "index": 7, "kind": "parameter", "displayName": 
"Reconnect Delay", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "integer", "javaType": "int", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 5000, 
"description": "When the consumer unexpectedly loses connection to the database, 
then this specifies the interval (millis) between re-connection attempts to 
establish a new connection." },
+    "workerPool": { "index": 8, "kind": "parameter", "displayName": "Worker 
Pool", "group": "consumer (advanced)", "label": "consumer,advanced", 
"required": false, "type": "object", "javaType": 
"java.util.concurrent.ExecutorService", "deprecated": false, "autowired": 
false, "secret": false, "description": "To use a custom worker pool for 
processing the events from the database." },
+    "workerPoolCoreSize": { "index": 9, "kind": "parameter", "displayName": 
"Worker Pool Core Size", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "integer", "javaType": "int", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 1, 
"description": "Number of core threads in the worker pool for processing the 
events from the database." },
+    "workerPoolMaxSize": { "index": 10, "kind": "parameter", "displayName": 
"Worker Pool Max Size", "group": "consumer (advanced)", "label": 
"consumer,advanced", "required": false, "type": "integer", "javaType": "int", 
"deprecated": false, "autowired": false, "secret": false, "defaultValue": 10, 
"description": "Maximum number of threads in the worker pool for processing the 
events from the database." },
+    "lazyStartProducer": { "index": 11, "kind": "parameter", "displayName": 
"Lazy Start Producer", "group": "producer (advanced)", "label": 
"producer,advanced", "required": false, "type": "boolean", "javaType": 
"boolean", "deprecated": false, "autowired": false, "secret": false, 
"defaultValue": false, "description": "Whether the producer should be started 
lazy (on the first message). By starting lazy you can use this to allow 
CamelContext and routes to startup in situations where a produ [...]
+    "datasource": { "index": 12, "kind": "parameter", "displayName": 
"Datasource", "group": "advanced", "label": "advanced", "required": false, 
"type": "object", "javaType": "javax.sql.DataSource", "deprecated": false, 
"autowired": false, "secret": false, "description": "To connect using the given 
javax.sql.DataSource instead of using hostname and port." },
+    "pass": { "index": 13, "kind": "parameter", "displayName": "Pass", 
"group": "security", "label": "security", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": true, "description": "Password for login" },
+    "user": { "index": 14, "kind": "parameter", "displayName": "User", 
"group": "security", "label": "security", "required": false, "type": "string", 
"javaType": "java.lang.String", "deprecated": false, "autowired": false, 
"secret": true, "defaultValue": "postgres", "description": "Username for login" 
}
   }
 }
diff --git a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
index e0ce6dfe92e..2b0509fb0b4 100644
--- a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
+++ b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
@@ -17,6 +17,8 @@
 package org.apache.camel.component.pgevent;
 
 import java.sql.PreparedStatement;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 
 import com.impossibl.postgres.api.jdbc.PGConnection;
 import com.impossibl.postgres.api.jdbc.PGNotificationListener;
@@ -24,68 +26,142 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.support.DefaultConsumer;
+import org.apache.camel.util.backoff.BackOff;
+import org.apache.camel.util.backoff.BackOffTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * The PgEvent consumer.
  */
-public class PgEventConsumer extends DefaultConsumer implements 
PGNotificationListener {
+public class PgEventConsumer extends DefaultConsumer {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PgEventConsumer.class);
 
+    private final PgEventListener listener = new PgEventListener();
     private final PgEventEndpoint endpoint;
     private PGConnection dbConnection;
+    private ScheduledExecutorService reconnectPool;
+    private ExecutorService workerPool;
+    private boolean shutdownWorkerPool;
+    private BackOffTimer timer;
 
     public PgEventConsumer(PgEventEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.endpoint = endpoint;
     }
 
+    public PgEventListener getPgEventListener() {
+        return listener;
+    }
+
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        dbConnection = endpoint.initJdbc();
-        String sql = String.format("LISTEN %s", endpoint.getChannel());
-        try (PreparedStatement statement = dbConnection.prepareStatement(sql)) 
{
-            statement.execute();
+        if (endpoint.getWorkerPool() != null) {
+            workerPool = endpoint.getWorkerPool();
+        } else {
+            workerPool = endpoint.createWorkerPool();
+            shutdownWorkerPool = true;
         }
-        dbConnection.addNotificationListener(endpoint.getChannel(), 
endpoint.getChannel(), this);
+        // used for re-connecting to the database
+        reconnectPool = 
getEndpoint().getCamelContext().getExecutorServiceManager()
+                .newSingleThreadScheduledExecutor(this, "Reconnector");
+        timer = new BackOffTimer(reconnectPool);
+        listener.initConnection();
     }
 
     @Override
-    public void notification(int processId, String channel, String payload) {
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Notification processId: {}, channel: {}, payload: {}", 
processId, channel, payload);
+    protected void doStop() throws Exception {
+        super.doStop();
+        listener.closeConnection();
+        
getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(reconnectPool);
+        timer = null;
+        if (shutdownWorkerPool && workerPool != null) {
+            LOG.debug("Shutting down PgEventConsumer worker threads with 
timeout {} millis", 10000);
+            
endpoint.getCamelContext().getExecutorServiceManager().shutdownGraceful(workerPool,
 10000);
+            workerPool = null;
         }
+    }
 
-        Exchange exchange = createExchange(false);
-        Message msg = exchange.getIn();
-        msg.setHeader(PgEventConstants.HEADER_CHANNEL, channel);
-        msg.setBody(payload);
+    public class PgEventListener implements PGNotificationListener {
 
-        try {
-            getProcessor().process(exchange);
-        } catch (Exception e) {
-            exchange.setException(e);
-        }
-        if (exchange.getException() != null) {
-            String cause = "Unable to process incoming notification from 
PostgreSQL: processId='" + processId + "', channel='"
-                           + channel + "', payload='" + payload + "'";
-            getExceptionHandler().handleException(cause, 
exchange.getException());
+        public void reconnect() {
+            BackOff bo = 
BackOff.builder().delay(endpoint.getReconnectDelay()).build();
+            timer.schedule(bo, t -> {
+                LOG.debug("Connecting attempt #" + t.getCurrentAttempts());
+                try {
+                    initConnection();
+                } catch (Exception e) {
+                    return true;
+                }
+                LOG.debug("Connecting successful");
+                return false;
+            });
         }
-        releaseExchange(exchange, false);
-    }
 
-    @Override
-    protected void doStop() throws Exception {
-        if (dbConnection != null) {
-            dbConnection.removeNotificationListener(endpoint.getChannel());
-            String sql = String.format("UNLISTEN %s", endpoint.getChannel());
+        public void initConnection() throws Exception {
+            dbConnection = endpoint.initJdbc();
+            String sql = String.format("LISTEN %s", endpoint.getChannel());
             try (PreparedStatement statement = 
dbConnection.prepareStatement(sql)) {
                 statement.execute();
             }
-            dbConnection.close();
+            dbConnection.addNotificationListener(endpoint.getChannel(), 
endpoint.getChannel(), listener);
+        }
+
+        public void closeConnection() throws Exception {
+            if (dbConnection != null) {
+                try {
+                    
dbConnection.removeNotificationListener(endpoint.getChannel());
+                    String sql = String.format("UNLISTEN %s", 
endpoint.getChannel());
+                    try (PreparedStatement statement = 
dbConnection.prepareStatement(sql)) {
+                        statement.execute();
+                    }
+                    dbConnection.close();
+                } catch (Exception e) {
+                    // ignore
+                }
+            }
+            dbConnection = null;
+        }
+
+        @Override
+        public void notification(int processId, String channel, String 
payload) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Notification processId: {}, channel: {}, payload: 
{}", processId, channel, payload);
+            }
+
+            Exchange exchange = createExchange(false);
+            Message msg = exchange.getIn();
+            msg.setHeader(PgEventConstants.HEADER_CHANNEL, channel);
+            msg.setBody(payload);
+
+            // use worker pool to avoid blocking notification thread
+            if (workerPool != null) {
+                workerPool.submit(() -> {
+                    try {
+                        getProcessor().process(exchange);
+                    } catch (Exception e) {
+                        exchange.setException(e);
+                    }
+                    if (exchange.getException() != null) {
+                        String cause
+                                = "Unable to process incoming notification 
from PostgreSQL: processId='" + processId
+                                  + "', channel='"
+                                  + channel + "', payload='" + payload + "'";
+                        getExceptionHandler().handleException(cause, 
exchange.getException());
+                    }
+                    releaseExchange(exchange, false);
+                });
+            }
+        }
+
+        @Override
+        public void closed() {
+            // connection lost, so we need to re-connect
+            LOG.warn("Connection to PostgreSQL lost unexpected. 
Re-connecting...");
+            reconnect();
         }
     }
+
 }
diff --git a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventEndpoint.java b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventEndpoint.java
index ca506ffef4b..8e56d15a6b8 100644
--- a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventEndpoint.java
+++ b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventEndpoint.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.pgevent;
 
 import java.sql.DriverManager;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 import javax.sql.DataSource;
 
@@ -70,6 +71,14 @@ public class PgEventEndpoint extends DefaultEndpoint 
implements EndpointServiceL
     private String pass;
     @UriParam(label = "advanced")
     private DataSource datasource;
+    @UriParam(label = "consumer,advanced", defaultValue = "5000")
+    private int reconnectDelay = 5000;
+    @UriParam(label = "consumer,advanced")
+    private ExecutorService workerPool;
+    @UriParam(label = "consumer,advanced", defaultValue = "1")
+    private int workerPoolCoreSize = 1;
+    @UriParam(label = "consumer,advanced", defaultValue = "10")
+    private int workerPoolMaxSize = 10;
 
     private final String uri;
 
@@ -186,6 +195,11 @@ public class PgEventEndpoint extends DefaultEndpoint 
implements EndpointServiceL
         return consumer;
     }
 
+    ExecutorService createWorkerPool() {
+        return 
getCamelContext().getExecutorServiceManager().newThreadPool(this,
+                "PgEventConsumer[" + channel + "]", workerPoolCoreSize, 
workerPoolMaxSize);
+    }
+
     public String getHost() {
         return host;
     }
@@ -263,4 +277,49 @@ public class PgEventEndpoint extends DefaultEndpoint 
implements EndpointServiceL
     public void setDatasource(DataSource datasource) {
         this.datasource = datasource;
     }
+
+    public int getReconnectDelay() {
+        return reconnectDelay;
+    }
+
+    /**
+     * When the consumer unexpectedly loses connection to the database, then this specifies the interval (millis) between
+     * re-connection attempts to establish a new connection.
+     */
+    public void setReconnectDelay(int reconnectDelay) {
+        this.reconnectDelay = reconnectDelay;
+    }
+
+    public ExecutorService getWorkerPool() {
+        return workerPool;
+    }
+
+    /**
+     * To use a custom worker pool for processing the events from the database.
+     */
+    public void setWorkerPool(ExecutorService workerPool) {
+        this.workerPool = workerPool;
+    }
+
+    public int getWorkerPoolCoreSize() {
+        return workerPoolCoreSize;
+    }
+
+    /**
+     * Number of core threads in the worker pool for processing the events 
from the database.
+     */
+    public void setWorkerPoolCoreSize(int workerPoolCoreSize) {
+        this.workerPoolCoreSize = workerPoolCoreSize;
+    }
+
+    public int getWorkerPoolMaxSize() {
+        return workerPoolMaxSize;
+    }
+
+    /**
+     * Maximum number of threads in the worker pool for processing the events 
from the database.
+     */
+    public void setWorkerPoolMaxSize(int workerPoolMaxSize) {
+        this.workerPoolMaxSize = workerPoolMaxSize;
+    }
 }
diff --git a/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventConsumerTest.java b/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventConsumerTest.java
deleted file mode 100644
index 6fad950412e..00000000000
--- a/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventConsumerTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.pgevent;
-
-import java.sql.PreparedStatement;
-
-import com.impossibl.postgres.api.jdbc.PGConnection;
-import com.impossibl.postgres.jdbc.PGDataSource;
-import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeExtension;
-import org.apache.camel.ExtendedCamelContext;
-import org.apache.camel.Message;
-import org.apache.camel.Processor;
-import org.apache.camel.component.pgevent.PgEventConsumer;
-import org.apache.camel.component.pgevent.PgEventEndpoint;
-import org.apache.camel.spi.ExchangeFactory;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@ExtendWith(MockitoExtension.class)
-public class PgEventConsumerTest {
-
-    @Test
-    public void testPgEventConsumerStart() throws Exception {
-        PGDataSource dataSource = mock(PGDataSource.class);
-        PGConnection connection = mock(PGConnection.class);
-        PreparedStatement statement = mock(PreparedStatement.class);
-        PgEventEndpoint endpoint = mock(PgEventEndpoint.class);
-        Processor processor = mock(Processor.class);
-        CamelContext camelContext = mock(CamelContext.class);
-        ExtendedCamelContext ecc = mock(ExtendedCamelContext.class);
-        ExchangeFactory ef = mock(ExchangeFactory.class);
-
-        when(endpoint.getCamelContext()).thenReturn(camelContext);
-        when(camelContext.getCamelContextExtension()).thenReturn(ecc);
-        when(ecc.getExchangeFactory()).thenReturn(ef);
-        when(ef.newExchangeFactory(any())).thenReturn(ef);
-        when(endpoint.initJdbc()).thenReturn(connection);
-        when(connection.prepareStatement("LISTEN 
camel")).thenReturn(statement);
-        when(endpoint.getChannel()).thenReturn("camel");
-
-        PgEventConsumer consumer = new PgEventConsumer(endpoint, processor);
-        consumer.start();
-
-        verify(connection).addNotificationListener("camel", "camel", consumer);
-        assertTrue(consumer.isStarted());
-    }
-
-    @Test
-    public void testPgEventConsumerStop() throws Exception {
-        PGDataSource dataSource = mock(PGDataSource.class);
-        PGConnection connection = mock(PGConnection.class);
-        PreparedStatement statement = mock(PreparedStatement.class);
-        PgEventEndpoint endpoint = mock(PgEventEndpoint.class);
-        Processor processor = mock(Processor.class);
-        CamelContext camelContext = mock(CamelContext.class);
-        ExtendedCamelContext ecc = mock(ExtendedCamelContext.class);
-        ExchangeFactory ef = mock(ExchangeFactory.class);
-
-        when(endpoint.getCamelContext()).thenReturn(camelContext);
-        when(camelContext.getCamelContextExtension()).thenReturn(ecc);
-        when(ecc.getExchangeFactory()).thenReturn(ef);
-        when(ef.newExchangeFactory(any())).thenReturn(ef);
-        when(endpoint.initJdbc()).thenReturn(connection);
-        when(connection.prepareStatement("LISTEN 
camel")).thenReturn(statement);
-        when(connection.prepareStatement("LISTEN 
camel")).thenReturn(statement);
-        when(endpoint.getChannel()).thenReturn("camel");
-        when(connection.prepareStatement("UNLISTEN 
camel")).thenReturn(statement);
-
-        PgEventConsumer consumer = new PgEventConsumer(endpoint, processor);
-        consumer.start();
-        consumer.stop();
-
-        verify(connection).removeNotificationListener("camel");
-        verify(connection).close();
-        assertTrue(consumer.isStopped());
-    }
-
-    @Test
-    public void testPgEventNotification() throws Exception {
-        PgEventEndpoint endpoint = mock(PgEventEndpoint.class);
-        Processor processor = mock(Processor.class);
-        Exchange exchange = mock(Exchange.class);
-        ExchangeExtension exchangeExtension = mock(ExchangeExtension.class);
-        Message message = mock(Message.class);
-        CamelContext camelContext = mock(CamelContext.class);
-        ExtendedCamelContext ecc = mock(ExtendedCamelContext.class);
-        ExchangeFactory ef = mock(ExchangeFactory.class);
-
-        when(endpoint.getCamelContext()).thenReturn(camelContext);
-        when(camelContext.getCamelContextExtension()).thenReturn(ecc);
-        when(ecc.getExchangeFactory()).thenReturn(ef);
-        when(ef.newExchangeFactory(any())).thenReturn(ef);
-        when(ef.create(endpoint, false)).thenReturn(exchange);
-        when(exchange.getExchangeExtension()).thenReturn(exchangeExtension);
-        when(exchange.getIn()).thenReturn(message);
-
-        PgEventConsumer consumer = new PgEventConsumer(endpoint, processor);
-        consumer.notification(1, "camel", "some event");
-
-        verify(message).setHeader("channel", "camel");
-        verify(message).setBody("some event");
-        verify(processor).process(exchange);
-    }
-}
diff --git 
a/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventHelperTest.java
 
b/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventHelperTest.java
deleted file mode 100644
index 3e1df152970..00000000000
--- 
a/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventHelperTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.pgevent;
-
-import java.sql.Connection;
-
-import com.impossibl.postgres.api.jdbc.PGConnection;
-import org.apache.camel.component.pgevent.PgEventHelper;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-@ExtendWith(MockitoExtension.class)
-public class PgEventHelperTest {
-
-    @Test
-    public void testToPGConnectionWithNullConnection() {
-        assertThrows(IllegalArgumentException.class,
-                () -> PgEventHelper.toPGConnection(null));
-    }
-
-    @Test
-    public void testToPGConnectionWithNonWrappedConnection() throws Exception {
-        Connection originalConnection = mock(PGConnection.class);
-        PGConnection actualConnection = 
PgEventHelper.toPGConnection(originalConnection);
-        assertSame(originalConnection, actualConnection);
-    }
-
-    @Test
-    public void testToPGConnectionWithWrappedConnection() throws Exception {
-        Connection wrapperConnection = mock(Connection.class);
-        PGConnection unwrappedConnection = mock(PGConnection.class);
-        
when(wrapperConnection.isWrapperFor(PGConnection.class)).thenReturn(true);
-        
when(wrapperConnection.unwrap(PGConnection.class)).thenReturn(unwrappedConnection);
-        PGConnection actualConnection = 
PgEventHelper.toPGConnection(wrapperConnection);
-        assertSame(unwrappedConnection, actualConnection);
-    }
-
-    @Test
-    public void testToPGConnectionWithInvalidWrappedConnection() throws 
Exception {
-        Connection wrapperConnection = mock(Connection.class);
-        
when(wrapperConnection.isWrapperFor(PGConnection.class)).thenReturn(false);
-        assertThrows(IllegalStateException.class,
-                () -> PgEventHelper.toPGConnection(wrapperConnection));
-    }
-}
diff --git 
a/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventProducerTest.java
 
b/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventProducerTest.java
deleted file mode 100644
index d5ad387f15a..00000000000
--- 
a/components/camel-pgevent/src/test/java/org/apache/camel/pgevent/PgEventProducerTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.pgevent;
-
-import java.sql.CallableStatement;
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-
-import com.impossibl.postgres.api.jdbc.PGConnection;
-import com.impossibl.postgres.jdbc.PGDataSource;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.component.pgevent.InvalidStateException;
-import org.apache.camel.component.pgevent.PgEventEndpoint;
-import org.apache.camel.component.pgevent.PgEventProducer;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.ArgumentMatchers;
-import org.mockito.junit.jupiter.MockitoExtension;
-
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@ExtendWith(MockitoExtension.class)
-public class PgEventProducerTest {
-
-    private PgEventEndpoint endpoint = mock(PgEventEndpoint.class);
-    private PGDataSource dataSource = mock(PGDataSource.class);
-    private PGConnection connection = mock(PGConnection.class);
-    private PreparedStatement statement = mock(PreparedStatement.class);
-    private Exchange exchange = mock(Exchange.class);
-    private Message message = mock(Message.class);
-
-    @Test
-    public void testPgEventProducerStart() throws Exception {
-        when(endpoint.getDatasource()).thenReturn(dataSource);
-        when(dataSource.getConnection()).thenReturn(connection);
-
-        PgEventProducer producer = new PgEventProducer(endpoint);
-        producer.start();
-
-        assertTrue(producer.isStarted());
-    }
-
-    @Test
-    public void testPgEventProducerStop() throws Exception {
-        when(endpoint.initJdbc()).thenReturn(connection);
-
-        PgEventProducer producer = new PgEventProducer(endpoint);
-        producer.start();
-        producer.stop();
-
-        verify(connection).close();
-        assertTrue(producer.isStopped());
-    }
-
-    @Test
-    public void testPgEventProducerProcessDbThrowsInvalidStateException() 
throws Exception {
-        when(endpoint.initJdbc()).thenReturn(connection);
-        when(connection.isClosed()).thenThrow(new SQLException("DB problem 
occurred"));
-
-        PgEventProducer producer = new PgEventProducer(endpoint);
-        producer.start();
-        assertThrows(InvalidStateException.class,
-                () -> producer.process(exchange));
-    }
-
-    @Test
-    public void testPgEventProducerProcessDbConnectionClosed() throws 
Exception {
-        PGConnection connectionNew = mock(PGConnection.class);
-
-        when(endpoint.initJdbc()).thenReturn(connection);
-        when(endpoint.getDatasource()).thenReturn(dataSource);
-        when(dataSource.getConnection()).thenReturn(connection, connectionNew);
-        when(connection.isClosed()).thenReturn(true);
-        when(exchange.getIn()).thenReturn(message);
-        when(message.getBody(String.class)).thenReturn("pgevent");
-        when(endpoint.getChannel()).thenReturn("camel");
-        
when(connection.prepareStatement(ArgumentMatchers.anyString())).thenReturn(statement);
-
-        PgEventProducer producer = new PgEventProducer(endpoint);
-        producer.start();
-        producer.process(exchange);
-
-        verify(statement).execute();
-    }
-
-    @Test
-    public void testPgEventProducerProcessServerMinimumVersionMatched() throws 
Exception {
-        CallableStatement statement = mock(CallableStatement.class);
-
-        when(endpoint.initJdbc()).thenReturn(connection);
-        when(endpoint.getDatasource()).thenReturn(dataSource);
-        when(connection.isClosed()).thenReturn(false);
-        when(dataSource.getConnection()).thenReturn(connection);
-        when(exchange.getIn()).thenReturn(message);
-        when(message.getBody(String.class)).thenReturn("pgevent");
-        when(endpoint.getChannel()).thenReturn("camel");
-        when(connection.isServerMinimumVersion(9, 0)).thenReturn(true);
-        
when(connection.prepareCall(ArgumentMatchers.anyString())).thenReturn(statement);
-
-        PgEventProducer producer = new PgEventProducer(endpoint);
-        producer.start();
-        producer.process(exchange);
-
-        verify(statement).execute();
-    }
-
-    @Test
-    public void testPgEventProducerProcessServerMinimumVersionNotMatched() 
throws Exception {
-        when(endpoint.initJdbc()).thenReturn(connection);
-        when(endpoint.getDatasource()).thenReturn(dataSource);
-        when(connection.isClosed()).thenReturn(false);
-        when(dataSource.getConnection()).thenReturn(connection);
-        when(exchange.getIn()).thenReturn(message);
-        when(message.getBody(String.class)).thenReturn("pgevent");
-        when(endpoint.getChannel()).thenReturn("camel");
-        when(connection.isServerMinimumVersion(9, 0)).thenReturn(false);
-        when(connection.prepareStatement("NOTIFY camel, 
'pgevent'")).thenReturn(statement);
-
-        PgEventProducer producer = new PgEventProducer(endpoint);
-        producer.start();
-        producer.process(exchange);
-
-        verify(statement).execute();
-    }
-}
diff --git 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PgEventEndpointBuilderFactory.java
 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PgEventEndpointBuilderFactory.java
index 878f1ea7127..2fb4a8eb84e 100644
--- 
a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PgEventEndpointBuilderFactory.java
+++ 
b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/PgEventEndpointBuilderFactory.java
@@ -200,6 +200,136 @@ public interface PgEventEndpointBuilderFactory {
             doSetProperty("exchangePattern", exchangePattern);
             return this;
         }
+        /**
+         * When the consumer unexpectedly loses its connection to the database,
+         * this specifies the interval (millis) between re-connection attempts
+         * to establish a new connection.
+         * 
+         * The option is a: <code>int</code> type.
+         * 
+         * Default: 5000
+         * Group: consumer (advanced)
+         * 
+         * @param reconnectDelay the value to set
+         * @return the dsl builder
+         */
+        default AdvancedPgEventEndpointConsumerBuilder reconnectDelay(int 
reconnectDelay) {
+            doSetProperty("reconnectDelay", reconnectDelay);
+            return this;
+        }
+        /**
+         * When the consumer unexpectedly loses its connection to the database,
+         * this specifies the interval (millis) between re-connection attempts
+         * to establish a new connection.
+         * 
+         * The option will be converted to a <code>int</code> type.
+         * 
+         * Default: 5000
+         * Group: consumer (advanced)
+         * 
+         * @param reconnectDelay the value to set
+         * @return the dsl builder
+         */
+        default AdvancedPgEventEndpointConsumerBuilder reconnectDelay(String 
reconnectDelay) {
+            doSetProperty("reconnectDelay", reconnectDelay);
+            return this;
+        }
+        /**
+         * To use a custom worker pool for processing the events from the
+         * database.
+         * 
+         * The option is a: <code>java.util.concurrent.ExecutorService</code>
+         * type.
+         * 
+         * Group: consumer (advanced)
+         * 
+         * @param workerPool the value to set
+         * @return the dsl builder
+         */
+        default AdvancedPgEventEndpointConsumerBuilder 
workerPool(ExecutorService workerPool) {
+            doSetProperty("workerPool", workerPool);
+            return this;
+        }
+        /**
+         * To use a custom worker pool for processing the events from the
+         * database.
+         * 
+         * The option will be converted to a
+         * <code>java.util.concurrent.ExecutorService</code> type.
+         * 
+         * Group: consumer (advanced)
+         * 
+         * @param workerPool the value to set
+         * @return the dsl builder
+         */
+        default AdvancedPgEventEndpointConsumerBuilder workerPool(String 
workerPool) {
+            doSetProperty("workerPool", workerPool);
+            return this;
+        }
+        /**
+         * Number of core threads in the worker pool for processing the events
+         * from the database.
+         * 
+         * The option is a: <code>int</code> type.
+         * 
+         * Default: 1
+         * Group: consumer (advanced)
+         * 
+         * @param workerPoolCoreSize the value to set
+         * @return the dsl builder
+         */
+        default AdvancedPgEventEndpointConsumerBuilder workerPoolCoreSize(int 
workerPoolCoreSize) {
+            doSetProperty("workerPoolCoreSize", workerPoolCoreSize);
+            return this;
+        }
+        /**
+         * Number of core threads in the worker pool for processing the events
+         * from the database.
+         * 
+         * The option will be converted to a <code>int</code> type.
+         * 
+         * Default: 1
+         * Group: consumer (advanced)
+         * 
+         * @param workerPoolCoreSize the value to set
+         * @return the dsl builder
+         */
+        default AdvancedPgEventEndpointConsumerBuilder 
workerPoolCoreSize(String workerPoolCoreSize) {
+            doSetProperty("workerPoolCoreSize", workerPoolCoreSize);
+            return this;
+        }
+        /**
+         * Maximum number of threads in the worker pool for processing the
+         * events from the database.
+         * 
+         * The option is a: <code>int</code> type.
+         * 
+         * Default: 10
+         * Group: consumer (advanced)
+         * 
+         * @param workerPoolMaxSize the value to set
+         * @return the dsl builder
+         */
+        default AdvancedPgEventEndpointConsumerBuilder workerPoolMaxSize(int 
workerPoolMaxSize) {
+            doSetProperty("workerPoolMaxSize", workerPoolMaxSize);
+            return this;
+        }
+        /**
+         * Maximum number of threads in the worker pool for processing the
+         * events from the database.
+         * 
+         * The option will be converted to a <code>int</code> type.
+         * 
+         * Default: 10
+         * Group: consumer (advanced)
+         * 
+         * @param workerPoolMaxSize the value to set
+         * @return the dsl builder
+         */
+        default AdvancedPgEventEndpointConsumerBuilder 
workerPoolMaxSize(String workerPoolMaxSize) {
+            doSetProperty("workerPoolMaxSize", workerPoolMaxSize);
+            return this;
+        }
         /**
          * To connect using the given javax.sql.DataSource instead of using
          * hostname and port.

Reply via email to