This is an automated email from the ASF dual-hosted git repository.

pcongiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 9531f205fad3 fix(components): sql enhancements
9531f205fad3 is described below

commit 9531f205fad3c51478026fca48bb655f7faf972b
Author: Pasquale Congiusti <[email protected]>
AuthorDate: Fri Dec 5 10:46:51 2025 +0100

    fix(components): sql enhancements
    
    Follow up #20229
---
 .../pg/replication/slot/PgReplicationSlotConsumer.java   | 16 ++++++++++------
 .../apache/camel/component/pgevent/PgEventConsumer.java  |  4 ++--
 .../jdbc/ClusteredJdbcAggregationRepository.java         |  2 +-
 .../aggregate/jdbc/JdbcAggregationRepository.java        | 12 ++++++------
 4 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java b/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
index b61702199f4c..832a41dab223 100644
--- a/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
+++ b/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
@@ -22,7 +22,6 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.sql.Statement;
 import java.util.Properties;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -236,13 +235,18 @@ public class PgReplicationSlotConsumer extends ScheduledPollConsumer {
     }
 
     private boolean isSlotActive() throws SQLException {
-        String sql = String.format("SELECT count(*) FROM pg_replication_slots where slot_name = '%s' AND active = true;",
-                this.endpoint.getSlot());
+        String sql = "SELECT count(*) FROM pg_replication_slots WHERE slot_name = ? AND active = true";
 
-        try (Statement statement = this.connection.createStatement(); ResultSet resultSet = statement.executeQuery(sql)) {
-            resultSet.next();
-            return resultSet.getInt(1) > 0;
+        try (PreparedStatement ps = connection.prepareStatement(sql)) {
+            ps.setString(1, endpoint.getSlot());
+
+            try (ResultSet rs = ps.executeQuery()) {
+                if (rs.next()) {
+                    return rs.getInt(1) > 0;
+                }
+            }
         }
+        return false;
     }
 
     private void connect() throws SQLException {
diff --git a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
index c9fcd00545a6..960bed80f699 100644
--- a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
+++ b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
@@ -129,7 +129,7 @@ public class PgEventConsumer extends DefaultConsumer {
                 throw new IllegalArgumentException("Invalid channel name");
             }
             String sql = String.format("LISTEN %s", channel);
-            try (PreparedStatement statement = dbConnection.prepareStatement(sql)) {
+            try (PreparedStatement statement = dbConnection.prepareStatement(sql)) { // NOSONAR
                 statement.execute();
             }
             dbConnection.addNotificationListener(channel, channel, listener);
@@ -144,7 +144,7 @@ public class PgEventConsumer extends DefaultConsumer {
                     }
                     dbConnection.removeNotificationListener(channel);
                     String sql = String.format("UNLISTEN %s", channel);
-                    try (PreparedStatement statement = dbConnection.prepareStatement(sql)) {
+                    try (PreparedStatement statement = dbConnection.prepareStatement(sql)) { // NOSONAR
                         statement.execute();
                     }
                     dbConnection.close();
diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregationRepository.java b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregationRepository.java
index dc2450cf37a5..28af65e419e8 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregationRepository.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregationRepository.java
@@ -77,7 +77,7 @@ public class ClusteredJdbcAggregationRepository extends JdbcAggregationRepositor
                     LOG.debug("Removing key {}", correlationId);
                     String table = getRepositoryName();
                     verifyTableName(table);
-                    jdbcTemplate.update("DELETE FROM " + table + " WHERE " + ID + " = ? AND " + VERSION + " = ?",
+                    jdbcTemplate.update("DELETE FROM " + table + " WHERE " + ID + " = ? AND " + VERSION + " = ?", // NOSONAR
                             correlationId, version);
 
                     insert(camelContext, confirmKey, exchange, getRepositoryNameCompleted(), version, true);
diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
index 6dc89bcc0c4f..547017b36a18 100644
--- a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
+++ b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
@@ -195,7 +195,7 @@ public class JdbcAggregationRepository extends ServiceSupport
                     String table = getRepositoryName();
                     verifyTableName(table);
                     boolean present = jdbcTemplate.queryForObject(
-                            "SELECT COUNT(1) FROM " + table + " WHERE " + ID + " = ?", Integer.class,
+                            "SELECT COUNT(1) FROM " + table + " WHERE " + ID + " = ?", Integer.class, // NOSONAR
                             correlationId) != 0;
 
                     // Recover existing exchange with that ID
@@ -386,7 +386,7 @@ public class JdbcAggregationRepository extends ServiceSupport
                 try {
                     verifyTableName(repositoryName);
                     Map<String, Object> columns = jdbcTemplate.queryForMap(
-                            String.format("SELECT %1$s, %2$s FROM %3$s WHERE %4$s=?", EXCHANGE, VERSION, repositoryName, ID),
+                            String.format("SELECT %1$s, %2$s FROM %3$s WHERE %4$s=?", EXCHANGE, VERSION, repositoryName, ID), // NOSONAR
                             new Object[] { key }, new int[] { Types.VARCHAR });
 
                     byte[] marshalledExchange = (byte[]) columns.get(EXCHANGE);
@@ -426,7 +426,7 @@ public class JdbcAggregationRepository extends ServiceSupport
                     LOG.debug("Removing key {}", correlationId);
                     String table = getRepositoryName();
                     verifyTableName(table);
-                    jdbcTemplate.update("DELETE FROM " + table + " WHERE " + ID + " = ? AND " + VERSION + " = ?",
+                    jdbcTemplate.update("DELETE FROM " + table + " WHERE " + ID + " = ? AND " + VERSION + " = ?", // NOSONAR
                             correlationId, version);
 
                     insert(camelContext, confirmKey, exchange, getRepositoryNameCompleted(), version);
@@ -452,7 +452,7 @@ public class JdbcAggregationRepository extends ServiceSupport
                 String table = getRepositoryNameCompleted();
                 verifyTableName(table);
                 final int mustBeOne = jdbcTemplate
-                        .update("DELETE FROM " + table + " WHERE " + ID + " = ?", exchangeId);
+                        .update("DELETE FROM " + table + " WHERE " + ID + " = ?", exchangeId); // NOSONAR
                 if (mustBeOne != 1) {
                     LOG.error("problem removing row {} from {} - DELETE statement did not return 1 but {}",
                             exchangeId, getRepositoryNameCompleted(), mustBeOne);
@@ -483,7 +483,7 @@ public class JdbcAggregationRepository extends ServiceSupport
         return transactionTemplateReadOnly.execute(new TransactionCallback<LinkedHashSet<String>>() {
             public LinkedHashSet<String> doInTransaction(TransactionStatus status) {
                 verifyTableName(repositoryName);
-                List<String> keys = jdbcTemplate.query("SELECT " + ID + " FROM " + repositoryName,
+                List<String> keys = jdbcTemplate.query("SELECT " + ID + " FROM " + repositoryName, // NOSONAR
                         new RowMapper<String>() {
                             public String mapRow(ResultSet rs, int rowNum) throws SQLException {
                                 String id = rs.getString(ID);
@@ -711,7 +711,7 @@ public class JdbcAggregationRepository extends ServiceSupport
 
     private int rowCount(final String repository) {
         verifyTableName(repository);
-        return jdbcTemplate.queryForObject("SELECT COUNT(1) FROM " + repository, Integer.class);
+        return jdbcTemplate.queryForObject("SELECT COUNT(1) FROM " + repository, Integer.class); // NOSONAR
     }
 
     @Override

Reply via email to