This is an automated email from the ASF dual-hosted git repository.

davekj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-connectors.git


The following commit(s) were added to refs/heads/master by this push:
     new e689bc3  [fix][io] JDBC sink: prevent OOM from unbounded queue on connection failure (#9)
e689bc3 is described below

commit e689bc345bad14ff176a4a35998fd21111f26766
Author: Harangozó Péter <[email protected]>
AuthorDate: Fri Apr 3 16:29:14 2026 +0200

    [fix][io] JDBC sink: prevent OOM from unbounded queue on connection failure (#9)
    
    * [fix][io] JDBC sink: prevent OOM from unbounded queue on connection failure
    
    The JDBC sink's internal queue (incomingList) is unbounded. When the
    database connection drops, executeBatch() hangs until the TCP socket
    times out. During this period, isFlushing stays true, preventing any
    draining, while write() continues accepting records without limit.
    This causes OutOfMemoryError in production.
    
    This commit fixes 4 issues:
    
    1. Bounded internal queue: write() now rejects records once the queue
       reaches maxQueueSize (configurable; 0 auto-sizes it to 10x batchSize,
       -1 keeps the legacy unbounded behavior — see the default change below),
       applying Pulsar-level back-pressure via negative acknowledgment.
    
    2. State check in write(): records are failed immediately when the
       sink state is not OPEN (after fatal() or close()).
    
    3. Connection validation and reconnection: ensureConnection() validates
       the JDBC connection before each flush and reconnects automatically
       on failure, allowing recovery from transient database outages.
    
    4. Scheduled flush cancellation: fatal() and close() now cancel the
       periodic flush task to prevent repeated failures on a broken
       connection.
    
    Fixes https://github.com/apache/pulsar/issues/25030
    
    * [fix][connector] Address review: thread-safety, default, validation
    
    - Move record.fail() outside synchronized(incomingList) to avoid
      holding the lock during framework callbacks
    - Move incomingList.size() check inside synchronized in flush() to
      fix data race on non-thread-safe LinkedList
    - Change maxQueueSize default from 0 (auto-bounded) to -1 (unbounded)
      to preserve backwards-compatible legacy behavior; users opt-in to
      bounded queue by setting maxQueueSize=0 (auto) or a positive value
    - Add overflow-safe auto-sizing (long arithmetic capped at Integer.MAX_VALUE)
    - Validate maxQueueSize in JdbcSinkConfig.validate() — reject < -1
    - Add test for invalid maxQueueSize rejection
---
 .../apache/pulsar/io/jdbc/JdbcAbstractSink.java    | 139 ++++++++++++++---
 .../org/apache/pulsar/io/jdbc/JdbcSinkConfig.java  |  16 ++
 .../apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java  | 164 +++++++++++++++++++++
 3 files changed, 295 insertions(+), 24 deletions(-)

diff --git 
a/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java 
b/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index 73ba6b7..3f2477a 100644
--- a/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++ b/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -78,8 +79,12 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> 
{
     private Deque<Record<T>> incomingList;
     private AtomicBoolean isFlushing;
     private int batchSize;
+    private int maxQueueSize;
     private ScheduledExecutorService flushExecutor;
+    private ScheduledFuture<?> scheduledFlushTask;
     private SinkContext sinkContext;
+    private Properties connectionProperties;
+    private volatile boolean queueFullLogged = false;
     private final AtomicReference<State> state = new 
AtomicReference<>(State.OPEN);
 
     @Override
@@ -93,17 +98,17 @@ public abstract class JdbcAbstractSink<T> implements 
Sink<T> {
             throw new IllegalArgumentException("Required jdbc Url not set.");
         }
 
-        Properties properties = new Properties();
+        connectionProperties = new Properties();
         String username = jdbcSinkConfig.getUserName();
         String password = jdbcSinkConfig.getPassword();
         if (username != null) {
-            properties.setProperty("user", username);
+            connectionProperties.setProperty("user", username);
         }
         if (password != null) {
-            properties.setProperty("password", password);
+            connectionProperties.setProperty("password", password);
         }
 
-        connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(), 
properties);
+        connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(), 
connectionProperties);
         connection.setAutoCommit(!jdbcSinkConfig.isUseTransactions());
         log.info("Opened jdbc connection: {}, autoCommit: {}", jdbcUrl, 
connection.getAutoCommit());
 
@@ -114,12 +119,21 @@ public abstract class JdbcAbstractSink<T> implements 
Sink<T> {
 
         int timeoutMs = jdbcSinkConfig.getTimeoutMs();
         batchSize = jdbcSinkConfig.getBatchSize();
+        maxQueueSize = jdbcSinkConfig.getMaxQueueSize();
+        if (maxQueueSize == 0) {
+            // Auto-size: default to 10x batch size (overflow-safe)
+            long calculated = batchSize > 0 ? (long) batchSize * 10L : 10000L;
+            maxQueueSize = calculated > Integer.MAX_VALUE ? Integer.MAX_VALUE 
: (int) calculated;
+        }
+        // maxQueueSize < 0 (i.e. -1) means unbounded (legacy behavior)
+        log.info("JDBC sink queue capacity: {}", maxQueueSize > 0 ? 
maxQueueSize : "unbounded");
         incomingList = new LinkedList<>();
         isFlushing = new AtomicBoolean(false);
 
         flushExecutor = Executors.newScheduledThreadPool(1);
         if (timeoutMs > 0) {
-            flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs, 
timeoutMs, TimeUnit.MILLISECONDS);
+            scheduledFlushTask = flushExecutor.scheduleAtFixedRate(
+                    this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
         }
     }
 
@@ -158,6 +172,10 @@ public abstract class JdbcAbstractSink<T> implements 
Sink<T> {
     @Override
     public void close() throws Exception {
         state.set(State.CLOSED);
+        if (scheduledFlushTask != null) {
+            scheduledFlushTask.cancel(false);
+            scheduledFlushTask = null;
+        }
         if (flushExecutor != null) {
             int timeoutMs = jdbcSinkConfig.getTimeoutMs() * 2;
             flushExecutor.shutdown();
@@ -188,10 +206,35 @@ public abstract class JdbcAbstractSink<T> implements 
Sink<T> {
 
     @Override
     public void write(Record<T> record) throws Exception {
-        int number;
+        if (state.get() != State.OPEN) {
+            log.warn("Sink is not in OPEN state (current: {}), failing 
record", state.get());
+            record.fail();
+            return;
+        }
+        int number = 0;
+        boolean shouldFail = false;
+        boolean shouldLogQueueFull = false;
+        int queueSizeSnapshot = 0;
         synchronized (incomingList) {
-            incomingList.add(record);
-            number = incomingList.size();
+            if (maxQueueSize > 0 && incomingList.size() >= maxQueueSize) {
+                if (!queueFullLogged) {
+                    queueFullLogged = true;
+                    shouldLogQueueFull = true;
+                    queueSizeSnapshot = incomingList.size();
+                }
+                shouldFail = true;
+            } else {
+                incomingList.add(record);
+                number = incomingList.size();
+            }
+        }
+        if (shouldFail) {
+            if (shouldLogQueueFull) {
+                log.warn("Internal queue is full ({} >= {}), failing records 
to apply back-pressure",
+                        queueSizeSnapshot, maxQueueSize);
+            }
+            record.fail();
+            return;
         }
         if (batchSize > 0 && number >= batchSize) {
             if (log.isDebugEnabled()) {
@@ -239,26 +282,42 @@ public abstract class JdbcAbstractSink<T> implements 
Sink<T> {
 
 
     private void flush() {
-        if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) {
-            boolean needAnotherRound;
-            final Deque<Record<T>> swapList = new LinkedList<>();
-
-            synchronized (incomingList) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Starting flush, queue size: {}", 
incomingList.size());
+        if (state.get() == State.CLOSED) {
+            return;
+        }
+        if (!isFlushing.compareAndSet(false, true)) {
+            if (log.isDebugEnabled()) {
+                synchronized (incomingList) {
+                    log.debug("Already in flushing state, will not flush, 
queue size: {}", incomingList.size());
                 }
-                final int actualBatchSize = batchSize > 0 ? 
Math.min(incomingList.size(), batchSize) :
-                        incomingList.size();
+            }
+            return;
+        }
+        boolean needAnotherRound;
+        final Deque<Record<T>> swapList = new LinkedList<>();
 
-                for (int i = 0; i < actualBatchSize; i++) {
-                    swapList.add(incomingList.removeFirst());
-                }
-                needAnotherRound = batchSize > 0 && !incomingList.isEmpty() && 
incomingList.size() >= batchSize;
+        synchronized (incomingList) {
+            if (incomingList.isEmpty()) {
+                isFlushing.set(false);
+                return;
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("Starting flush, queue size: {}", 
incomingList.size());
+            }
+            final int actualBatchSize = batchSize > 0 ? 
Math.min(incomingList.size(), batchSize) :
+                    incomingList.size();
+
+            for (int i = 0; i < actualBatchSize; i++) {
+                swapList.add(incomingList.removeFirst());
             }
+            needAnotherRound = batchSize > 0 && !incomingList.isEmpty() && 
incomingList.size() >= batchSize;
+        }
             long start = System.nanoTime();
 
             int count = 0;
             try {
+                ensureConnection();
+
                 PreparedStatement currentBatch = null;
                 final List<Mutation> mutations = swapList
                         .stream()
@@ -308,6 +367,7 @@ public abstract class JdbcAbstractSink<T> implements 
Sink<T> {
                 } else {
                     internalFlush(swapList);
                 }
+                queueFullLogged = false;
             } catch (Exception e) {
                 log.error("Got exception {} after {} ms, failing {} messages",
                         e.getMessage(),
@@ -329,10 +389,37 @@ public abstract class JdbcAbstractSink<T> implements 
Sink<T> {
             if (needAnotherRound) {
                 flush();
             }
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug("Already in flushing state, will not flush, queue 
size: {}", incomingList.size());
+    }
+
+    private void ensureConnection() throws Exception {
+        try {
+            if (connection != null && connection.isValid(2)) {
+                return;
+            }
+        } catch (SQLException e) {
+            log.warn("Connection validation failed: {}", e.getMessage());
+        }
+
+        log.info("JDBC connection is invalid, attempting to reconnect to: {}", 
jdbcUrl);
+        closeConnectionQuietly();
+
+        connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(), 
connectionProperties);
+        connection.setAutoCommit(!jdbcSinkConfig.isUseTransactions());
+
+        tableId = JdbcUtils.getTableId(connection, tableName);
+        initStatement();
+
+        log.info("Successfully reconnected to: {}", jdbcUrl);
+    }
+
+    private void closeConnectionQuietly() {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (Exception e) {
+                log.debug("Error closing stale connection", e);
             }
+            connection = null;
         }
     }
 
@@ -404,6 +491,10 @@ public abstract class JdbcAbstractSink<T> implements 
Sink<T> {
      */
     private void fatal(Exception e) {
         if (sinkContext != null && state.compareAndSet(State.OPEN, 
State.FAILED)) {
+            log.error("Fatal error in JDBC sink, signaling framework for 
shutdown", e);
+            if (scheduledFlushTask != null) {
+                scheduledFlushTask.cancel(false);
+            }
             sinkContext.fatal(e);
         }
     }
diff --git 
a/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java 
b/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
index 854d683..a59067b 100644
--- a/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
+++ b/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
@@ -130,6 +130,18 @@ public class JdbcSinkConfig implements Serializable {
     )
     private NullValueAction nullValueAction = NullValueAction.FAIL;
 
+    @FieldDoc(
+            required = false,
+            defaultValue = "-1",
+            help = "Maximum number of records to buffer in the internal queue 
before applying back-pressure. "
+                    + "When the queue is full, incoming records will be failed 
(negatively acknowledged) so that "
+                    + "the Pulsar consumer can redeliver them later. This 
prevents out-of-memory errors when the "
+                    + "database connection is slow or broken. "
+                    + "A value of 0 auto-sizes to batchSize * 10. "
+                    + "A value of -1 (default) disables the limit (unbounded, 
legacy behavior)."
+    )
+    private int maxQueueSize = -1;
+
     public enum InsertMode {
         INSERT,
         UPSERT,
@@ -155,6 +167,10 @@ public class JdbcSinkConfig implements Serializable {
         if (timeoutMs <= 0 && batchSize <= 0) {
             throw new IllegalArgumentException("timeoutMs or batchSize must be 
set to a positive value.");
         }
+        if (maxQueueSize < -1) {
+            throw new IllegalArgumentException("maxQueueSize must be -1 
(unbounded), 0 (auto-size), "
+                    + "or a positive value.");
+        }
     }
 
 }
diff --git 
a/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java 
b/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
index 7cab33d..c76436a 100644
--- 
a/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
+++ 
b/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
@@ -932,6 +932,170 @@ public class SqliteJdbcSinkTest {
         }
     }
 
+    /**
+     * Test that write() rejects records when the sink is in FAILED state.
+     * After fatal() is called, records should be failed immediately instead 
of queuing.
+     */
+    @Test
+    public void testWriteRejectsRecordsAfterFatal() throws Exception {
+        jdbcSink.close();
+        jdbcSink = null;
+
+        String jdbcUrl = sqliteUtils.sqliteUri();
+        Map<String, Object> conf = Maps.newHashMap();
+        conf.put("jdbcUrl", jdbcUrl);
+        conf.put("tableName", tableName);
+        conf.put("key", "field3");
+        conf.put("nonKey", "field1,field2");
+        conf.put("batchSize", 1);
+
+        SinkContext mockSinkContext = mock(SinkContext.class);
+        SqliteJdbcAutoSchemaSink sinkWithContext = new 
SqliteJdbcAutoSchemaSink();
+        try {
+            sinkWithContext.open(conf, mockSinkContext);
+
+            // Force a fatal error by replacing insertStatement with a mock 
that throws
+            PreparedStatement mockStatement = mock(PreparedStatement.class);
+            doThrow(new SQLException("Connection 
lost")).when(mockStatement).execute();
+            FieldUtils.writeField(sinkWithContext, "insertStatement", 
mockStatement, true);
+
+            // Write first record to trigger fatal
+            Foo obj1 = new Foo("f1", "f2", 10);
+            CompletableFuture<Boolean> future1 = new CompletableFuture<>();
+            sinkWithContext.write(createMockFooRecord(obj1, Maps.newHashMap(), 
future1));
+
+            // Wait for fatal to be called
+            Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() ->
+                    verify(mockSinkContext).fatal(any(Throwable.class)));
+
+            // Now write another record — it should be failed immediately
+            Foo obj2 = new Foo("f3", "f4", 11);
+            CompletableFuture<Boolean> future2 = new CompletableFuture<>();
+            sinkWithContext.write(createMockFooRecord(obj2, Maps.newHashMap(), 
future2));
+
+            // Record should be failed (false), not acked (true)
+            Assert.assertFalse(future2.get(1, TimeUnit.SECONDS));
+        } finally {
+            sinkWithContext.close();
+        }
+    }
+
+    /**
+     * Test that the bounded queue applies back-pressure by failing records 
when full.
+     */
+    @Test
+    public void testBoundedQueueBackPressure() throws Exception {
+        jdbcSink.close();
+        jdbcSink = null;
+
+        String jdbcUrl = sqliteUtils.sqliteUri();
+        Map<String, Object> conf = Maps.newHashMap();
+        conf.put("jdbcUrl", jdbcUrl);
+        conf.put("tableName", tableName);
+        conf.put("key", "field3");
+        conf.put("nonKey", "field1,field2");
+        // Large batch size so flush is not triggered by writes
+        conf.put("batchSize", 1000);
+        // No time-based flush
+        conf.put("timeoutMs", 0);
+        // Small queue to test back-pressure
+        conf.put("maxQueueSize", 5);
+
+        SqliteJdbcAutoSchemaSink boundedSink = new SqliteJdbcAutoSchemaSink();
+        try {
+            boundedSink.open(conf, null);
+
+            // Fill the queue to capacity
+            List<CompletableFuture<Boolean>> futures = new 
java.util.ArrayList<>();
+            for (int i = 0; i < 5; i++) {
+                CompletableFuture<Boolean> f = new CompletableFuture<>();
+                futures.add(f);
+                boundedSink.write(createMockFooRecord(new Foo("f1", "f2", i + 
100), Maps.newHashMap(), f));
+            }
+
+            // Next write should be rejected due to queue full
+            CompletableFuture<Boolean> overflowFuture = new 
CompletableFuture<>();
+            boundedSink.write(createMockFooRecord(new Foo("overflow", "val", 
999), Maps.newHashMap(), overflowFuture));
+
+            // The overflow record should be failed immediately
+            Assert.assertFalse(overflowFuture.get(1, TimeUnit.SECONDS));
+        } finally {
+            boundedSink.close();
+        }
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+          expectedExceptionsMessageRegExp = ".*maxQueueSize.*")
+    public void testInvalidMaxQueueSizeRejected() throws Exception {
+        jdbcSink.close();
+        jdbcSink = null;
+
+        Map<String, Object> conf = Maps.newHashMap();
+        conf.put("jdbcUrl", sqliteUtils.sqliteUri());
+        conf.put("tableName", tableName);
+        conf.put("key", "field3");
+        conf.put("nonKey", "field1,field2");
+        conf.put("batchSize", 200);
+        conf.put("timeoutMs", 500);
+        conf.put("maxQueueSize", -2);
+
+        SqliteJdbcAutoSchemaSink sink = new SqliteJdbcAutoSchemaSink();
+        sink.open(conf, null);
+    }
+
+    /**
+     * Test that ensureConnection() reconnects when the existing connection 
becomes invalid.
+     * Simulates a database going away and coming back by closing the 
connection mid-flight,
+     * then verifying the sink recovers and processes the next batch 
successfully.
+     */
+    @Test
+    public void testReconnectOnBrokenConnection() throws Exception {
+        jdbcSink.close();
+        jdbcSink = null;
+
+        String jdbcUrl = sqliteUtils.sqliteUri();
+        Map<String, Object> conf = Maps.newHashMap();
+        conf.put("jdbcUrl", jdbcUrl);
+        conf.put("tableName", tableName);
+        conf.put("key", "field3");
+        conf.put("nonKey", "field1,field2");
+        conf.put("batchSize", 1);
+
+        SqliteJdbcAutoSchemaSink reconnectSink = new 
SqliteJdbcAutoSchemaSink();
+        try {
+            reconnectSink.open(conf, null);
+
+            // First write should succeed — connection is healthy
+            Foo obj1 = new Foo("reconnect1", "val1", 50);
+            CompletableFuture<Boolean> future1 = new CompletableFuture<>();
+            reconnectSink.write(createMockFooRecord(obj1, Maps.newHashMap(), 
future1));
+            Assert.assertTrue(future1.get(5, TimeUnit.SECONDS));
+
+            // Verify record was persisted
+            int count = sqliteUtils.select("SELECT * FROM " + tableName + " 
WHERE field3=50", (rs) -> {
+                Assert.assertEquals(rs.getString(1), "reconnect1");
+            });
+            Assert.assertEquals(count, 1);
+
+            // Now break the connection by closing it behind the sink's back
+            reconnectSink.getConnection().close();
+
+            // Next write should trigger ensureConnection() → reconnect → 
succeed
+            Foo obj2 = new Foo("reconnect2", "val2", 51);
+            CompletableFuture<Boolean> future2 = new CompletableFuture<>();
+            reconnectSink.write(createMockFooRecord(obj2, Maps.newHashMap(), 
future2));
+            Assert.assertTrue(future2.get(5, TimeUnit.SECONDS));
+
+            // Verify second record was also persisted after reconnect
+            count = sqliteUtils.select("SELECT * FROM " + tableName + " WHERE 
field3=51", (rs) -> {
+                Assert.assertEquals(rs.getString(1), "reconnect2");
+            });
+            Assert.assertEquals(count, 1);
+        } finally {
+            reconnectSink.close();
+        }
+    }
+
     @SuppressWarnings("unchecked")
     private Record<GenericObject> createMockFooRecord(Foo record, Map<String, 
String> actionProperties,
                                                         
CompletableFuture<Boolean> future) {

Reply via email to