This is an automated email from the ASF dual-hosted git repository.
pcongiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 9531f205fad3 fix(components): sql enhancements
9531f205fad3 is described below
commit 9531f205fad3c51478026fca48bb655f7faf972b
Author: Pasquale Congiusti <[email protected]>
AuthorDate: Fri Dec 5 10:46:51 2025 +0100
fix(components): sql enhancements
Follow up #20229
---
.../pg/replication/slot/PgReplicationSlotConsumer.java | 16 ++++++++++------
.../apache/camel/component/pgevent/PgEventConsumer.java | 4 ++--
.../jdbc/ClusteredJdbcAggregationRepository.java | 2 +-
.../aggregate/jdbc/JdbcAggregationRepository.java | 12 ++++++------
4 files changed, 19 insertions(+), 15 deletions(-)
diff --git
a/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
b/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
index b61702199f4c..832a41dab223 100644
---
a/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
+++
b/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java
@@ -22,7 +22,6 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.sql.Statement;
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -236,13 +235,18 @@ public class PgReplicationSlotConsumer extends
ScheduledPollConsumer {
}
private boolean isSlotActive() throws SQLException {
- String sql = String.format("SELECT count(*) FROM pg_replication_slots
where slot_name = '%s' AND active = true;",
- this.endpoint.getSlot());
+ String sql = "SELECT count(*) FROM pg_replication_slots WHERE
slot_name = ? AND active = true";
- try (Statement statement = this.connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql)) {
- resultSet.next();
- return resultSet.getInt(1) > 0;
+ try (PreparedStatement ps = connection.prepareStatement(sql)) {
+ ps.setString(1, endpoint.getSlot());
+
+ try (ResultSet rs = ps.executeQuery()) {
+ if (rs.next()) {
+ return rs.getInt(1) > 0;
+ }
+ }
}
+ return false;
}
private void connect() throws SQLException {
diff --git
a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
index c9fcd00545a6..960bed80f699 100644
---
a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
+++
b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java
@@ -129,7 +129,7 @@ public class PgEventConsumer extends DefaultConsumer {
throw new IllegalArgumentException("Invalid channel name");
}
String sql = String.format("LISTEN %s", channel);
- try (PreparedStatement statement =
dbConnection.prepareStatement(sql)) {
+ try (PreparedStatement statement =
dbConnection.prepareStatement(sql)) { // NOSONAR
statement.execute();
}
dbConnection.addNotificationListener(channel, channel, listener);
@@ -144,7 +144,7 @@ public class PgEventConsumer extends DefaultConsumer {
}
dbConnection.removeNotificationListener(channel);
String sql = String.format("UNLISTEN %s", channel);
- try (PreparedStatement statement =
dbConnection.prepareStatement(sql)) {
+ try (PreparedStatement statement =
dbConnection.prepareStatement(sql)) { // NOSONAR
statement.execute();
}
dbConnection.close();
diff --git
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregationRepository.java
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregationRepository.java
index dc2450cf37a5..28af65e419e8 100644
---
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregationRepository.java
+++
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregationRepository.java
@@ -77,7 +77,7 @@ public class ClusteredJdbcAggregationRepository extends
JdbcAggregationRepositor
LOG.debug("Removing key {}", correlationId);
String table = getRepositoryName();
verifyTableName(table);
- jdbcTemplate.update("DELETE FROM " + table + " WHERE " +
ID + " = ? AND " + VERSION + " = ?",
+ jdbcTemplate.update("DELETE FROM " + table + " WHERE " +
ID + " = ? AND " + VERSION + " = ?", // NOSONAR
correlationId, version);
insert(camelContext, confirmKey, exchange,
getRepositoryNameCompleted(), version, true);
diff --git
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
index 6dc89bcc0c4f..547017b36a18 100644
---
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
+++
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
@@ -195,7 +195,7 @@ public class JdbcAggregationRepository extends
ServiceSupport
String table = getRepositoryName();
verifyTableName(table);
boolean present = jdbcTemplate.queryForObject(
- "SELECT COUNT(1) FROM " + table + " WHERE " + ID +
" = ?", Integer.class,
+ "SELECT COUNT(1) FROM " + table + " WHERE " + ID +
" = ?", Integer.class, // NOSONAR
correlationId) != 0;
// Recover existing exchange with that ID
@@ -386,7 +386,7 @@ public class JdbcAggregationRepository extends
ServiceSupport
try {
verifyTableName(repositoryName);
Map<String, Object> columns = jdbcTemplate.queryForMap(
- String.format("SELECT %1$s, %2$s FROM %3$s WHERE
%4$s=?", EXCHANGE, VERSION, repositoryName, ID),
+ String.format("SELECT %1$s, %2$s FROM %3$s WHERE
%4$s=?", EXCHANGE, VERSION, repositoryName, ID), // NOSONAR
new Object[] { key }, new int[] { Types.VARCHAR });
byte[] marshalledExchange = (byte[]) columns.get(EXCHANGE);
@@ -426,7 +426,7 @@ public class JdbcAggregationRepository extends
ServiceSupport
LOG.debug("Removing key {}", correlationId);
String table = getRepositoryName();
verifyTableName(table);
- jdbcTemplate.update("DELETE FROM " + table + " WHERE " +
ID + " = ? AND " + VERSION + " = ?",
+ jdbcTemplate.update("DELETE FROM " + table + " WHERE " +
ID + " = ? AND " + VERSION + " = ?", // NOSONAR
correlationId, version);
insert(camelContext, confirmKey, exchange,
getRepositoryNameCompleted(), version);
@@ -452,7 +452,7 @@ public class JdbcAggregationRepository extends
ServiceSupport
String table = getRepositoryNameCompleted();
verifyTableName(table);
final int mustBeOne = jdbcTemplate
- .update("DELETE FROM " + table + " WHERE " + ID + " =
?", exchangeId);
+ .update("DELETE FROM " + table + " WHERE " + ID + " =
?", exchangeId); // NOSONAR
if (mustBeOne != 1) {
LOG.error("problem removing row {} from {} - DELETE
statement did not return 1 but {}",
exchangeId, getRepositoryNameCompleted(),
mustBeOne);
@@ -483,7 +483,7 @@ public class JdbcAggregationRepository extends
ServiceSupport
return transactionTemplateReadOnly.execute(new
TransactionCallback<LinkedHashSet<String>>() {
public LinkedHashSet<String> doInTransaction(TransactionStatus
status) {
verifyTableName(repositoryName);
- List<String> keys = jdbcTemplate.query("SELECT " + ID + " FROM
" + repositoryName,
+ List<String> keys = jdbcTemplate.query("SELECT " + ID + " FROM
" + repositoryName, // NOSONAR
new RowMapper<String>() {
public String mapRow(ResultSet rs, int rowNum)
throws SQLException {
String id = rs.getString(ID);
@@ -711,7 +711,7 @@ public class JdbcAggregationRepository extends
ServiceSupport
private int rowCount(final String repository) {
verifyTableName(repository);
- return jdbcTemplate.queryForObject("SELECT COUNT(1) FROM " +
repository, Integer.class);
+ return jdbcTemplate.queryForObject("SELECT COUNT(1) FROM " +
repository, Integer.class); // NOSONAR
}
@Override