Copilot commented on code in PR #9:
URL: https://github.com/apache/pulsar-connectors/pull/9#discussion_r3019608721


##########
jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java:
##########
@@ -239,6 +270,9 @@ protected enum MutationType {
 
 
     private void flush() {
+        if (state.get() == State.CLOSED) {
+            return;
+        }
         if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) {

Review Comment:
   flush() reads incomingList.size() outside of synchronized(incomingList) 
(line 276, and the debug log at 371). Since LinkedList is not thread-safe, this 
is a data race and can lead to flush being skipped even when items are present 
(or to inconsistent queue size observations). Move the emptiness/size check 
inside the existing synchronized block (or switch to a concurrent collection) 
so flush triggering is based on a consistent view.
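
   A minimal sketch of the first option, assuming the buffer stays a plain
   LinkedList guarded by its own monitor. The class name GuardedQueueSketch
   and its methods are hypothetical stand-ins, not the actual JdbcAbstractSink
   code; the point is only that the queue is inspected and drained under the
   same lock that write() uses.

   ```java
   import java.util.ArrayList;
   import java.util.LinkedList;
   import java.util.List;
   import java.util.concurrent.atomic.AtomicBoolean;

   // Hypothetical stand-in for the sink's buffering logic (illustration only).
   class GuardedQueueSketch<T> {
       private final LinkedList<T> incomingList = new LinkedList<>();
       private final AtomicBoolean isFlushing = new AtomicBoolean(false);

       // Producers append under the list's monitor.
       void add(T item) {
           synchronized (incomingList) {
               incomingList.add(item);
           }
       }

       // Returns the drained batch, or an empty list when there is nothing to
       // flush or another flush is already in progress.
       List<T> flush() {
           if (!isFlushing.compareAndSet(false, true)) {
               return new ArrayList<>();
           }
           try {
               List<T> batch;
               synchronized (incomingList) {
                   // The queue is read and cleared under the same monitor that
                   // add() uses, so the flush decision sees a consistent view.
                   batch = new ArrayList<>(incomingList);
                   incomingList.clear();
               }
               // A real sink would write the batch to the database here,
               // outside the lock.
               return batch;
           } finally {
               isFlushing.set(false);
           }
       }
   }
   ```

   In this sketch the unsynchronized size() pre-check is dropped entirely; an
   empty batch simply results in a no-op flush.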



##########
jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java:
##########
@@ -114,12 +119,20 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
 
         int timeoutMs = jdbcSinkConfig.getTimeoutMs();
         batchSize = jdbcSinkConfig.getBatchSize();
+        maxQueueSize = jdbcSinkConfig.getMaxQueueSize();
+        if (maxQueueSize == 0) {
+            // Auto-size: default to 10x batch size
+            maxQueueSize = batchSize > 0 ? batchSize * 10 : 10000;

Review Comment:
   maxQueueSize auto-sizing uses `batchSize * 10` on ints. For very large 
configured batchSize values this can overflow to a negative number and 
unintentionally disable the limit (and reintroduce OOM risk). Use a safe 
multiply (e.g., long with cap at Integer.MAX_VALUE, or Math.multiplyExact with 
fallback) before assigning to maxQueueSize.
   ```suggestion
               // Auto-size: default to 10x batch size (with overflow-safe calculation)
               long calculated = batchSize > 0 ? (long) batchSize * 10L : 10000L;
               if (calculated > Integer.MAX_VALUE) {
                   maxQueueSize = Integer.MAX_VALUE;
               } else {
                   maxQueueSize = (int) calculated;
               }
   ```
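
   To make the overflow concrete, here is a small standalone sketch. The
   helper name autoSize and the class QueueSizing are hypothetical and exist
   only for illustration; the arithmetic mirrors the suggestion above, written
   with Math.min instead of an explicit if/else.

   ```java
   // Hypothetical helper class (illustration only, not part of the PR).
   class QueueSizing {
       // Widen to long before multiplying, then clamp to the int range.
       static int autoSize(int batchSize) {
           long calculated = batchSize > 0 ? (long) batchSize * 10L : 10000L;
           return (int) Math.min(calculated, Integer.MAX_VALUE);
       }

       public static void main(String[] args) {
           int batchSize = 300_000_000;
           // Plain int arithmetic silently wraps around to a negative value.
           System.out.println("int multiply:  " + (batchSize * 10));
           // The widened calculation is clamped to Integer.MAX_VALUE instead.
           System.out.println("safe multiply: " + autoSize(batchSize));
       }
   }
   ```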



##########
jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java:
##########
@@ -130,6 +130,18 @@ public class JdbcSinkConfig implements Serializable {
     )
     private NullValueAction nullValueAction = NullValueAction.FAIL;
 
+    @FieldDoc(
+            required = false,
+            defaultValue = "0",
+            help = "Maximum number of records to buffer in the internal queue before applying back-pressure. "
+                    + "When the queue is full, incoming records will be failed (negatively acknowledged) so that "
+                    + "the Pulsar consumer can redeliver them later. This prevents out-of-memory errors when the "
+                    + "database connection is slow or broken. "
+                    + "A value of 0 (default) auto-sizes to batchSize * 10. "
+                    + "A value of -1 disables the limit (unbounded, legacy behavior)."
+    )
+    private int maxQueueSize = 0;

Review Comment:
   This PR introduces a new default behavior: previously the in-memory queue 
was effectively unbounded; now maxQueueSize defaults to 0 and is interpreted as 
a bounded auto-size (batchSize * 10). This can cause records to be 
failed/redelivered under sustained backlog where previously they would queue. 
If the intent is to preserve legacy behavior by default, consider defaulting to 
-1 or treating 0 as unbounded; otherwise, please ensure the PR 
description/release notes reflect the behavioral change (the PR description 
currently states that `maxQueueSize=0` preserves legacy unbounded behavior).
   ```suggestion
               defaultValue = "-1",
               help = "Maximum number of records to buffer in the internal queue before applying back-pressure. "
                       + "When the queue is full, incoming records will be failed (negatively acknowledged) so that "
                       + "the Pulsar consumer can redeliver them later. This prevents out-of-memory errors when the "
                       + "database connection is slow or broken. "
                       + "A value of 0 auto-sizes to batchSize * 10. "
                       + "A value of -1 (default) disables the limit (unbounded, legacy behavior)."
       )
       private int maxQueueSize = -1;
   ```



##########
jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java:
##########
@@ -188,8 +205,22 @@ public void close() throws Exception {
 
     @Override
     public void write(Record<T> record) throws Exception {
+        if (state.get() != State.OPEN) {
+            log.warn("Sink is not in OPEN state (current: {}), failing record", state.get());
+            record.fail();
+            return;
+        }
         int number;
         synchronized (incomingList) {
+            if (maxQueueSize > 0 && incomingList.size() >= maxQueueSize) {
+                if (!queueFullLogged) {
+                    log.warn("Internal queue is full ({} >= {}), failing records to apply back-pressure",
+                            incomingList.size(), maxQueueSize);
+                    queueFullLogged = true;
+                }
+                record.fail();
+                return;
+            }
             incomingList.add(record);
             number = incomingList.size();
         }

Review Comment:
   write() calls record.fail() while holding the incomingList monitor. Record 
callbacks can execute user/framework code and potentially block, which can 
stall concurrent write()/flush() and increase back-pressure unexpectedly. 
Consider capturing a boolean "shouldFail" inside the synchronized block and 
invoking record.fail() after releasing the lock.
   ```suggestion
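           // Note: `number` must be initialized where it is declared (e.g. `int number = 0;`),
           // because the compiler cannot prove it is assigned on the non-failing path.
           boolean shouldFail = false;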
           boolean shouldLogQueueFull = false;
           int queueSizeAtFailure = -1;
           synchronized (incomingList) {
               if (maxQueueSize > 0 && incomingList.size() >= maxQueueSize) {
                   if (!queueFullLogged) {
                       queueFullLogged = true;
                       shouldLogQueueFull = true;
                       queueSizeAtFailure = incomingList.size();
                   }
                   shouldFail = true;
               } else {
                   incomingList.add(record);
                   number = incomingList.size();
               }
           }
           if (shouldFail) {
               if (shouldLogQueueFull) {
                   log.warn("Internal queue is full ({} >= {}), failing records to apply back-pressure",
                           queueSizeAtFailure, maxQueueSize);
               }
               record.fail();
               return;
           }
   ```



##########
jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java:
##########
@@ -130,6 +130,18 @@ public class JdbcSinkConfig implements Serializable {
     )
     private NullValueAction nullValueAction = NullValueAction.FAIL;
 
+    @FieldDoc(
+            required = false,
+            defaultValue = "0",
+            help = "Maximum number of records to buffer in the internal queue before applying back-pressure. "
+                    + "When the queue is full, incoming records will be failed (negatively acknowledged) so that "
+                    + "the Pulsar consumer can redeliver them later. This prevents out-of-memory errors when the "
+                    + "database connection is slow or broken. "
+                    + "A value of 0 (default) auto-sizes to batchSize * 10. "
+                    + "A value of -1 disables the limit (unbounded, legacy behavior)."
+    )
+    private int maxQueueSize = 0;

Review Comment:
   The FieldDoc says "-1 disables the limit", but the implementation treats any 
negative value as unbounded (since only `maxQueueSize > 0` is enforced). To 
avoid surprising behavior from typos (e.g., -2), consider validating 
maxQueueSize in JdbcSinkConfig.validate() to allow only -1, 0, or a positive 
value (and fail fast otherwise).
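
   A minimal sketch of such a check, assuming it would be called from
   JdbcSinkConfig.validate(). The class and method names below are
   hypothetical, and IllegalArgumentException is an assumed choice of
   exception type rather than whatever the existing validate() throws.

   ```java
   // Hypothetical validation helper (illustration only, not part of the PR).
   class MaxQueueSizeValidation {
       // Accepts -1 (unbounded), 0 (auto-size), or any positive limit;
       // rejects every other negative value so that typos fail fast.
       static void validateMaxQueueSize(int maxQueueSize) {
           if (maxQueueSize < -1) {
               throw new IllegalArgumentException(
                       "maxQueueSize must be -1 (unbounded), 0 (auto-size), or a positive value, but was "
                               + maxQueueSize);
           }
       }
   }
   ```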



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
