This is an automated email from the ASF dual-hosted git repository. pcongiusti pushed a commit to branch camel-4.10.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 59d2d38c6bb3df02154168150fb9720fb6a01fe8 Author: Pasquale Congiusti <[email protected]> AuthorDate: Fri Dec 5 10:46:51 2025 +0100 fix(components): sql enhancements Follow up #20229 --- .github/actions/install-mvnd/action.yml | 2 +- .../slot/PgReplicationSlotConsumer.java | 33 ++++++++++++++-------- .../camel/component/pgevent/PgEventConsumer.java | 6 ++++ .../jdbc/ClusteredJdbcAggregationRepository.java | 2 +- .../aggregate/jdbc/JdbcAggregationRepository.java | 12 ++++---- 5 files changed, 36 insertions(+), 19 deletions(-) diff --git a/.github/actions/install-mvnd/action.yml b/.github/actions/install-mvnd/action.yml index 8f7facf4dd82..84e1769ca8b9 100644 --- a/.github/actions/install-mvnd/action.yml +++ b/.github/actions/install-mvnd/action.yml @@ -25,7 +25,7 @@ inputs: distribution: description: 'The maven distribution to use' required: true - default: 'm39-linux-amd64' + default: 'linux-amd64' dry-run: description: 'Flag to enable to the dry-run mode' required: true diff --git a/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java b/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java index 0c1882670a9d..b97848d5cf9d 100644 --- a/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java +++ b/components/camel-pg-replication-slot/src/main/java/org/apache/camel/component/pg/replication/slot/PgReplicationSlotConsumer.java @@ -19,9 +19,9 @@ package org.apache.camel.component.pg.replication.slot; import java.net.SocketException; import java.nio.ByteBuffer; import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Statement; import java.util.Properties; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -197,13 +197,19 @@ public class PgReplicationSlotConsumer extends ScheduledPollConsumer { } private boolean isSlotCreated() throws SQLException { - String sql - = String.format("SELECT count(*) FROM pg_replication_slots WHERE slot_name = '%s';", this.endpoint.getSlot()); + String sql = "SELECT count(*) FROM pg_replication_slots WHERE slot_name = ?"; - try (Statement statement = this.connection.createStatement(); ResultSet resultSet = statement.executeQuery(sql)) { - resultSet.next(); - return resultSet.getInt(1) > 0; + try (PreparedStatement ps = connection.prepareStatement(sql)) { + ps.setString(1, endpoint.getSlot()); + + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + return rs.getInt(1) > 0; + } + } } + + return false; } private PGReplicationStream getStream() throws SQLException { @@ -232,13 +238,18 @@ public class PgReplicationSlotConsumer extends ScheduledPollConsumer { } private boolean isSlotActive() throws SQLException { - String sql = String.format("SELECT count(*) FROM pg_replication_slots where slot_name = '%s' AND active = true;", - this.endpoint.getSlot()); + String sql = "SELECT count(*) FROM pg_replication_slots WHERE slot_name = ? AND active = true"; - try (Statement statement = this.connection.createStatement(); ResultSet resultSet = statement.executeQuery(sql)) { - resultSet.next(); - return resultSet.getInt(1) > 0; + try (PreparedStatement ps = connection.prepareStatement(sql)) { + ps.setString(1, endpoint.getSlot()); + + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + return rs.getInt(1) > 0; + } + } } + return false; } private void connect() throws SQLException { diff --git a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java index e0ce6dfe92ec..6499e346c60c 100644 --- a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java +++ b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java @@ -46,6 +46,9 @@ public class PgEventConsumer extends DefaultConsumer implements PGNotificationLi protected void doStart() throws Exception { super.doStart(); dbConnection = endpoint.initJdbc(); + if (!endpoint.getChannel().matches("[a-zA-Z_][a-zA-Z0-9_]*")) { + throw new IllegalArgumentException("Invalid channel name"); + } String sql = String.format("LISTEN %s", endpoint.getChannel()); try (PreparedStatement statement = dbConnection.prepareStatement(sql)) { statement.execute(); @@ -81,6 +84,9 @@ public class PgEventConsumer extends DefaultConsumer implements PGNotificationLi protected void doStop() throws Exception { if (dbConnection != null) { dbConnection.removeNotificationListener(endpoint.getChannel()); + if (!endpoint.getChannel().matches("[a-zA-Z_][a-zA-Z0-9_]*")) { + throw new IllegalArgumentException("Invalid channel name"); + } String sql = String.format("UNLISTEN %s", endpoint.getChannel()); try (PreparedStatement statement = dbConnection.prepareStatement(sql)) { statement.execute(); diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregationRepository.java b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregationRepository.java index dc2450cf37a5..28af65e419e8 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregationRepository.java +++ b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregationRepository.java @@ -77,7 +77,7 @@ public class ClusteredJdbcAggregationRepository extends JdbcAggregationRepositor LOG.debug("Removing key {}", correlationId); String table = getRepositoryName(); verifyTableName(table); - jdbcTemplate.update("DELETE FROM " + table + " WHERE " + ID + " = ? AND " + VERSION + " = ?", + jdbcTemplate.update("DELETE FROM " + table + " WHERE " + ID + " = ? AND " + VERSION + " = ?", // NOSONAR correlationId, version); insert(camelContext, confirmKey, exchange, getRepositoryNameCompleted(), version, true); diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java index 6dc89bcc0c4f..547017b36a18 100644 --- a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java +++ b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java @@ -195,7 +195,7 @@ public class JdbcAggregationRepository extends ServiceSupport String table = getRepositoryName(); verifyTableName(table); boolean present = jdbcTemplate.queryForObject( - "SELECT COUNT(1) FROM " + table + " WHERE " + ID + " = ?", Integer.class, + "SELECT COUNT(1) FROM " + table + " WHERE " + ID + " = ?", Integer.class, // NOSONAR correlationId) != 0; // Recover existing exchange with that ID @@ -386,7 +386,7 @@ public class JdbcAggregationRepository extends ServiceSupport try { verifyTableName(repositoryName); Map<String, Object> columns = jdbcTemplate.queryForMap( - String.format("SELECT %1$s, %2$s FROM %3$s WHERE %4$s=?", EXCHANGE, VERSION, repositoryName, ID), + String.format("SELECT %1$s, %2$s FROM %3$s WHERE %4$s=?", EXCHANGE, VERSION, repositoryName, ID), // NOSONAR new Object[] { key }, new int[] { Types.VARCHAR }); byte[] marshalledExchange = (byte[]) columns.get(EXCHANGE); @@ -426,7 +426,7 @@ public class JdbcAggregationRepository extends ServiceSupport LOG.debug("Removing key {}", correlationId); String table = getRepositoryName(); verifyTableName(table); - jdbcTemplate.update("DELETE FROM " + table + " WHERE " + ID + " = ? AND " + VERSION + " = ?", + jdbcTemplate.update("DELETE FROM " + table + " WHERE " + ID + " = ? AND " + VERSION + " = ?", // NOSONAR correlationId, version); insert(camelContext, confirmKey, exchange, getRepositoryNameCompleted(), version); @@ -452,7 +452,7 @@ public class JdbcAggregationRepository extends ServiceSupport String table = getRepositoryNameCompleted(); verifyTableName(table); final int mustBeOne = jdbcTemplate - .update("DELETE FROM " + table + " WHERE " + ID + " = ?", exchangeId); + .update("DELETE FROM " + table + " WHERE " + ID + " = ?", exchangeId); // NOSONAR if (mustBeOne != 1) { LOG.error("problem removing row {} from {} - DELETE statement did not return 1 but {}", exchangeId, getRepositoryNameCompleted(), mustBeOne); @@ -483,7 +483,7 @@ public class JdbcAggregationRepository extends ServiceSupport return transactionTemplateReadOnly.execute(new TransactionCallback<LinkedHashSet<String>>() { public LinkedHashSet<String> doInTransaction(TransactionStatus status) { verifyTableName(repositoryName); - List<String> keys = jdbcTemplate.query("SELECT " + ID + " FROM " + repositoryName, + List<String> keys = jdbcTemplate.query("SELECT " + ID + " FROM " + repositoryName, // NOSONAR new RowMapper<String>() { public String mapRow(ResultSet rs, int rowNum) throws SQLException { String id = rs.getString(ID); @@ -711,7 +711,7 @@ public class JdbcAggregationRepository extends ServiceSupport private int rowCount(final String repository) { verifyTableName(repository); - return jdbcTemplate.queryForObject("SELECT COUNT(1) FROM " + repository, Integer.class); + return jdbcTemplate.queryForObject("SELECT COUNT(1) FROM " + repository, Integer.class); // NOSONAR } @Override
