harangozop commented on code in PR #9:
URL: https://github.com/apache/pulsar-connectors/pull/9#discussion_r3031610422
##########
jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java:
##########
@@ -188,8 +205,22 @@ public void close() throws Exception {
@Override
public void write(Record<T> record) throws Exception {
+ if (state.get() != State.OPEN) {
+ log.warn("Sink is not in OPEN state (current: {}), failing record", state.get());
+ record.fail();
+ return;
+ }
int number;
synchronized (incomingList) {
+ if (maxQueueSize > 0 && incomingList.size() >= maxQueueSize) {
+ if (!queueFullLogged) {
+ log.warn("Internal queue is full ({} >= {}), failing records to apply back-pressure",
+ incomingList.size(), maxQueueSize);
+ queueFullLogged = true;
+ }
+ record.fail();
+ return;
+ }
incomingList.add(record);
number = incomingList.size();
}
Review Comment:
Fixed in fb9f552 — moved `record.fail()` and the queue-full log outside the
synchronized block using the `shouldFail`/`shouldLogQueueFull` capture pattern,
exactly as suggested.
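
For readers following along, the capture pattern mentioned above might look roughly like the sketch below. It is not the code from fb9f552: the class `BoundedQueueSketch`, the `offer` method, and the `warnCount` counter (standing in for `log.warn`) are hypothetical names used only for illustration, and records are plain strings. The point is that the decision is made under the lock while the side effects run after the lock is released.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the sink's internal queue; not the PR's actual class.
class BoundedQueueSketch {
    private final List<String> incomingList = new ArrayList<>();
    private final int maxQueueSize;
    private boolean queueFullLogged = false;              // guarded by incomingList
    final AtomicInteger warnCount = new AtomicInteger();  // stands in for log.warn calls

    BoundedQueueSketch(int maxQueueSize) {
        this.maxQueueSize = maxQueueSize;
    }

    /** Returns true if the record was queued, false if the caller should fail it. */
    boolean offer(String record) {
        boolean shouldFail = false;
        boolean shouldLogQueueFull = false;
        synchronized (incomingList) {
            // Only decide what to do while holding the lock.
            if (maxQueueSize > 0 && incomingList.size() >= maxQueueSize) {
                shouldFail = true;
                if (!queueFullLogged) {
                    queueFullLogged = true;
                    shouldLogQueueFull = true;
                }
            } else {
                incomingList.add(record);
            }
        }
        // Side effects happen after the lock is released.
        if (shouldLogQueueFull) {
            warnCount.incrementAndGet(); // a real sink would call log.warn(...) here
        }
        // A real sink would call record.fail() here when shouldFail is true.
        return !shouldFail;
    }
}
```

Keeping logging and `record.fail()` out of the critical section means a slow logger or callback cannot stall other threads waiting on `incomingList`.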
##########
jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java:
##########
@@ -239,6 +270,9 @@ protected enum MutationType {
private void flush() {
+ if (state.get() == State.CLOSED) {
+ return;
+ }
if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) {
Review Comment:
Fixed in fb9f552 — all `incomingList.size()` reads are now inside
`synchronized(incomingList)` blocks, including the debug log in the
`isFlushing` early-return path and the emptiness check at the top of flush.
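
As a rough illustration of the guarded reads described above (a sketch only, with the hypothetical class `FlushGuardSketch` and string records rather than the sink's real types), the emptiness check and the drain both take the `incomingList` monitor, and the `isFlushing` flag is only claimed once the queue is known to be non-empty:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; the real sink also checks its lifecycle state and writes to JDBC.
class FlushGuardSketch {
    private final List<String> incomingList = new ArrayList<>();
    private final AtomicBoolean isFlushing = new AtomicBoolean(false);

    void add(String record) {
        synchronized (incomingList) {
            incomingList.add(record);
        }
    }

    /** Returns the number of records drained, or 0 if there was nothing to flush. */
    int flush() {
        boolean empty;
        synchronized (incomingList) {
            // The size/emptiness read happens under the same lock as writers.
            empty = incomingList.isEmpty();
        }
        if (empty || !isFlushing.compareAndSet(false, true)) {
            return 0;
        }
        try {
            List<String> batch;
            synchronized (incomingList) {
                batch = new ArrayList<>(incomingList);
                incomingList.clear();
            }
            return batch.size(); // a real sink would execute the batch here
        } finally {
            isFlushing.set(false);
        }
    }
}
```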
##########
jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java:
##########
@@ -114,12 +119,20 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
int timeoutMs = jdbcSinkConfig.getTimeoutMs();
batchSize = jdbcSinkConfig.getBatchSize();
+ maxQueueSize = jdbcSinkConfig.getMaxQueueSize();
+ if (maxQueueSize == 0) {
+ // Auto-size: default to 10x batch size
+ maxQueueSize = batchSize > 0 ? batchSize * 10 : 10000;
Review Comment:
Fixed in fb9f552 — auto-sizing now uses `(long) batchSize * 10L` and caps at
`Integer.MAX_VALUE` before casting back to int.
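
A minimal sketch of the overflow-safe auto-sizing described above, assuming a hypothetical helper `QueueSizing.resolveMaxQueueSize` (the real change lives inline in `open()` and may differ in detail). The multiplication is widened to `long` so that a very large `batchSize` cannot wrap around to a negative `int`:

```java
// Hypothetical helper; names are illustrative, not taken from the PR.
class QueueSizing {
    static int resolveMaxQueueSize(int configured, int batchSize) {
        if (configured != 0) {
            return configured; // -1 (unbounded) or an explicit positive limit
        }
        if (batchSize <= 0) {
            return 10000; // fallback used in the original hunk
        }
        // In int arithmetic, Integer.MAX_VALUE * 10 wraps to -10; widening avoids that.
        long scaled = (long) batchSize * 10L;
        return (int) Math.min(scaled, (long) Integer.MAX_VALUE);
    }
}
```

For example, a `batchSize` of 200 resolves to 2000, while a `batchSize` of `Integer.MAX_VALUE` is capped at `Integer.MAX_VALUE` instead of overflowing.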
##########
jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java:
##########
@@ -130,6 +130,18 @@ public class JdbcSinkConfig implements Serializable {
)
private NullValueAction nullValueAction = NullValueAction.FAIL;
+ @FieldDoc(
+ required = false,
+ defaultValue = "0",
+ help = "Maximum number of records to buffer in the internal queue before applying back-pressure. "
+ + "When the queue is full, incoming records will be failed (negatively acknowledged) so that "
+ + "the Pulsar consumer can redeliver them later. This prevents out-of-memory errors when the "
+ + "database connection is slow or broken. "
+ + "A value of 0 (default) auto-sizes to batchSize * 10. "
+ + "A value of -1 disables the limit (unbounded, legacy behavior)."
+ )
+ private int maxQueueSize = 0;
Review Comment:
Fixed in fb9f552 — default changed to `-1` (unbounded, legacy behavior).
Users must explicitly opt in to a bounded queue by setting `maxQueueSize=0`
(auto-size) or a positive value. The FieldDoc was updated accordingly.
##########
jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java:
##########
@@ -130,6 +130,18 @@ public class JdbcSinkConfig implements Serializable {
)
private NullValueAction nullValueAction = NullValueAction.FAIL;
+ @FieldDoc(
+ required = false,
+ defaultValue = "0",
+ help = "Maximum number of records to buffer in the internal queue before applying back-pressure. "
+ + "When the queue is full, incoming records will be failed (negatively acknowledged) so that "
+ + "the Pulsar consumer can redeliver them later. This prevents out-of-memory errors when the "
+ + "database connection is slow or broken. "
+ + "A value of 0 (default) auto-sizes to batchSize * 10. "
+ + "A value of -1 disables the limit (unbounded, legacy behavior)."
+ )
+ private int maxQueueSize = 0;
Review Comment:
Fixed in fb9f552 — added validation in `JdbcSinkConfig.validate()` that
rejects `maxQueueSize < -1`, plus a corresponding test
(`testInvalidMaxQueueSizeRejected`) that verifies `-2` throws
`IllegalArgumentException`.
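
The validation described above could be sketched as follows. This is an assumption-laden illustration, not the contents of `JdbcSinkConfig.validate()`: the class `QueueConfigSketch` and the exception message are hypothetical, and only the `maxQueueSize` field is modeled. The default of `-1` reflects the earlier reply in this thread.

```java
// Hypothetical config holder; the real JdbcSinkConfig has many more fields.
class QueueConfigSketch {
    int maxQueueSize = -1; // -1 = unbounded (legacy), 0 = auto-size, > 0 = explicit limit

    void validate() {
        if (maxQueueSize < -1) {
            throw new IllegalArgumentException(
                "maxQueueSize must be -1 (unbounded), 0 (auto-size), or a positive value, got "
                    + maxQueueSize);
        }
    }
}
```

Rejecting values below `-1` at configuration time surfaces a typo such as `-2` immediately, rather than letting it silently behave like an unbounded queue.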