This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c703695d2 Add close method to upsert interfaces (#9212)
5c703695d2 is described below

commit 5c703695d245278e4a47ab066c6d6477dfcf487c
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Wed Aug 17 15:43:38 2022 +0530

    Add close method to upsert interfaces (#9212)
    
    * Add close method to upsert interfaces
    
    * Extend with Closeable
    
    Co-authored-by: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local>
---
 .../core/data/manager/realtime/RealtimeTableDataManager.java      | 7 +++++++
 .../local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java | 6 ++++++
 .../local/upsert/ConcurrentMapTableUpsertMetadataManager.java     | 8 ++++++++
 .../segment/local/upsert/PartitionUpsertMetadataManager.java      | 3 ++-
 .../pinot/segment/local/upsert/TableUpsertMetadataManager.java    | 3 ++-
 5 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index af7f366ebd..f41f7edba5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -199,6 +199,13 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   @Override
   protected void doShutdown() {
     _segmentAsyncExecutorService.shutdown();
+    if (_tableUpsertMetadataManager != null) {
+      try {
+        _tableUpsertMetadataManager.close();
+      } catch (IOException e) {
+        _logger.warn("Cannot close upsert metadata manager properly for table: {}", _tableNameWithType, e);
+      }
+    }
     for (SegmentDataManager segmentDataManager : _segmentDataManagerMap.values()) {
       segmentDataManager.destroy();
     }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index e6890c8a6c..1654eaca0d 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -428,6 +428,12 @@ public class ConcurrentMapPartitionUpsertMetadataManager implements PartitionUps
     }
   }
 
+  @Override
+  public void close() {
+    _logger.info("Closing metadata manager for table {} and partition {}, current primary key count: {}",
+        _tableNameWithType, _partitionId, _primaryKeyToRecordLocationMap.size());
+  }
+
   @VisibleForTesting
   static class RecordLocation {
     private final IndexSegment _segment;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index 67474e145d..5c5c357079 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -37,4 +37,12 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta
         k -> new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _primaryKeyColumns,
             _comparisonColumn, _hashFunction, _partialUpsertHandler, _serverMetrics));
   }
+
+  @Override
+  public void close() {
+    for (ConcurrentMapPartitionUpsertMetadataManager partitionUpsertMetadataManager
+        : _partitionMetadataManagerMap.values()) {
+      partitionUpsertMetadataManager.close();
+    }
+  }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index 2c5f68df45..ef5ec7c414 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.upsert;
 
+import java.io.Closeable;
 import java.util.List;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.segment.spi.ImmutableSegment;
@@ -51,7 +52,7 @@ import org.apache.pinot.spi.data.readers.GenericRow;
  * </ul>
  */
 @ThreadSafe
-public interface PartitionUpsertMetadataManager {
+public interface PartitionUpsertMetadataManager extends Closeable {
 
   /**
    * Returns the primary key columns.
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
index ffafb999ee..5d0c450166 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.upsert;
 
+import java.io.Closeable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
@@ -30,7 +31,7 @@ import org.apache.pinot.spi.data.Schema;
  * The manager of the upsert metadata of a table.
  */
 @ThreadSafe
-public interface TableUpsertMetadataManager {
+public interface TableUpsertMetadataManager extends Closeable {
 
   void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager, ServerMetrics serverMetrics);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to