This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 4ec38f7 Fix bug #6671: RealtimeTableDataManager shuts down SegmentBuildTimeLeaseExtender for all tables in the host (#6682) 4ec38f7 is described below commit 4ec38f79315d4017e5e2ac45e8989fa7fc4584fa Author: Jiapeng Tao <jia...@linkedin.com> AuthorDate: Tue Mar 16 00:38:58 2021 -0700 Fix bug #6671: RealtimeTableDataManager shuts down SegmentBuildTimeLeaseExtender for all tables in the host (#6682) Co-authored-by: Jiapeng Tao <jia...@jiatao-mn1.linkedin.biz> --- .../realtime/LLRealtimeSegmentDataManager.java | 2 +- .../manager/realtime/RealtimeTableDataManager.java | 2 +- .../realtime/SegmentBuildTimeLeaseExtender.java | 59 +++++++++++++++------- .../realtime/LLRealtimeSegmentDataManagerTest.java | 37 ++++++++++++-- .../starter/helix/HelixInstanceDataManager.java | 8 ++- 5 files changed, 83 insertions(+), 25 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 82a86d2..89a7210 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -1115,7 +1115,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { _serverMetrics = serverMetrics; _segmentVersion = indexLoadingConfig.getSegmentVersion(); _instanceId = _realtimeTableDataManager.getServerInstance(); - _leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_instanceId); + _leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_tableNameWithType); _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics, _tableNameWithType); String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 73d30e1..7417c3a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -122,7 +122,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager { @Override protected void doInit() { - _leaseExtender = SegmentBuildTimeLeaseExtender.create(_instanceId, _serverMetrics, _tableNameWithType); + _leaseExtender = SegmentBuildTimeLeaseExtender.getOrCreate(_instanceId, _serverMetrics, _tableNameWithType); File statsFile = new File(_tableDataDir, STATS_FILE_NAME); try { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java index 69d7e80..725dc95 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java @@ -18,8 +18,8 @@ */ package org.apache.pinot.core.data.manager.realtime; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.Uninterruptibles; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; @@ -45,42 +45,63 @@ public class SegmentBuildTimeLeaseExtender { private static final int EXTRA_TIME_SECONDS = 120; // Retransmit lease request 10% before lease expires. private static final int REPEAT_REQUEST_PERIOD_SEC = (EXTRA_TIME_SECONDS * 9 / 10); - private static Logger LOGGER = LoggerFactory.getLogger(SegmentBuildTimeLeaseExtender.class); - private static final Map<String, SegmentBuildTimeLeaseExtender> INSTANCE_TO_LEASE_EXTENDER = new HashMap<>(1); + private static final Logger LOGGER = LoggerFactory.getLogger(SegmentBuildTimeLeaseExtender.class); + private static final Map<String, SegmentBuildTimeLeaseExtender> TABLE_TO_LEASE_EXTENDER = new ConcurrentHashMap<>(); + private static ScheduledExecutorService _executor; - private ScheduledExecutorService _executor; private final Map<String, Future> _segmentToFutureMap = new ConcurrentHashMap<>(); private final String _instanceId; + private final String _tableNameWithType; private final ServerSegmentCompletionProtocolHandler _protocolHandler; - public static SegmentBuildTimeLeaseExtender getLeaseExtender(final String instanceId) { - return INSTANCE_TO_LEASE_EXTENDER.get(instanceId); + public static void initExecutor() { + _executor = new ScheduledThreadPoolExecutor(1); } - public static synchronized SegmentBuildTimeLeaseExtender create(final String instanceId, - ServerMetrics serverMetrics, String tableNameWithType) { - SegmentBuildTimeLeaseExtender leaseExtender = INSTANCE_TO_LEASE_EXTENDER.get(instanceId); - if (leaseExtender != null) { - LOGGER.warn("Instance already exists"); - } else { - leaseExtender = new SegmentBuildTimeLeaseExtender(instanceId, serverMetrics, tableNameWithType); - INSTANCE_TO_LEASE_EXTENDER.put(instanceId, leaseExtender); + public static void shutdownExecutor() { + if (_executor != null) { + _executor.shutdownNow(); + _executor = null; } - return leaseExtender; + } + + @VisibleForTesting + public static boolean isExecutorShutdown() { + return _executor == null; + } + + public static SegmentBuildTimeLeaseExtender getLeaseExtender(final String tableNameWithType) { + return TABLE_TO_LEASE_EXTENDER.get(tableNameWithType); + } + + public static SegmentBuildTimeLeaseExtender getOrCreate(final String instanceId, + ServerMetrics serverMetrics, String tableNameWithType) { + return TABLE_TO_LEASE_EXTENDER.compute(tableNameWithType, (k, v) -> { + if (v == null) { + return new SegmentBuildTimeLeaseExtender(instanceId, serverMetrics, tableNameWithType); + } else { + LOGGER.warn("Lease extender for Table: {} already exists", tableNameWithType); + return v; + } + }); } private SegmentBuildTimeLeaseExtender(String instanceId, ServerMetrics serverMetrics, String tableNameWithType) { _instanceId = instanceId; + _tableNameWithType = tableNameWithType; _protocolHandler = new ServerSegmentCompletionProtocolHandler(serverMetrics, tableNameWithType); - _executor = new ScheduledThreadPoolExecutor(1); } public void shutDown() { - if (_executor != null) { - _executor.shutdownNow(); - _executor = null; + for (Map.Entry<String, Future> entry : _segmentToFutureMap.entrySet()) { + Future future = entry.getValue(); + boolean cancelled = future.cancel(true); + if (!cancelled) { + LOGGER.warn("Task could not be cancelled for {}" + entry.getKey()); + } } _segmentToFutureMap.clear(); + TABLE_TO_LEASE_EXTENDER.remove(_tableNameWithType); } /** diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java index 6da00df..39cf466 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java @@ -29,6 +29,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.io.FileUtils; +import org.apache.helix.HelixManager; +import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.common.metadata.instance.InstanceZKMetadata; import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata; import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata; @@ -36,7 +38,11 @@ import org.apache.pinot.common.metrics.PinotMetricUtils; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.protocols.SegmentCompletionProtocol; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.config.TableConfigUtils; +import org.apache.pinot.core.data.manager.TableDataManager; import org.apache.pinot.core.data.manager.config.InstanceDataManagerConfig; +import org.apache.pinot.core.data.manager.config.TableDataManagerConfig; +import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider; import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl; import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory; import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory; @@ -58,6 +64,8 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -128,9 +136,9 @@ public class LLRealtimeSegmentDataManagerTest { return JsonUtils.stringToObject(_tableConfigJson, TableConfig.class); } - private RealtimeTableDataManager createTableDataManager() { + private RealtimeTableDataManager createTableDataManager(TableConfig tableConfig) { final String instanceId = "server-1"; - SegmentBuildTimeLeaseExtender.create(instanceId, new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), _tableName); + SegmentBuildTimeLeaseExtender.getOrCreate(instanceId, new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), tableConfig.getTableName()); RealtimeTableDataManager tableDataManager = mock(RealtimeTableDataManager.class); when(tableDataManager.getServerInstance()).thenReturn(instanceId); RealtimeSegmentStatsHistory statsHistory = mock(RealtimeSegmentStatsHistory.class); @@ -154,7 +162,7 @@ public class LLRealtimeSegmentDataManagerTest { LLCRealtimeSegmentZKMetadata segmentZKMetadata = createZkMetadata(); TableConfig tableConfig = createTableConfig(); InstanceZKMetadata instanceZKMetadata = new InstanceZKMetadata(); - RealtimeTableDataManager tableDataManager = createTableDataManager(); + RealtimeTableDataManager tableDataManager = createTableDataManager(tableConfig); String resourceDir = _segmentDir; LLCSegmentName llcSegmentName = new LLCSegmentName(_segmentNameStr); _partitionGroupIdToSemaphoreMap.putIfAbsent(_partitionGroupId, new Semaphore(1)); @@ -169,11 +177,13 @@ public class LLRealtimeSegmentDataManagerTest { @BeforeClass public void setUp() { _segmentDirFile.deleteOnExit(); + SegmentBuildTimeLeaseExtender.initExecutor(); } @AfterClass public void tearDown() { FileUtils.deleteQuietly(_segmentDirFile); + SegmentBuildTimeLeaseExtender.shutdownExecutor(); } @Test @@ -765,6 +775,27 @@ public class LLRealtimeSegmentDataManagerTest { Assert.assertEquals(secondSegmentDataManager.get().getPartitionGroupConsumerSemaphore().availablePermits(), 1); } + @Test + public void testShutdownTableDataManagerWillNotShutdownLeaseExtenderExecutor() + throws Exception { + TableConfig tableConfig = createTableConfig(); + tableConfig.setUpsertConfig(null); + ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); + when(propertyStore.get(anyString(), any(), anyInt())).thenReturn(TableConfigUtils.toZNRecord(tableConfig)); + + TableDataManagerConfig tableDataManagerConfig = mock(TableDataManagerConfig.class); + when(tableDataManagerConfig.getTableDataManagerType()).thenReturn("REALTIME"); + when(tableDataManagerConfig.getTableName()).thenReturn(tableConfig.getTableName()); + when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath()); + + TableDataManager tableDataManager = TableDataManagerProvider + .getTableDataManager(tableDataManagerConfig, "testInstance", propertyStore, mock(ServerMetrics.class), + mock(HelixManager.class)); + tableDataManager.start(); + tableDataManager.shutDown(); + Assert.assertFalse(SegmentBuildTimeLeaseExtender.isExecutorShutdown()); + } + public static class FakeLLRealtimeSegmentDataManager extends LLRealtimeSegmentDataManager { public Field _state; diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index deb3a6a..30ba125 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -41,6 +41,7 @@ import org.apache.pinot.core.data.manager.SegmentDataManager; import org.apache.pinot.core.data.manager.TableDataManager; import org.apache.pinot.core.data.manager.config.TableDataManagerConfig; import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider; +import org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender; import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment; import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader; import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl; @@ -91,9 +92,12 @@ public class HelixInstanceDataManager implements InstanceDataManager { Preconditions.checkState(instanceSegmentTarDir.mkdirs()); } + // Initialize segment build time lease extender executor + SegmentBuildTimeLeaseExtender.initExecutor(); + LOGGER.info("Initialized segment build time lease extender executor"); + // Initialize the table data manager provider TableDataManagerProvider.init(_instanceDataManagerConfig); - LOGGER.info("Initialized Helix instance data manager"); } @@ -108,6 +112,8 @@ public class HelixInstanceDataManager implements InstanceDataManager { for (TableDataManager tableDataManager : _tableDataManagerMap.values()) { tableDataManager.shutDown(); } + SegmentBuildTimeLeaseExtender.shutdownExecutor(); + LOGGER.info("Segment build time lease extender executor shut down"); LOGGER.info("Helix instance data manager shut down"); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org