This is an automated email from the ASF dual-hosted git repository.
davekj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-connectors.git
The following commit(s) were added to refs/heads/master by this push:
new e689bc3 [fix][io] JDBC sink: prevent OOM from unbounded queue on
connection failure (#9)
e689bc3 is described below
commit e689bc345bad14ff176a4a35998fd21111f26766
Author: Harangozó Péter <[email protected]>
AuthorDate: Fri Apr 3 16:29:14 2026 +0200
[fix][io] JDBC sink: prevent OOM from unbounded queue on connection failure
(#9)
* [fix][io] JDBC sink: prevent OOM from unbounded queue on connection
failure
The JDBC sink's internal queue (incomingList) is unbounded. When the
database connection drops, executeBatch() hangs until the TCP socket
times out. During this period, isFlushing stays true, preventing any
draining, while write() continues accepting records without limit.
This causes OutOfMemoryError in production.
This commit fixes 4 issues:
1. Bounded internal queue: write() now rejects records once the queue
reaches maxQueueSize (configurable; 0 auto-sizes to 10x batchSize, and
the default of -1 keeps the queue unbounded), applying Pulsar-level
back-pressure via negative acknowledgment.
2. State check in write(): records are failed immediately when the
sink state is not OPEN (after fatal() or close()).
3. Connection validation and reconnection: ensureConnection() validates
the JDBC connection before each flush and reconnects automatically
on failure, allowing recovery from transient database outages.
4. Scheduled flush cancellation: fatal() and close() now cancel the
periodic flush task to prevent repeated failures on a broken
connection.
Fixes https://github.com/apache/pulsar/issues/25030
* [fix][connector] Address review: thread-safety, default, validation
- Move record.fail() outside synchronized(incomingList) to avoid
holding the lock during framework callbacks
- Move incomingList.size() check inside synchronized in flush() to
fix data race on non-thread-safe LinkedList
- Change maxQueueSize default from 0 (auto-bounded) to -1 (unbounded)
to preserve backwards-compatible legacy behavior; users opt-in to
bounded queue by setting maxQueueSize=0 (auto) or a positive value
- Add overflow-safe auto-sizing (long arithmetic capped at Integer.MAX_VALUE)
- Validate maxQueueSize in JdbcSinkConfig.validate() — reject < -1
- Add test for invalid maxQueueSize rejection
---
.../apache/pulsar/io/jdbc/JdbcAbstractSink.java | 139 ++++++++++++++---
.../org/apache/pulsar/io/jdbc/JdbcSinkConfig.java | 16 ++
.../apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java | 164 +++++++++++++++++++++
3 files changed, 295 insertions(+), 24 deletions(-)
diff --git
a/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
b/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index 73ba6b7..3f2477a 100644
--- a/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++ b/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -33,6 +33,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -78,8 +79,12 @@ public abstract class JdbcAbstractSink<T> implements Sink<T>
{
private Deque<Record<T>> incomingList;
private AtomicBoolean isFlushing;
private int batchSize;
+ private int maxQueueSize;
private ScheduledExecutorService flushExecutor;
+ private ScheduledFuture<?> scheduledFlushTask;
private SinkContext sinkContext;
+ private Properties connectionProperties;
+ private volatile boolean queueFullLogged = false;
private final AtomicReference<State> state = new
AtomicReference<>(State.OPEN);
@Override
@@ -93,17 +98,17 @@ public abstract class JdbcAbstractSink<T> implements
Sink<T> {
throw new IllegalArgumentException("Required jdbc Url not set.");
}
- Properties properties = new Properties();
+ connectionProperties = new Properties();
String username = jdbcSinkConfig.getUserName();
String password = jdbcSinkConfig.getPassword();
if (username != null) {
- properties.setProperty("user", username);
+ connectionProperties.setProperty("user", username);
}
if (password != null) {
- properties.setProperty("password", password);
+ connectionProperties.setProperty("password", password);
}
- connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(),
properties);
+ connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(),
connectionProperties);
connection.setAutoCommit(!jdbcSinkConfig.isUseTransactions());
log.info("Opened jdbc connection: {}, autoCommit: {}", jdbcUrl,
connection.getAutoCommit());
@@ -114,12 +119,21 @@ public abstract class JdbcAbstractSink<T> implements
Sink<T> {
int timeoutMs = jdbcSinkConfig.getTimeoutMs();
batchSize = jdbcSinkConfig.getBatchSize();
+ maxQueueSize = jdbcSinkConfig.getMaxQueueSize();
+ if (maxQueueSize == 0) {
+ // Auto-size: default to 10x batch size (overflow-safe)
+ long calculated = batchSize > 0 ? (long) batchSize * 10L : 10000L;
+ maxQueueSize = calculated > Integer.MAX_VALUE ? Integer.MAX_VALUE
: (int) calculated;
+ }
+ // maxQueueSize < 0 (i.e. -1) means unbounded (legacy behavior)
+ log.info("JDBC sink queue capacity: {}", maxQueueSize > 0 ?
maxQueueSize : "unbounded");
incomingList = new LinkedList<>();
isFlushing = new AtomicBoolean(false);
flushExecutor = Executors.newScheduledThreadPool(1);
if (timeoutMs > 0) {
- flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs,
timeoutMs, TimeUnit.MILLISECONDS);
+ scheduledFlushTask = flushExecutor.scheduleAtFixedRate(
+ this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
}
}
@@ -158,6 +172,10 @@ public abstract class JdbcAbstractSink<T> implements
Sink<T> {
@Override
public void close() throws Exception {
state.set(State.CLOSED);
+ if (scheduledFlushTask != null) {
+ scheduledFlushTask.cancel(false);
+ scheduledFlushTask = null;
+ }
if (flushExecutor != null) {
int timeoutMs = jdbcSinkConfig.getTimeoutMs() * 2;
flushExecutor.shutdown();
@@ -188,10 +206,35 @@ public abstract class JdbcAbstractSink<T> implements
Sink<T> {
@Override
public void write(Record<T> record) throws Exception {
- int number;
+ if (state.get() != State.OPEN) {
+ log.warn("Sink is not in OPEN state (current: {}), failing
record", state.get());
+ record.fail();
+ return;
+ }
+ int number = 0;
+ boolean shouldFail = false;
+ boolean shouldLogQueueFull = false;
+ int queueSizeSnapshot = 0;
synchronized (incomingList) {
- incomingList.add(record);
- number = incomingList.size();
+ if (maxQueueSize > 0 && incomingList.size() >= maxQueueSize) {
+ if (!queueFullLogged) {
+ queueFullLogged = true;
+ shouldLogQueueFull = true;
+ queueSizeSnapshot = incomingList.size();
+ }
+ shouldFail = true;
+ } else {
+ incomingList.add(record);
+ number = incomingList.size();
+ }
+ }
+ if (shouldFail) {
+ if (shouldLogQueueFull) {
+ log.warn("Internal queue is full ({} >= {}), failing records
to apply back-pressure",
+ queueSizeSnapshot, maxQueueSize);
+ }
+ record.fail();
+ return;
}
if (batchSize > 0 && number >= batchSize) {
if (log.isDebugEnabled()) {
@@ -239,26 +282,42 @@ public abstract class JdbcAbstractSink<T> implements
Sink<T> {
private void flush() {
- if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) {
- boolean needAnotherRound;
- final Deque<Record<T>> swapList = new LinkedList<>();
-
- synchronized (incomingList) {
- if (log.isDebugEnabled()) {
- log.debug("Starting flush, queue size: {}",
incomingList.size());
+ if (state.get() == State.CLOSED) {
+ return;
+ }
+ if (!isFlushing.compareAndSet(false, true)) {
+ if (log.isDebugEnabled()) {
+ synchronized (incomingList) {
+ log.debug("Already in flushing state, will not flush,
queue size: {}", incomingList.size());
}
- final int actualBatchSize = batchSize > 0 ?
Math.min(incomingList.size(), batchSize) :
- incomingList.size();
+ }
+ return;
+ }
+ boolean needAnotherRound;
+ final Deque<Record<T>> swapList = new LinkedList<>();
- for (int i = 0; i < actualBatchSize; i++) {
- swapList.add(incomingList.removeFirst());
- }
- needAnotherRound = batchSize > 0 && !incomingList.isEmpty() &&
incomingList.size() >= batchSize;
+ synchronized (incomingList) {
+ if (incomingList.isEmpty()) {
+ isFlushing.set(false);
+ return;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Starting flush, queue size: {}",
incomingList.size());
+ }
+ final int actualBatchSize = batchSize > 0 ?
Math.min(incomingList.size(), batchSize) :
+ incomingList.size();
+
+ for (int i = 0; i < actualBatchSize; i++) {
+ swapList.add(incomingList.removeFirst());
}
+ needAnotherRound = batchSize > 0 && !incomingList.isEmpty() &&
incomingList.size() >= batchSize;
+ }
long start = System.nanoTime();
int count = 0;
try {
+ ensureConnection();
+
PreparedStatement currentBatch = null;
final List<Mutation> mutations = swapList
.stream()
@@ -308,6 +367,7 @@ public abstract class JdbcAbstractSink<T> implements
Sink<T> {
} else {
internalFlush(swapList);
}
+ queueFullLogged = false;
} catch (Exception e) {
log.error("Got exception {} after {} ms, failing {} messages",
e.getMessage(),
@@ -329,10 +389,37 @@ public abstract class JdbcAbstractSink<T> implements
Sink<T> {
if (needAnotherRound) {
flush();
}
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Already in flushing state, will not flush, queue
size: {}", incomingList.size());
+ }
+
+ private void ensureConnection() throws Exception {
+ try {
+ if (connection != null && connection.isValid(2)) {
+ return;
+ }
+ } catch (SQLException e) {
+ log.warn("Connection validation failed: {}", e.getMessage());
+ }
+
+ log.info("JDBC connection is invalid, attempting to reconnect to: {}",
jdbcUrl);
+ closeConnectionQuietly();
+
+ connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(),
connectionProperties);
+ connection.setAutoCommit(!jdbcSinkConfig.isUseTransactions());
+
+ tableId = JdbcUtils.getTableId(connection, tableName);
+ initStatement();
+
+ log.info("Successfully reconnected to: {}", jdbcUrl);
+ }
+
+ private void closeConnectionQuietly() {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (Exception e) {
+ log.debug("Error closing stale connection", e);
}
+ connection = null;
}
}
@@ -404,6 +491,10 @@ public abstract class JdbcAbstractSink<T> implements
Sink<T> {
*/
private void fatal(Exception e) {
if (sinkContext != null && state.compareAndSet(State.OPEN,
State.FAILED)) {
+ log.error("Fatal error in JDBC sink, signaling framework for
shutdown", e);
+ if (scheduledFlushTask != null) {
+ scheduledFlushTask.cancel(false);
+ }
sinkContext.fatal(e);
}
}
diff --git
a/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
b/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
index 854d683..a59067b 100644
--- a/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
+++ b/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
@@ -130,6 +130,18 @@ public class JdbcSinkConfig implements Serializable {
)
private NullValueAction nullValueAction = NullValueAction.FAIL;
+ @FieldDoc(
+ required = false,
+ defaultValue = "-1",
+ help = "Maximum number of records to buffer in the internal queue
before applying back-pressure. "
+ + "When the queue is full, incoming records will be failed
(negatively acknowledged) so that "
+ + "the Pulsar consumer can redeliver them later. This
prevents out-of-memory errors when the "
+ + "database connection is slow or broken. "
+ + "A value of 0 auto-sizes to batchSize * 10. "
+ + "A value of -1 (default) disables the limit (unbounded,
legacy behavior)."
+ )
+ private int maxQueueSize = -1;
+
public enum InsertMode {
INSERT,
UPSERT,
@@ -155,6 +167,10 @@ public class JdbcSinkConfig implements Serializable {
if (timeoutMs <= 0 && batchSize <= 0) {
throw new IllegalArgumentException("timeoutMs or batchSize must be
set to a positive value.");
}
+ if (maxQueueSize < -1) {
+ throw new IllegalArgumentException("maxQueueSize must be -1
(unbounded), 0 (auto-size), "
+ + "or a positive value.");
+ }
}
}
diff --git
a/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
b/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
index 7cab33d..c76436a 100644
---
a/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
+++
b/jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java
@@ -932,6 +932,170 @@ public class SqliteJdbcSinkTest {
}
}
+ /**
+ * Test that write() rejects records when the sink is in FAILED state.
+ * After fatal() is called, records should be failed immediately instead
of queuing.
+ */
+ @Test
+ public void testWriteRejectsRecordsAfterFatal() throws Exception {
+ jdbcSink.close();
+ jdbcSink = null;
+
+ String jdbcUrl = sqliteUtils.sqliteUri();
+ Map<String, Object> conf = Maps.newHashMap();
+ conf.put("jdbcUrl", jdbcUrl);
+ conf.put("tableName", tableName);
+ conf.put("key", "field3");
+ conf.put("nonKey", "field1,field2");
+ conf.put("batchSize", 1);
+
+ SinkContext mockSinkContext = mock(SinkContext.class);
+ SqliteJdbcAutoSchemaSink sinkWithContext = new
SqliteJdbcAutoSchemaSink();
+ try {
+ sinkWithContext.open(conf, mockSinkContext);
+
+ // Force a fatal error by replacing insertStatement with a mock
that throws
+ PreparedStatement mockStatement = mock(PreparedStatement.class);
+ doThrow(new SQLException("Connection
lost")).when(mockStatement).execute();
+ FieldUtils.writeField(sinkWithContext, "insertStatement",
mockStatement, true);
+
+ // Write first record to trigger fatal
+ Foo obj1 = new Foo("f1", "f2", 10);
+ CompletableFuture<Boolean> future1 = new CompletableFuture<>();
+ sinkWithContext.write(createMockFooRecord(obj1, Maps.newHashMap(),
future1));
+
+ // Wait for fatal to be called
+ Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() ->
+ verify(mockSinkContext).fatal(any(Throwable.class)));
+
+ // Now write another record — it should be failed immediately
+ Foo obj2 = new Foo("f3", "f4", 11);
+ CompletableFuture<Boolean> future2 = new CompletableFuture<>();
+ sinkWithContext.write(createMockFooRecord(obj2, Maps.newHashMap(),
future2));
+
+ // Record should be failed (false), not acked (true)
+ Assert.assertFalse(future2.get(1, TimeUnit.SECONDS));
+ } finally {
+ sinkWithContext.close();
+ }
+ }
+
+ /**
+ * Test that the bounded queue applies back-pressure by failing records
when full.
+ */
+ @Test
+ public void testBoundedQueueBackPressure() throws Exception {
+ jdbcSink.close();
+ jdbcSink = null;
+
+ String jdbcUrl = sqliteUtils.sqliteUri();
+ Map<String, Object> conf = Maps.newHashMap();
+ conf.put("jdbcUrl", jdbcUrl);
+ conf.put("tableName", tableName);
+ conf.put("key", "field3");
+ conf.put("nonKey", "field1,field2");
+ // Large batch size so flush is not triggered by writes
+ conf.put("batchSize", 1000);
+ // No time-based flush
+ conf.put("timeoutMs", 0);
+ // Small queue to test back-pressure
+ conf.put("maxQueueSize", 5);
+
+ SqliteJdbcAutoSchemaSink boundedSink = new SqliteJdbcAutoSchemaSink();
+ try {
+ boundedSink.open(conf, null);
+
+ // Fill the queue to capacity
+ List<CompletableFuture<Boolean>> futures = new
java.util.ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ CompletableFuture<Boolean> f = new CompletableFuture<>();
+ futures.add(f);
+ boundedSink.write(createMockFooRecord(new Foo("f1", "f2", i +
100), Maps.newHashMap(), f));
+ }
+
+ // Next write should be rejected due to queue full
+ CompletableFuture<Boolean> overflowFuture = new
CompletableFuture<>();
+ boundedSink.write(createMockFooRecord(new Foo("overflow", "val",
999), Maps.newHashMap(), overflowFuture));
+
+ // The overflow record should be failed immediately
+ Assert.assertFalse(overflowFuture.get(1, TimeUnit.SECONDS));
+ } finally {
+ boundedSink.close();
+ }
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = ".*maxQueueSize.*")
+ public void testInvalidMaxQueueSizeRejected() throws Exception {
+ jdbcSink.close();
+ jdbcSink = null;
+
+ Map<String, Object> conf = Maps.newHashMap();
+ conf.put("jdbcUrl", sqliteUtils.sqliteUri());
+ conf.put("tableName", tableName);
+ conf.put("key", "field3");
+ conf.put("nonKey", "field1,field2");
+ conf.put("batchSize", 200);
+ conf.put("timeoutMs", 500);
+ conf.put("maxQueueSize", -2);
+
+ SqliteJdbcAutoSchemaSink sink = new SqliteJdbcAutoSchemaSink();
+ sink.open(conf, null);
+ }
+
+ /**
+ * Test that ensureConnection() reconnects when the existing connection
becomes invalid.
+ * Simulates a database going away and coming back by closing the
connection mid-flight,
+ * then verifying the sink recovers and processes the next batch
successfully.
+ */
+ @Test
+ public void testReconnectOnBrokenConnection() throws Exception {
+ jdbcSink.close();
+ jdbcSink = null;
+
+ String jdbcUrl = sqliteUtils.sqliteUri();
+ Map<String, Object> conf = Maps.newHashMap();
+ conf.put("jdbcUrl", jdbcUrl);
+ conf.put("tableName", tableName);
+ conf.put("key", "field3");
+ conf.put("nonKey", "field1,field2");
+ conf.put("batchSize", 1);
+
+ SqliteJdbcAutoSchemaSink reconnectSink = new
SqliteJdbcAutoSchemaSink();
+ try {
+ reconnectSink.open(conf, null);
+
+ // First write should succeed — connection is healthy
+ Foo obj1 = new Foo("reconnect1", "val1", 50);
+ CompletableFuture<Boolean> future1 = new CompletableFuture<>();
+ reconnectSink.write(createMockFooRecord(obj1, Maps.newHashMap(),
future1));
+ Assert.assertTrue(future1.get(5, TimeUnit.SECONDS));
+
+ // Verify record was persisted
+ int count = sqliteUtils.select("SELECT * FROM " + tableName + "
WHERE field3=50", (rs) -> {
+ Assert.assertEquals(rs.getString(1), "reconnect1");
+ });
+ Assert.assertEquals(count, 1);
+
+ // Now break the connection by closing it behind the sink's back
+ reconnectSink.getConnection().close();
+
+ // Next write should trigger ensureConnection() → reconnect →
succeed
+ Foo obj2 = new Foo("reconnect2", "val2", 51);
+ CompletableFuture<Boolean> future2 = new CompletableFuture<>();
+ reconnectSink.write(createMockFooRecord(obj2, Maps.newHashMap(),
future2));
+ Assert.assertTrue(future2.get(5, TimeUnit.SECONDS));
+
+ // Verify second record was also persisted after reconnect
+ count = sqliteUtils.select("SELECT * FROM " + tableName + " WHERE
field3=51", (rs) -> {
+ Assert.assertEquals(rs.getString(1), "reconnect2");
+ });
+ Assert.assertEquals(count, 1);
+ } finally {
+ reconnectSink.close();
+ }
+ }
+
@SuppressWarnings("unchecked")
private Record<GenericObject> createMockFooRecord(Foo record, Map<String,
String> actionProperties,
CompletableFuture<Boolean> future) {