Copilot commented on code in PR #9:
URL: https://github.com/apache/pulsar-connectors/pull/9#discussion_r3019608721
##########
jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java:
##########
@@ -239,6 +270,9 @@ protected enum MutationType {
private void flush() {
+ if (state.get() == State.CLOSED) {
+ return;
+ }
if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) {
Review Comment:
flush() reads incomingList.size() outside of synchronized(incomingList)
(line 276, and the debug log at 371). Since LinkedList is not thread-safe, this
is a data race and can lead to flush being skipped even when items are present
(or to inconsistent queue size observations). Move the emptiness/size check
inside the existing synchronized block (or switch to a concurrent collection)
so flush triggering is based on a consistent view.
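A minimal sketch of the first option, taking the emptiness check under the same monitor that guards writes. `GuardedQueue`, `add`, and `drainForFlush` are hypothetical names used only for illustration, and the flag is reset synchronously here, which may differ from how the real sink completes a flush:

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the sink's queue handling; not the actual JdbcAbstractSink code.
final class GuardedQueue<T> {
    private final LinkedList<T> incomingList = new LinkedList<>();
    private final AtomicBoolean isFlushing = new AtomicBoolean(false);

    void add(T item) {
        synchronized (incomingList) {
            incomingList.add(item);
        }
    }

    // Returns the drained batch, or an empty list when there is nothing to flush
    // or another flush is already in progress.
    List<T> drainForFlush() {
        if (!isFlushing.compareAndSet(false, true)) {
            return new ArrayList<>();
        }
        try {
            synchronized (incomingList) {
                // The queue contents are read only while holding the monitor,
                // so the view is consistent with concurrent add() calls.
                List<T> batch = new ArrayList<>(incomingList);
                incomingList.clear();
                return batch;
            }
        } finally {
            // Simplification: the flag is released as soon as the batch is taken.
            isFlushing.set(false);
        }
    }
}
```

With this shape, no caller ever inspects the `LinkedList` without holding its monitor.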
##########
jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java:
##########
@@ -114,12 +119,20 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
int timeoutMs = jdbcSinkConfig.getTimeoutMs();
batchSize = jdbcSinkConfig.getBatchSize();
+ maxQueueSize = jdbcSinkConfig.getMaxQueueSize();
+ if (maxQueueSize == 0) {
+ // Auto-size: default to 10x batch size
+ maxQueueSize = batchSize > 0 ? batchSize * 10 : 10000;
Review Comment:
maxQueueSize auto-sizing uses `batchSize * 10` on ints. For very large
configured batchSize values this can overflow to a negative number and
unintentionally disable the limit (and reintroduce OOM risk). Use a safe
multiply (e.g., long with cap at Integer.MAX_VALUE, or Math.multiplyExact with
fallback) before assigning to maxQueueSize.
```suggestion
// Auto-size: default to 10x batch size (with overflow-safe calculation)
long calculated = batchSize > 0 ? (long) batchSize * 10L : 10000L;
if (calculated > Integer.MAX_VALUE) {
    maxQueueSize = Integer.MAX_VALUE;
} else {
    maxQueueSize = (int) calculated;
}
```
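The `Math.multiplyExact` alternative mentioned above could look roughly like the following. `QueueSizing.autoSize` is a hypothetical helper name; the factor of 10 and the 10000 fallback are copied from the diff:

```java
// Hypothetical helper illustrating the Math.multiplyExact variant; not part of the PR.
final class QueueSizing {
    static int autoSize(int batchSize) {
        if (batchSize <= 0) {
            return 10_000; // same fallback as the diff
        }
        try {
            // Throws ArithmeticException instead of silently wrapping to a negative value.
            return Math.multiplyExact(batchSize, 10);
        } catch (ArithmeticException overflow) {
            return Integer.MAX_VALUE; // cap rather than disable the limit
        }
    }
}
```

Both variants cap at `Integer.MAX_VALUE`; the difference is only whether the overflow is detected by widening to `long` or by an exception.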
##########
jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java:
##########
@@ -130,6 +130,18 @@ public class JdbcSinkConfig implements Serializable {
)
private NullValueAction nullValueAction = NullValueAction.FAIL;
+ @FieldDoc(
+ required = false,
+ defaultValue = "0",
+ help = "Maximum number of records to buffer in the internal queue before applying back-pressure. "
+ + "When the queue is full, incoming records will be failed (negatively acknowledged) so that "
+ + "the Pulsar consumer can redeliver them later. This prevents out-of-memory errors when the "
+ + "database connection is slow or broken. "
+ + "A value of 0 (default) auto-sizes to batchSize * 10. "
+ + "A value of -1 disables the limit (unbounded, legacy behavior)."
+ )
+ private int maxQueueSize = 0;
Review Comment:
This PR introduces a new default behavior: previously the in-memory queue
was effectively unbounded; now maxQueueSize defaults to 0 and is interpreted as
a bounded auto-size (batchSize * 10). This can cause records to be
failed/redelivered under sustained backlog where previously they would queue.
If the intent is to preserve legacy behavior by default, consider defaulting to
-1 or treating 0 as unbounded; otherwise, please ensure the PR
description/release notes reflect the behavioral change (the PR description
currently states that `maxQueueSize=0` preserves legacy unbounded behavior).
```suggestion
defaultValue = "-1",
help = "Maximum number of records to buffer in the internal queue before applying back-pressure. "
        + "When the queue is full, incoming records will be failed (negatively acknowledged) so that "
        + "the Pulsar consumer can redeliver them later. This prevents out-of-memory errors when the "
        + "database connection is slow or broken. "
        + "A value of 0 auto-sizes to batchSize * 10. "
        + "A value of -1 (default) disables the limit (unbounded, legacy behavior)."
)
private int maxQueueSize = -1;
```
##########
jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java:
##########
@@ -188,8 +205,22 @@ public void close() throws Exception {
@Override
public void write(Record<T> record) throws Exception {
+ if (state.get() != State.OPEN) {
+ log.warn("Sink is not in OPEN state (current: {}), failing record", state.get());
+ record.fail();
+ return;
+ }
int number;
synchronized (incomingList) {
+ if (maxQueueSize > 0 && incomingList.size() >= maxQueueSize) {
+ if (!queueFullLogged) {
+ log.warn("Internal queue is full ({} >= {}), failing records to apply back-pressure",
+ incomingList.size(), maxQueueSize);
+ queueFullLogged = true;
+ }
+ record.fail();
+ return;
+ }
incomingList.add(record);
number = incomingList.size();
}
Review Comment:
write() calls record.fail() while holding the incomingList monitor. Record
callbacks can execute user/framework code and potentially block, which can
stall concurrent write()/flush() and increase back-pressure unexpectedly.
Consider capturing a boolean "shouldFail" inside the synchronized block and
invoking record.fail() after releasing the lock.
```suggestion
boolean shouldFail = false;
boolean shouldLogQueueFull = false;
int queueSizeAtFailure = -1;
synchronized (incomingList) {
    if (maxQueueSize > 0 && incomingList.size() >= maxQueueSize) {
        if (!queueFullLogged) {
            queueFullLogged = true;
            shouldLogQueueFull = true;
            queueSizeAtFailure = incomingList.size();
        }
        shouldFail = true;
    } else {
        incomingList.add(record);
        number = incomingList.size();
    }
}
if (shouldFail) {
    if (shouldLogQueueFull) {
        log.warn("Internal queue is full ({} >= {}), failing records to apply back-pressure",
                queueSizeAtFailure, maxQueueSize);
    }
    record.fail();
    return;
}
```
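For reference, a self-contained sketch of the same pattern: decide under the lock, invoke the callback after releasing it. `FailableRecord` and `BoundedBuffer` are hypothetical stand-ins for Pulsar's `Record<T>` and the sink. Note that `number` is given an initial value here; with the uninitialized `int number;` from the original method, a later read of `number` would likely be rejected by the compiler's definite-assignment check, since it is only assigned on the non-failing branch.

```java
import java.util.LinkedList;

// Hypothetical minimal record type standing in for Pulsar's Record<T>.
interface FailableRecord {
    void fail();
}

// Hypothetical buffer illustrating "decide under the lock, call back outside it".
final class BoundedBuffer {
    private final LinkedList<FailableRecord> incomingList = new LinkedList<>();
    private final int maxQueueSize;

    BoundedBuffer(int maxQueueSize) {
        this.maxQueueSize = maxQueueSize;
    }

    // Returns the queue size after the add, or -1 if the record was rejected.
    int offer(FailableRecord record) {
        int number = -1; // initialized so the reject path is definitely assigned
        boolean shouldFail;
        synchronized (incomingList) {
            shouldFail = maxQueueSize > 0 && incomingList.size() >= maxQueueSize;
            if (!shouldFail) {
                incomingList.add(record);
                number = incomingList.size();
            }
        }
        if (shouldFail) {
            // The callback runs after the monitor is released, so a slow or
            // blocking fail() cannot stall other writers or the flusher.
            record.fail();
        }
        return number;
    }
}
```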
##########
jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java:
##########
@@ -130,6 +130,18 @@ public class JdbcSinkConfig implements Serializable {
)
private NullValueAction nullValueAction = NullValueAction.FAIL;
+ @FieldDoc(
+ required = false,
+ defaultValue = "0",
+ help = "Maximum number of records to buffer in the internal queue before applying back-pressure. "
+ + "When the queue is full, incoming records will be failed (negatively acknowledged) so that "
+ + "the Pulsar consumer can redeliver them later. This prevents out-of-memory errors when the "
+ + "database connection is slow or broken. "
+ + "A value of 0 (default) auto-sizes to batchSize * 10. "
+ + "A value of -1 disables the limit (unbounded, legacy behavior)."
+ )
+ private int maxQueueSize = 0;
Review Comment:
The FieldDoc says "-1 disables the limit", but the implementation treats any
negative value as unbounded (since only `maxQueueSize > 0` is enforced). To
avoid surprising behavior from typos (e.g., -2), consider validating
maxQueueSize in JdbcSinkConfig.validate() to allow only -1, 0, or a positive
value (and fail fast otherwise).
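A possible shape for that check, shown as a standalone helper because the existing body of `JdbcSinkConfig.validate()` is not visible in this diff. `QueueSizeValidation.validateMaxQueueSize` is a hypothetical name, and the exception type is an assumption:

```java
// Hypothetical fail-fast check; in the PR this logic would presumably live in JdbcSinkConfig.validate().
final class QueueSizeValidation {
    static void validateMaxQueueSize(int maxQueueSize) {
        // Accept only -1 (unbounded), 0 (auto-size), or a positive explicit limit.
        if (maxQueueSize < -1) {
            throw new IllegalArgumentException(
                    "maxQueueSize must be -1 (unbounded), 0 (auto-size), or a positive value, but was "
                            + maxQueueSize);
        }
    }
}
```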