ankitsultana commented on code in PR #14686: URL: https://github.com/apache/pinot/pull/14686#discussion_r1933245880
########## pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadScheduler.java: ########## @@ -0,0 +1,423 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.server.predownload; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.configuration2.PropertiesConfiguration; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.utils.TarCompressionUtils; +import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory; +import org.apache.pinot.server.conf.ServerConf; +import 
org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig; +import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; +import org.apache.pinot.spi.crypt.PinotCrypterFactory; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.retry.AttemptsExceededException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class PredownloadScheduler { + + private static final Logger LOGGER = LoggerFactory.getLogger(PredownloadScheduler.class); + private static final String TMP_DIR_NAME = "tmp"; + // Segment download dir in format of "tmp-" + segmentName + "-" + UUID.randomUUID() + private static final String TMP_DIR_FORMAT = "tmp-%s-%s"; + private static final long DOWNLOAD_SEGMENTS_TIMEOUT_MIN = 60; Review Comment: This timeout should be configurable. Let's leave a todo? ########## pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadScheduler.java: ########## @@ -0,0 +1,423 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.server.predownload; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.configuration2.PropertiesConfiguration; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.utils.TarCompressionUtils; +import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory; +import org.apache.pinot.server.conf.ServerConf; +import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig; +import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; +import org.apache.pinot.spi.crypt.PinotCrypterFactory; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.retry.AttemptsExceededException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class PredownloadScheduler { + + private static final Logger LOGGER = LoggerFactory.getLogger(PredownloadScheduler.class); + private static final String TMP_DIR_NAME = "tmp"; + // Segment download dir in format of "tmp-" + segmentName + "-" + UUID.randomUUID() + private static final String TMP_DIR_FORMAT = "tmp-%s-%s"; + private static final long DOWNLOAD_SEGMENTS_TIMEOUT_MIN = 60; + private static final long LOAD_SEGMENTS_TIMEOUT_MIN = 5; + private final PropertiesConfiguration 
_properties; + private final PinotConfiguration _pinotConfig; + private final InstanceDataManagerConfig _instanceDataManagerConfig; + private final String _clusterName; + private final String _instanceId; + private final String _zkAddress; + @VisibleForTesting + Executor _executor; + @VisibleForTesting + Set<String> _failedSegments; + @SuppressWarnings("NullAway.Init") + private PredownloadMetrics _predownloadMetrics; + private int _numOfSkippedSegments; + private int _numOfUnableToDownloadSegments; + private int _numOfDownloadSegments; + private long _totalDownloadedSizeBytes; + @SuppressWarnings("NullAway.Init") + private ZKClient _zkClient; + @SuppressWarnings("NullAway.Init") + private List<SegmentInfo> _segmentInfoList; + @SuppressWarnings("NullAway.Init") + private Map<String, TableInfo> _tableInfoMap; + + public PredownloadScheduler(PropertiesConfiguration properties) + throws Exception { + _properties = properties; + _clusterName = properties.getString(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME); + _zkAddress = properties.getString(CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER); + _instanceId = properties.getString(CommonConstants.Server.CONFIG_OF_INSTANCE_ID); + _pinotConfig = new PinotConfiguration(properties); + _instanceDataManagerConfig = + new HelixInstanceDataManagerConfig(new ServerConf(_pinotConfig).getInstanceDataManagerConfig()); + // Get the number of available processors (vCPUs) + int numProcessors = Runtime.getRuntime().availableProcessors(); + _failedSegments = ConcurrentHashMap.newKeySet(); + // TODO: tune the value + _executor = Executors.newFixedThreadPool(numProcessors * 3); + LOGGER.info("Created thread pool with num of threads: {}", numProcessors * 3); + _numOfSkippedSegments = 0; + _numOfDownloadSegments = 0; + } + + public void start() { + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + LOGGER.info("Trying to stop predownload process!"); + stop(); + } catch (Exception e) { + 
e.printStackTrace(); + LOGGER.error("error shutting down predownload process : ", e); + } + } + }); + + long startTime = System.currentTimeMillis(); + initializeZK(); + initializeMetricsReporter(); + initializeSegmentFetcher(); + getSegmentsInfo(); + loadSegmentsFromLocal(); + PredownloadCompleteReason reason = downloadSegments(); + long timeTaken = System.currentTimeMillis() - startTime; + LOGGER.info( + "Predownload process took {} sec, tried to download {} segments, skipped {} segments " + + "and unable to download {} segments. Download size: {} MB. Download speed: {} MB/s", + timeTaken / 1000, _numOfDownloadSegments, _numOfSkippedSegments, _numOfUnableToDownloadSegments, + _totalDownloadedSizeBytes / (1024 * 1024), + (_totalDownloadedSizeBytes / (1024 * 1024)) / (timeTaken / 1000 + 1)); + if (reason.isSucceed()) { + _predownloadMetrics.preDownloadSucceed(_totalDownloadedSizeBytes, timeTaken); + } + StatusRecorder.predownloadComplete(reason, _clusterName, _instanceId, String.join(",", _failedSegments)); + } + + public void stop() { + if (_zkClient != null) { + _zkClient.close(); + } + if (_executor != null) { + ((ThreadPoolExecutor) _executor).shutdownNow(); + } + } + + void initializeZK() { + LOGGER.info("Initializing ZK client with address: {} and instanceId: {}", _zkAddress, _instanceId); + _zkClient = new ZKClient(_zkAddress, _clusterName, _instanceId); + _zkClient.start(); + } + + void initializeMetricsReporter() { + LOGGER.info("Initializing metrics reporter"); + + _predownloadMetrics = new PredownloadMetrics(); + StatusRecorder.registerMetrics(_predownloadMetrics); + } + + @VisibleForTesting + void getSegmentsInfo() { + LOGGER.info("Getting segments info from ZK"); + _segmentInfoList = _zkClient.getSegmentsOfInstance(_zkClient.getDataAccessor()); + if (_segmentInfoList.isEmpty()) { + PredownloadCompleteReason reason = PredownloadCompleteReason.NO_SEGMENT_TO_PREDOWNLOAD; Review Comment: nit: go for "PredownloadCompletionReason", since we use similar naming 
elsewhere (as in segment completion protocol). ########## pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadScheduler.java: ########## @@ -0,0 +1,423 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.server.predownload; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.configuration2.PropertiesConfiguration; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.utils.TarCompressionUtils; +import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory; +import org.apache.pinot.server.conf.ServerConf; 
+import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig; +import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; +import org.apache.pinot.spi.crypt.PinotCrypterFactory; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.retry.AttemptsExceededException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class PredownloadScheduler { + + private static final Logger LOGGER = LoggerFactory.getLogger(PredownloadScheduler.class); + private static final String TMP_DIR_NAME = "tmp"; + // Segment download dir in format of "tmp-" + segmentName + "-" + UUID.randomUUID() + private static final String TMP_DIR_FORMAT = "tmp-%s-%s"; + private static final long DOWNLOAD_SEGMENTS_TIMEOUT_MIN = 60; + private static final long LOAD_SEGMENTS_TIMEOUT_MIN = 5; + private final PropertiesConfiguration _properties; + private final PinotConfiguration _pinotConfig; + private final InstanceDataManagerConfig _instanceDataManagerConfig; + private final String _clusterName; + private final String _instanceId; + private final String _zkAddress; + @VisibleForTesting + Executor _executor; + @VisibleForTesting + Set<String> _failedSegments; + @SuppressWarnings("NullAway.Init") + private PredownloadMetrics _predownloadMetrics; + private int _numOfSkippedSegments; + private int _numOfUnableToDownloadSegments; + private int _numOfDownloadSegments; + private long _totalDownloadedSizeBytes; + @SuppressWarnings("NullAway.Init") + private ZKClient _zkClient; + @SuppressWarnings("NullAway.Init") + private List<SegmentInfo> _segmentInfoList; + @SuppressWarnings("NullAway.Init") + private Map<String, TableInfo> _tableInfoMap; + + public PredownloadScheduler(PropertiesConfiguration properties) + throws Exception { + _properties = properties; + _clusterName = 
properties.getString(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME); + _zkAddress = properties.getString(CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER); + _instanceId = properties.getString(CommonConstants.Server.CONFIG_OF_INSTANCE_ID); + _pinotConfig = new PinotConfiguration(properties); + _instanceDataManagerConfig = + new HelixInstanceDataManagerConfig(new ServerConf(_pinotConfig).getInstanceDataManagerConfig()); + // Get the number of available processors (vCPUs) + int numProcessors = Runtime.getRuntime().availableProcessors(); + _failedSegments = ConcurrentHashMap.newKeySet(); + // TODO: tune the value + _executor = Executors.newFixedThreadPool(numProcessors * 3); + LOGGER.info("Created thread pool with num of threads: {}", numProcessors * 3); + _numOfSkippedSegments = 0; + _numOfDownloadSegments = 0; + } + + public void start() { + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + LOGGER.info("Trying to stop predownload process!"); + stop(); + } catch (Exception e) { + e.printStackTrace(); + LOGGER.error("error shutting down predownload process : ", e); + } + } + }); + + long startTime = System.currentTimeMillis(); + initializeZK(); + initializeMetricsReporter(); + initializeSegmentFetcher(); + getSegmentsInfo(); + loadSegmentsFromLocal(); + PredownloadCompleteReason reason = downloadSegments(); + long timeTaken = System.currentTimeMillis() - startTime; + LOGGER.info( + "Predownload process took {} sec, tried to download {} segments, skipped {} segments " + + "and unable to download {} segments. Download size: {} MB. 
Download speed: {} MB/s", + timeTaken / 1000, _numOfDownloadSegments, _numOfSkippedSegments, _numOfUnableToDownloadSegments, + _totalDownloadedSizeBytes / (1024 * 1024), + (_totalDownloadedSizeBytes / (1024 * 1024)) / (timeTaken / 1000 + 1)); + if (reason.isSucceed()) { + _predownloadMetrics.preDownloadSucceed(_totalDownloadedSizeBytes, timeTaken); + } + StatusRecorder.predownloadComplete(reason, _clusterName, _instanceId, String.join(",", _failedSegments)); + } + + public void stop() { + if (_zkClient != null) { + _zkClient.close(); + } + if (_executor != null) { + ((ThreadPoolExecutor) _executor).shutdownNow(); + } + } + + void initializeZK() { + LOGGER.info("Initializing ZK client with address: {} and instanceId: {}", _zkAddress, _instanceId); + _zkClient = new ZKClient(_zkAddress, _clusterName, _instanceId); + _zkClient.start(); + } + + void initializeMetricsReporter() { + LOGGER.info("Initializing metrics reporter"); + + _predownloadMetrics = new PredownloadMetrics(); + StatusRecorder.registerMetrics(_predownloadMetrics); + } + + @VisibleForTesting + void getSegmentsInfo() { + LOGGER.info("Getting segments info from ZK"); + _segmentInfoList = _zkClient.getSegmentsOfInstance(_zkClient.getDataAccessor()); + if (_segmentInfoList.isEmpty()) { + PredownloadCompleteReason reason = PredownloadCompleteReason.NO_SEGMENT_TO_PREDOWNLOAD; + StatusRecorder.predownloadComplete(reason, _clusterName, _instanceId, ""); + } + _tableInfoMap = new HashMap<>(); + _zkClient.updateSegmentMetadata(_segmentInfoList, _tableInfoMap, _instanceDataManagerConfig); + } + + @VisibleForTesting + void loadSegmentsFromLocal() { + LOGGER.info("Loading segments from local to reduce number of segments to download"); + long startTime = System.currentTimeMillis(); + List<CompletableFuture<Void>> futures = new ArrayList<>(); + + // Submit tasks to the executor + for (SegmentInfo segmentInfo : _segmentInfoList) { + CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { + boolean 
loadSegmentSuccess = false; + try { + TableInfo tableInfo = _tableInfoMap.get(segmentInfo.getTableNameWithType()); + if (tableInfo != null) { + loadSegmentSuccess = tableInfo.loadSegmentFromLocal(segmentInfo, _instanceDataManagerConfig); + } + } catch (Exception e) { + LOGGER.error("Failed to load from local for segment: {} of table: {} with issue ", + segmentInfo.getSegmentName(), segmentInfo.getTableNameWithType(), e); + } + if (!loadSegmentSuccess && segmentInfo.canBeDownloaded()) { + _failedSegments.add(segmentInfo.getSegmentName()); + } + }, _executor); + + futures.add(future); + } + + // Wait for all CompletableFuture tasks to complete or timeout + CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + + try { + // Wait indefinitely for all tasks to complete + allOf.get(LOAD_SEGMENTS_TIMEOUT_MIN, TimeUnit.MINUTES); Review Comment: what's the behavior when there are ongoing downloads and this exits? is there a possibility to run into an inconsistent/partial state on disk? (same question for download segments step) ########## pinot-server/src/main/java/org/apache/pinot/server/predownload/ZKClient.java: ########## @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.server.predownload; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixException; +import org.apache.helix.PropertyPathBuilder; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.helix.model.CurrentState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.store.zk.AutoFallbackPropertyStore; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.helix.zookeeper.api.client.HelixZkClient; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; +import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory; +import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.CommonConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This class is to manage the ZK connection and operations. It will be used to fetch the segment + * metadata from ZK and prepare for the downloading. 
+ */ +public class ZKClient { + + private static final Logger LOGGER = LoggerFactory.getLogger(ZKClient.class); + private static final long ZK_CONNECTION_TIMEOUT_MS = 30000L; + private final String _clusterName; + private final String _instanceName; + private final String _zkAddress; + + @SuppressWarnings("NullAway.Init") + private RealmAwareZkClient _zkClient; + + private boolean _started; + + public ZKClient(String zkAddress, String clusterName, String instanceName) { + _clusterName = clusterName; + _instanceName = instanceName; + _zkAddress = zkAddress; + _started = false; + } + + public void start() { + RealmAwareZkClient.RealmAwareZkClientConfig config = new RealmAwareZkClient.RealmAwareZkClientConfig(); + config.setConnectInitTimeout(ZK_CONNECTION_TIMEOUT_MS); + config.setZkSerializer(new ZNRecordSerializer()); + _zkClient = SharedZkClientFactory.getInstance() + .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddress), config.createHelixZkClientConfig()); + _zkClient.waitUntilConnected(ZK_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS); + _started = true; + } + + public void close() { + if (_zkClient != null) { + _zkClient.close(); + _started = false; Review Comment: semantically this is incorrect, since the client was started before. Perhaps rename it to `_running`? ########## pinot-server/src/main/java/org/apache/pinot/server/predownload/TableInfo.java: ########## @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.server.predownload; + +import java.io.File; +import javax.annotation.Nullable; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.spi.store.SegmentDirectory; +import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class TableInfo { + private static final Logger LOGGER = LoggerFactory.getLogger(TableInfo.class); + private final String _tableNameWithType; + private final InstanceDataManagerConfig _instanceDataManagerConfig; + private final TableConfig _tableConfig; + @Nullable + private final Schema _schema; + + public TableInfo(String tableNameWithType, TableConfig tableConfig, @Nullable Schema schema, + InstanceDataManagerConfig instanceDataManagerConfig) { + _tableNameWithType = tableNameWithType; + _tableConfig = tableConfig; + _schema = schema; + _instanceDataManagerConfig = instanceDataManagerConfig; + } + + private static void closeSegmentDirectoryQuietly(@Nullable SegmentDirectory segmentDirectory) { + if (segmentDirectory != null) { + try { + segmentDirectory.close(); + } catch (Exception e) { + LOGGER.warn("Failed to close SegmentDirectory due to error: {}", e.getMessage()); + } + } + } + + public TableConfig getTableConfig() { + return _tableConfig; + } + + public InstanceDataManagerConfig getInstanceDataManagerConfig() { + return _instanceDataManagerConfig; + } + + 
/** + * After loading segment metadata from ZK, try to load from local and check if we are able to skip + * the downloading + * + * @param segmentInfo SegmentInfo of segment to be loaded + * @param instanceDataManagerConfig InstanceDataManagerConfig loaded from scheduler + * @return true if already presents, false if needs to be downloaded + */ + public boolean loadSegmentFromLocal(SegmentInfo segmentInfo, InstanceDataManagerConfig instanceDataManagerConfig) { + SegmentDirectory segmentDirectory = getSegmentDirectory(segmentInfo, instanceDataManagerConfig); Review Comment: `SegmentDirectory` is a closeable. Do we handle it somewhere? ########## pinot-server/src/main/java/org/apache/pinot/server/predownload/ZKClient.java: ########## @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.server.predownload; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixException; +import org.apache.helix.PropertyPathBuilder; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.helix.model.CurrentState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.store.zk.AutoFallbackPropertyStore; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.helix.zookeeper.api.client.HelixZkClient; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; +import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory; +import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.CommonConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This class is to manage the ZK connection and operations. It will be used to fetch the segment + * metadata from ZK and prepare for the downloading. 
+ */ +public class ZKClient { + + private static final Logger LOGGER = LoggerFactory.getLogger(ZKClient.class); + private static final long ZK_CONNECTION_TIMEOUT_MS = 30000L; + private final String _clusterName; + private final String _instanceName; + private final String _zkAddress; + + @SuppressWarnings("NullAway.Init") + private RealmAwareZkClient _zkClient; + + private boolean _started; + + public ZKClient(String zkAddress, String clusterName, String instanceName) { Review Comment: Let's prefix all relevant classes with `Predownload` so as to not pollute namespace. (other classes `SegmentInfo`, `TableInfo`, `StatusRecorder`, `TestUtil`) ########## pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadScheduler.java: ########## @@ -0,0 +1,423 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.server.predownload; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.configuration2.PropertiesConfiguration; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.utils.TarCompressionUtils; +import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory; +import org.apache.pinot.server.conf.ServerConf; +import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig; +import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; +import org.apache.pinot.spi.crypt.PinotCrypterFactory; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.retry.AttemptsExceededException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class PredownloadScheduler { + + private static final Logger LOGGER = LoggerFactory.getLogger(PredownloadScheduler.class); + private static final String TMP_DIR_NAME = "tmp"; + // Segment download dir in format of "tmp-" + segmentName + "-" + UUID.randomUUID() + private static final String TMP_DIR_FORMAT = "tmp-%s-%s"; + private static final long DOWNLOAD_SEGMENTS_TIMEOUT_MIN = 60; + private static final long LOAD_SEGMENTS_TIMEOUT_MIN = 5; + private final PropertiesConfiguration 
_properties; + private final PinotConfiguration _pinotConfig; + private final InstanceDataManagerConfig _instanceDataManagerConfig; + private final String _clusterName; + private final String _instanceId; + private final String _zkAddress; + @VisibleForTesting + Executor _executor; + @VisibleForTesting + Set<String> _failedSegments; + @SuppressWarnings("NullAway.Init") + private PredownloadMetrics _predownloadMetrics; + private int _numOfSkippedSegments; + private int _numOfUnableToDownloadSegments; + private int _numOfDownloadSegments; + private long _totalDownloadedSizeBytes; + @SuppressWarnings("NullAway.Init") + private ZKClient _zkClient; + @SuppressWarnings("NullAway.Init") + private List<SegmentInfo> _segmentInfoList; + @SuppressWarnings("NullAway.Init") + private Map<String, TableInfo> _tableInfoMap; + + public PredownloadScheduler(PropertiesConfiguration properties) + throws Exception { + _properties = properties; + _clusterName = properties.getString(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME); + _zkAddress = properties.getString(CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER); + _instanceId = properties.getString(CommonConstants.Server.CONFIG_OF_INSTANCE_ID); + _pinotConfig = new PinotConfiguration(properties); + _instanceDataManagerConfig = + new HelixInstanceDataManagerConfig(new ServerConf(_pinotConfig).getInstanceDataManagerConfig()); + // Get the number of available processors (vCPUs) + int numProcessors = Runtime.getRuntime().availableProcessors(); + _failedSegments = ConcurrentHashMap.newKeySet(); + // TODO: tune the value + _executor = Executors.newFixedThreadPool(numProcessors * 3); Review Comment: for discussion: the downloads should largely be IO bound, so this could be much higher? ########## pinot-server/src/main/java/org/apache/pinot/server/predownload/SegmentInfo.java: ########## @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.server.predownload; + +import io.netty.util.internal.StringUtil; +import java.io.File; +import javax.annotation.Nullable; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.utils.config.TierConfigUtils; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; +import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader; +import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext; +import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry; +import org.apache.pinot.segment.spi.store.SegmentDirectory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class SegmentInfo { + private static final Logger LOGGER = LoggerFactory.getLogger(SegmentInfo.class); + private final String _segmentName; + private final String _tableNameWithType; + + @SuppressWarnings("NullAway.Init") Review Comment: these should not be required here? 
seems like an internal repo thing ########## pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java: ########## @@ -169,7 +169,13 @@ public enum ServerMeter implements AbstractMetrics.Meter { * That means that if a stage has 10 workers and all of them reach the limit, this will be increased by 1. * But if a single query has 2 different window operators and each one reaches the limit, this will be increased by 2. */ - WINDOW_TIMES_MAX_ROWS_REACHED("times", true); + WINDOW_TIMES_MAX_ROWS_REACHED("times", true), + + // predownload metrics + SEGMENT_DOWNLOAD_COUNT("predownloadSegmentCount", true), Review Comment: let's prefix the enum with `predownload` too? Since it only tracks predownload related metrics. ########## pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadScheduler.java: ########## @@ -0,0 +1,423 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.server.predownload; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.configuration2.PropertiesConfiguration; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.utils.TarCompressionUtils; +import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory; +import org.apache.pinot.server.conf.ServerConf; +import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig; +import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; +import org.apache.pinot.spi.crypt.PinotCrypterFactory; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.retry.AttemptsExceededException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class PredownloadScheduler { + + private static final Logger LOGGER = LoggerFactory.getLogger(PredownloadScheduler.class); + private static final String TMP_DIR_NAME = "tmp"; + // Segment download dir in format of "tmp-" + segmentName + "-" + UUID.randomUUID() + private static final String TMP_DIR_FORMAT = "tmp-%s-%s"; + private static final long DOWNLOAD_SEGMENTS_TIMEOUT_MIN = 60; + private static final long LOAD_SEGMENTS_TIMEOUT_MIN = 5; + private final PropertiesConfiguration 
_properties; + private final PinotConfiguration _pinotConfig; + private final InstanceDataManagerConfig _instanceDataManagerConfig; + private final String _clusterName; + private final String _instanceId; + private final String _zkAddress; + @VisibleForTesting + Executor _executor; + @VisibleForTesting + Set<String> _failedSegments; + @SuppressWarnings("NullAway.Init") + private PredownloadMetrics _predownloadMetrics; + private int _numOfSkippedSegments; + private int _numOfUnableToDownloadSegments; + private int _numOfDownloadSegments; + private long _totalDownloadedSizeBytes; + @SuppressWarnings("NullAway.Init") + private ZKClient _zkClient; + @SuppressWarnings("NullAway.Init") + private List<SegmentInfo> _segmentInfoList; + @SuppressWarnings("NullAway.Init") + private Map<String, TableInfo> _tableInfoMap; + + public PredownloadScheduler(PropertiesConfiguration properties) + throws Exception { + _properties = properties; + _clusterName = properties.getString(CommonConstants.Helix.CONFIG_OF_CLUSTER_NAME); + _zkAddress = properties.getString(CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER); + _instanceId = properties.getString(CommonConstants.Server.CONFIG_OF_INSTANCE_ID); + _pinotConfig = new PinotConfiguration(properties); + _instanceDataManagerConfig = + new HelixInstanceDataManagerConfig(new ServerConf(_pinotConfig).getInstanceDataManagerConfig()); + // Get the number of available processors (vCPUs) + int numProcessors = Runtime.getRuntime().availableProcessors(); + _failedSegments = ConcurrentHashMap.newKeySet(); + // TODO: tune the value + _executor = Executors.newFixedThreadPool(numProcessors * 3); + LOGGER.info("Created thread pool with num of threads: {}", numProcessors * 3); + _numOfSkippedSegments = 0; + _numOfDownloadSegments = 0; + } + + public void start() { + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + LOGGER.info("Trying to stop predownload process!"); + stop(); + } catch (Exception e) { + 
e.printStackTrace(); Review Comment: remove `e.printStackTrace()`? ########## pinot-server/src/main/java/org/apache/pinot/server/predownload/ZKClient.java: ########## @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.server.predownload; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixException; +import org.apache.helix.PropertyPathBuilder; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.helix.model.CurrentState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.store.zk.AutoFallbackPropertyStore; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.helix.zookeeper.api.client.HelixZkClient; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; +import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory; 
+import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.utils.CommonConstants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This class is to manage the ZK connection and operations. It will be used to fetch the segment + * metadata from ZK and prepare for the downloading. + */ +public class ZKClient { Review Comment: This needs to be an AutoCloseable ########## pinot-server/src/main/java/org/apache/pinot/server/predownload/StatusRecorder.java: ########## @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.server.predownload; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import javax.annotation.Nullable; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.filefilter.FileFilterUtils; +import org.apache.commons.io.filefilter.IOFileFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +enum PredownloadCompleteReason { Review Comment: Separate files for each definition in this? ########## pinot-server/src/main/java/org/apache/pinot/server/predownload/PredownloadScheduler.java: ########## @@ -0,0 +1,423 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.server.predownload; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.configuration2.PropertiesConfiguration; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.utils.TarCompressionUtils; +import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory; +import org.apache.pinot.server.conf.ServerConf; +import org.apache.pinot.server.starter.helix.HelixInstanceDataManagerConfig; +import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; +import org.apache.pinot.spi.crypt.PinotCrypterFactory; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.retry.AttemptsExceededException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class PredownloadScheduler { Review Comment: We haven't really hooked this into server starter in this PR? You are planning to do a follow-up? 
########## pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java: ########## @@ -79,7 +79,9 @@ public enum ServerGauge implements AbstractMetrics.Gauge { REALTIME_INGESTION_OFFSET_LAG("offsetLag", false), REALTIME_INGESTION_UPSTREAM_OFFSET("upstreamOffset", false), REALTIME_INGESTION_CONSUMING_OFFSET("consumingOffset", false), - REALTIME_CONSUMER_DIR_USAGE("bytes", true); + REALTIME_CONSUMER_DIR_USAGE("bytes", true), + SEGMENT_DOWNLOAD_SPEED("bytes", true), Review Comment: the speed gauge will get overwritten several times, leading to inaccurate observations. A better approach would be to just track the amount of data predownloaded, and infer the rate later -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org