This is an automated email from the ASF dual-hosted git repository.

jkonisa pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-analytics.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dfface1b CASSANALYTICS-142 : SidecarCdcClient should be passed as a constructor parameter for SidecarCdc to avoid thread/resource leaks (#188)
dfface1b is described below

commit dfface1be7a0d70e745b6e0732e681bde237508c
Author: Jyothsna konisa <[email protected]>
AuthorDate: Tue Mar 31 14:27:41 2026 -0700

    CASSANALYTICS-142 : SidecarCdcClient should be passed as a constructor parameter for SidecarCdc to avoid thread/resource leaks (#188)
    
    Patch by Jyothsna Konisa; Reviewed by Yifan Cai for CASSANALYTICS-142
---
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/cdc/sidecar/SidecarCdc.java   | 33 ++++++++++----
 .../cassandra/cdc/sidecar/SidecarCdcBuilder.java   | 50 +---------------------
 .../cassandra/cdc/sidecar/SidecarCdcClient.java    | 43 +++++++++++++++++--
 .../cassandra/cdc/sidecar/SidecarCdcTest.java      | 17 ++------
 5 files changed, 69 insertions(+), 75 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 35afa6cd..ce89887d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.4.0
 -----
+ * Pass SidecarCdcClient as a constructor parameter to avoid thread/resource leaks (CASSANALYTICS-142)
  * Support extended deletion time in CDC for Cassandra 5.0
  * Flush event consumer before persisting CDC state to prevent data loss on failure (CASSANALYTICS-126)
  * Fix ReadStatusTracker to distinguish clean completion from error termination in BufferingCommitLogReader (CASSANALYTICS-129)
diff --git a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdc.java b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdc.java
index e5ac33b6..f7e0513c 100644
--- a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdc.java
+++ b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdc.java
@@ -19,7 +19,6 @@
 
 package org.apache.cassandra.cdc.sidecar;
 
-import java.io.IOException;
 import java.util.Comparator;
 import java.util.Optional;
 import java.util.Set;
@@ -30,7 +29,6 @@ import org.apache.cassandra.cdc.api.EventConsumer;
 import org.apache.cassandra.cdc.api.SchemaSupplier;
 import org.apache.cassandra.cdc.api.TokenRangeSupplier;
 import org.apache.cassandra.cdc.stats.ICdcStats;
-import org.apache.cassandra.secrets.SecretsProvider;
 import org.apache.cassandra.spark.data.CqlTable;
 import org.apache.cassandra.spark.data.ReplicationFactor;
 import org.apache.cassandra.spark.utils.FutureUtils;
@@ -50,6 +48,27 @@ public class SidecarCdc extends Cdc
         initSchema();
     }
 
+    /**
+     * Creates a new {@link SidecarCdcBuilder} pre-configured with the supplied parameters.
+     *
+     * <p><b>Lifecycle of {@code sidecarCdcClient}:</b> the supplied {@link SidecarCdcClient} is treated as
+     * an externally managed singleton. Neither the returned builder nor the {@link SidecarCdc} instance it
+     * produces will close the client. The caller is solely responsible for closing the
+     * {@code SidecarCdcClient} (e.g. during application shutdown) to release underlying resources such as
+     * thread pools and HTTP connections.
+     *
+     * @param jobId                  unique identifier for the CDC job
+     * @param partitionId            partition index within the job
+     * @param cdcOptions             CDC processing options
+     * @param clusterConfigProvider  provider for cluster configuration (e.g. datacenter, hosts)
+     * @param eventConsumer          consumer that receives CDC change events
+     * @param schemaSupplier         supplier for CDC-enabled table schemas
+     * @param tokenRangeSupplier     supplier for the token ranges assigned to this partition
+     * @param sidecarCdcClient       externally managed Sidecar HTTP client; <em>not</em> closed by
+     *                               {@code SidecarCdc} or {@code SidecarCdcBuilder}
+     * @param cdcStats               CDC statistics collector
+     * @return a new {@link SidecarCdcBuilder}
+     */
     public static SidecarCdcBuilder builder(@NotNull String jobId,
                                             int partitionId,
                                             CdcOptions cdcOptions,
@@ -57,10 +76,8 @@ public class SidecarCdc extends Cdc
                                             EventConsumer eventConsumer,
                                             SchemaSupplier schemaSupplier,
                                             TokenRangeSupplier tokenRangeSupplier,
-                                            CdcSidecarInstancesProvider sidecarInstancesProvider,
-                                            SidecarCdcClient.ClientConfig clientConfig,
-                                            SecretsProvider secretsProvider,
-                                            ICdcStats cdcStats) throws IOException
+                                            SidecarCdcClient sidecarCdcClient,
+                                            ICdcStats cdcStats)
     {
         return new SidecarCdcBuilder(jobId,
                                      partitionId,
@@ -69,9 +86,7 @@ public class SidecarCdc extends Cdc
                                      eventConsumer,
                                      schemaSupplier,
                                      tokenRangeSupplier,
-                                     sidecarInstancesProvider,
-                                     clientConfig,
-                                     secretsProvider,
+                                     sidecarCdcClient,
                                      cdcStats);
     }
 
diff --git a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java
index ecc5fa93..4027db2d 100644
--- a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java
+++ b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcBuilder.java
@@ -19,22 +19,14 @@
 
 package org.apache.cassandra.cdc.sidecar;
 
-import java.io.IOException;
-import java.util.stream.Collectors;
-
 import com.google.common.base.Preconditions;
 
-import o.a.c.sidecar.client.shaded.client.SidecarInstanceImpl;
-import o.a.c.sidecar.client.shaded.client.SimpleSidecarInstancesProvider;
 import org.apache.cassandra.cdc.CdcBuilder;
 import org.apache.cassandra.cdc.api.CdcOptions;
 import org.apache.cassandra.cdc.api.EventConsumer;
 import org.apache.cassandra.cdc.api.SchemaSupplier;
 import org.apache.cassandra.cdc.api.TokenRangeSupplier;
 import org.apache.cassandra.cdc.stats.ICdcStats;
-import org.apache.cassandra.clients.Sidecar;
-import org.apache.cassandra.secrets.SecretsProvider;
-import o.a.c.sidecar.client.shaded.client.SidecarClient;
 import org.apache.cassandra.spark.utils.AsyncExecutor;
 import org.jetbrains.annotations.NotNull;
 
@@ -56,42 +48,12 @@ public class SidecarCdcBuilder extends CdcBuilder
                       EventConsumer eventConsumer,
                       SchemaSupplier schemaSupplier,
                       TokenRangeSupplier tokenRangeSupplier,
-                      CdcSidecarInstancesProvider sidecarInstancesProvider,
-                      SidecarCdcClient.ClientConfig clientConfig,
-                      SecretsProvider secretsProvider,
-                      ICdcStats cdcStats) throws IOException
-    {
-        this(
-        jobId,
-        partitionId,
-        cdcOptions,
-        clusterConfigProvider,
-        eventConsumer,
-        schemaSupplier,
-        tokenRangeSupplier,
-        clientConfig,
-        Sidecar.from(new 
SimpleSidecarInstancesProvider(sidecarInstancesProvider.instances().stream()
-                                                                               
 .map(i -> new SidecarInstanceImpl(i.hostname(), i.port()))
-                                                                               
 .collect(Collectors.toList())),
-                     clientConfig.toGenericSidecarConfig(), secretsProvider),
-        cdcStats
-        );
-    }
-
-    SidecarCdcBuilder(@NotNull String jobId,
-                      int partitionId,
-                      CdcOptions cdcOptions,
-                      ClusterConfigProvider clusterConfigProvider,
-                      EventConsumer eventConsumer,
-                      SchemaSupplier schemaSupplier,
-                      TokenRangeSupplier tokenRangeSupplier,
-                      SidecarCdcClient.ClientConfig clientConfig,
-                      SidecarClient sidecarClient,
+                      SidecarCdcClient sidecarCdcClient,
                       ICdcStats cdcStats)
     {
         super(jobId, partitionId, eventConsumer, schemaSupplier);
         this.clusterConfigProvider = clusterConfigProvider;
-        this.sidecarCdcClient = new SidecarCdcClient(clientConfig, 
sidecarClient, cdcStats);
+        this.sidecarCdcClient = sidecarCdcClient;
         withCdcOptions(cdcOptions);
         withTokenRangeSupplier(tokenRangeSupplier);
     }
@@ -108,14 +70,6 @@ public class SidecarCdcBuilder extends CdcBuilder
         return this;
     }
 
-    public SidecarCdcBuilder withSidecarClient(SidecarCdcClient.ClientConfig 
clientConfig,
-                                               SidecarClient sidecarClient,
-                                               ICdcStats cdcStats)
-    {
-        this.sidecarCdcClient = new SidecarCdcClient(clientConfig, 
sidecarClient, cdcStats);
-        return this;
-    }
-
     public SidecarCdcBuilder 
withReplicationFactorSupplier(ReplicationFactorSupplier 
replicationFactorSupplier)
     {
         this.replicationFactorSupplier = replicationFactorSupplier;
diff --git a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java
index 2dc98803..d8aa09ef 100644
--- a/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java
+++ b/cassandra-analytics-cdc-sidecar/src/main/java/org/apache/cassandra/cdc/sidecar/SidecarCdcClient.java
@@ -19,6 +19,7 @@
 
 package org.apache.cassandra.cdc.sidecar;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
@@ -32,8 +33,11 @@ import o.a.c.sidecar.client.shaded.common.utils.HttpRange;
 import org.apache.cassandra.cdc.api.CommitLog;
 import org.apache.cassandra.cdc.stats.ICdcStats;
 import org.apache.cassandra.clients.Sidecar;
+import org.apache.cassandra.secrets.SecretsProvider;
 import o.a.c.sidecar.client.shaded.client.SidecarClient;
 import o.a.c.sidecar.client.shaded.client.SidecarInstance;
+import o.a.c.sidecar.client.shaded.client.SidecarInstanceImpl;
+import o.a.c.sidecar.client.shaded.client.SimpleSidecarInstancesProvider;
 import o.a.c.sidecar.client.shaded.client.StreamBuffer;
 import org.apache.cassandra.spark.data.FileType;
 import org.apache.cassandra.spark.data.partitioner.CassandraInstance;
@@ -52,21 +56,52 @@ import static 
org.apache.cassandra.spark.utils.Properties.DEFAULT_MILLIS_TO_SLEE
 import static org.apache.cassandra.spark.utils.Properties.DEFAULT_SIDECAR_PORT;
 import static org.apache.cassandra.spark.utils.Properties.DEFAULT_TIMEOUT_SECONDS;
 
-public class SidecarCdcClient
+public class SidecarCdcClient implements AutoCloseable
 {
     final ClientConfig config;
     final SidecarClient sidecarClient;
     final ICdcStats stats;
 
-    public SidecarCdcClient(ClientConfig config,
-                            SidecarClient sidecarClient,
-                            ICdcStats stats)
+    public SidecarCdcClient(ClientConfig clientConfig,
+                            CdcSidecarInstancesProvider instancesProvider,
+                            SecretsProvider secretsProvider,
+                            ICdcStats cdcStats) throws IOException
+    {
+        this(clientConfig,
+             Sidecar.from(new 
SimpleSidecarInstancesProvider(instancesProvider.instances()
+                                                                              
.stream()
+                                                                              
.map(i -> new SidecarInstanceImpl(i.hostname(), i.port()))
+                                                                              
.collect(Collectors.toList())),
+                          clientConfig.toGenericSidecarConfig(),
+                          secretsProvider),
+             cdcStats);
+    }
+
+    private SidecarCdcClient(ClientConfig config,
+                             SidecarClient sidecarClient,
+                             ICdcStats stats)
     {
         this.config = config;
         this.sidecarClient = sidecarClient;
         this.stats = stats;
     }
 
+    /**
+     * Closes the underlying {@link SidecarClient} and releases associated resources (e.g. thread pools,
+     * HTTP connections).
+     *
+     * <p>{@code SidecarCdcClient} is intended to be used as a singleton whose lifecycle is managed by the
+     * enclosing component. Callers should not create per-request instances; instead, a single instance
+     * should be constructed at startup and closed during shutdown to avoid thread and resource leaks.
+     *
+     * @throws Exception if the underlying client throws while closing
+     */
+    @Override
+    public void close() throws Exception
+    {
+        sidecarClient.close();
+    }
+
     public CompletableFuture<List<CommitLog>> 
listCdcCommitLogSegments(CassandraInstance instance)
     {
         return sidecarClient.listCdcSegments(toSidecarInstance(instance))
diff --git a/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java b/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java
index 3ad306d8..b8913492 100644
--- a/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java
+++ b/cassandra-analytics-cdc-sidecar/src/test/java/org/apache/cassandra/cdc/sidecar/SidecarCdcTest.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.cdc.sidecar;
 
 import org.junit.jupiter.api.Test;
 
-import o.a.c.sidecar.client.shaded.client.SidecarClient;
 import org.apache.cassandra.cdc.api.CdcOptions;
 import org.apache.cassandra.cdc.api.EventConsumer;
 import org.apache.cassandra.cdc.api.SchemaSupplier;
@@ -46,8 +45,7 @@ public class SidecarCdcTest
         EventConsumer eventConsumer = mock(EventConsumer.class);
         SchemaSupplier schemaSupplier = mock(SchemaSupplier.class);
         TokenRangeSupplier tokenRangeSupplier = mock(TokenRangeSupplier.class);
-        SidecarCdcClient.ClientConfig clientConfig = 
SidecarCdcClient.ClientConfig.create();
-        SidecarClient mockSidecarClient = mock(SidecarClient.class);
+        SidecarCdcClient mockSidecarCdcClient = mock(SidecarCdcClient.class);
         ICdcStats cdcStats = mock(ICdcStats.class);
 
         SidecarCdcBuilder builder = new SidecarCdcBuilder(
@@ -58,22 +56,13 @@ public class SidecarCdcTest
             eventConsumer,
             schemaSupplier,
             tokenRangeSupplier,
-            clientConfig,
-            mockSidecarClient,
+            mockSidecarCdcClient,
             cdcStats
         );
 
-        // Verify the builder is properly created and configured
         assertThat(builder).isNotNull();
         assertThat(builder).isInstanceOf(SidecarCdcBuilder.class);
-
-        // Verify the builder has the cluster config provider set
         
assertThat(builder.clusterConfigProvider).isEqualTo(clusterConfigProvider);
-
-        // Verify the builder has a sidecar CDC client configured
-        assertThat(builder.sidecarCdcClient).isNotNull();
-        
assertThat(builder.sidecarCdcClient.sidecarClient).isEqualTo(mockSidecarClient);
-        assertThat(builder.sidecarCdcClient.config).isEqualTo(clientConfig);
-        assertThat(builder.sidecarCdcClient.stats).isEqualTo(cdcStats);
+        assertThat(builder.sidecarCdcClient).isEqualTo(mockSidecarCdcClient);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to