This is an automated email from the ASF dual-hosted git repository. apucher pushed a commit to branch thirdeye-temp in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 387f0e0641a17d24ed07170192e7fc31feb70494 Author: Ting Chen <tingc...@uber.com> AuthorDate: Thu Jul 23 17:13:34 2020 -0700 [Deepstore by-passing] Introduce a subclass of SplitSegmentCommitter which proceeds to commit even if the segment upload fails. (#5700) * First commit to enable bypassing the segment store in LLC. * Fix a compilation issue. * Fix a unit test. * Refactor the segment committer factory api. * Introduce a new constant for peer download scheme. * Fix a typo. * Add a TODO on how to control split commit behavior and refactor the uploadSegment method. * Remove unused vars. --- .../apache/pinot/common/utils/CommonConstants.java | 1 + .../realtime/PinotLLCRealtimeSegmentManager.java | 8 ++-- .../PinotLLCRealtimeSegmentManagerTest.java | 3 +- .../helix/core/realtime/SegmentCompletionTest.java | 3 +- .../realtime/LLRealtimeSegmentDataManager.java | 24 +++-------- .../realtime/PeerSchemeSplitSegmentCommitter.java | 48 ++++++++++++++++++++++ .../manager/realtime/PinotFSSegmentUploader.java | 2 + .../manager/realtime/SegmentCommitterFactory.java | 37 +++++++++++++---- .../manager/realtime/SplitSegmentCommitter.java | 14 ++++++- .../segment/index/loader/IndexLoadingConfig.java | 7 ++++ .../realtime/LLRealtimeSegmentDataManagerTest.java | 2 + 11 files changed, 117 insertions(+), 32 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java index 7c4a7b2..3e25512 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java @@ -343,6 +343,7 @@ public class CommonConstants { public static final String FLUSH_THRESHOLD_SIZE = "segment.flush.threshold.size"; public static final String FLUSH_THRESHOLD_TIME = "segment.flush.threshold.time"; public static final String PARTITION_METADATA = "segment.partition.metadata"; + 
public static final String PEER_SEGMENT_DOWNLOAD_SCHEME = "peer://"; /** * This field is used for parallel push protection to lock the segment globally. * We put the segment upload start timestamp so that if the previous push failed without unlock the segment, the diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 1691553..06cc58d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -398,8 +398,9 @@ public class PinotLLCRealtimeSegmentManager { } private boolean isPeerSegmentDownloadScheme(CommittingSegmentDescriptor committingSegmentDescriptor) { - return !(committingSegmentDescriptor == null) && !(committingSegmentDescriptor.getSegmentLocation() == null) && - committingSegmentDescriptor.getSegmentLocation().toLowerCase().startsWith("peer://"); + return !(committingSegmentDescriptor == null) && !(committingSegmentDescriptor.getSegmentLocation() == null) + && committingSegmentDescriptor.getSegmentLocation().toLowerCase() + .startsWith(CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME); } /** @@ -514,7 +515,8 @@ public class PinotLLCRealtimeSegmentManager { } private boolean isPeerURL(String segmentLocation) { - return segmentLocation != null && segmentLocation.toLowerCase().startsWith("peer://"); + return segmentLocation != null && segmentLocation.toLowerCase() + .startsWith(CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME); } /** diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index acd4664..e15d28c 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -45,6 +45,7 @@ import org.apache.helix.model.IdealState; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata; import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.common.utils.CommonConstants.Helix; import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status; @@ -801,7 +802,7 @@ public class PinotLLCRealtimeSegmentManagerTest { // Test case 2: segment location with peer format: peer://segment1, verify that an empty string is stored in zk. 
committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName(); - String peerSegmentLocation = "peer:///segment1"; + String peerSegmentLocation = CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME + "/segment1"; committingSegmentDescriptor = new CommittingSegmentDescriptor(committingSegment, new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString(), 0L, peerSegmentLocation); committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata()); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java index 2723063..0166f9f 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java @@ -320,7 +320,8 @@ public class SegmentCompletionTest { @Test public void testHappyPathSplitCommitWithPeerDownloadScheme() throws Exception { - testHappyPathSplitCommit(5L, "peer:///segment1", "peer:///segment1"); + testHappyPathSplitCommit(5L, CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME + "/segment1", + CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME + "/segment1"); } @Test diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 0a8b27d..8944e15 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -846,24 +846,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { } SegmentCommitter segmentCommitter; - - if (isSplitCommit) 
{ - // TODO: make segment uploader used in the segment committer configurable. - SegmentUploader segmentUploader; - try { - segmentUploader = - new Server2ControllerSegmentUploader(segmentLogger, _protocolHandler.getFileUploadDownloadClient(), - _protocolHandler.getSegmentCommitUploadURL(params, controllerVipUrl), _segmentNameStr, - ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), _serverMetrics); - } catch (URISyntaxException e) { - segmentLogger.error("Segment commit upload url error: ", e); - return SegmentCompletionProtocol.RESP_NOT_SENT; - } - segmentCommitter = _segmentCommitterFactory.createSplitSegmentCommitter(params, segmentUploader); - } else { - segmentCommitter = _segmentCommitterFactory.createDefaultSegmentCommitter(params); + try { + segmentCommitter = _segmentCommitterFactory.createSegmentCommitter(isSplitCommit, params, controllerVipUrl); + } catch (URISyntaxException e) { + segmentLogger.error("Failed to create a segment committer: ", e); + return SegmentCompletionProtocol.RESP_NOT_SENT; } - return segmentCommitter.commit(_segmentBuildDescriptor); } @@ -1268,7 +1256,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { _consumeEndTime = now + minConsumeTimeMillis; } - _segmentCommitterFactory = new SegmentCommitterFactory(segmentLogger, _protocolHandler); + _segmentCommitterFactory = new SegmentCommitterFactory(segmentLogger, _protocolHandler, tableConfig, indexLoadingConfig, serverMetrics); segmentLogger .info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", _llcSegmentName, diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PeerSchemeSplitSegmentCommitter.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PeerSchemeSplitSegmentCommitter.java new file mode 100644 index 0000000..118f0ff --- /dev/null +++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PeerSchemeSplitSegmentCommitter.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.data.manager.realtime; + +import java.io.File; +import java.net.URI; +import org.apache.pinot.common.protocols.SegmentCompletionProtocol; +import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.StringUtil; +import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; +import org.slf4j.Logger; + + +public class PeerSchemeSplitSegmentCommitter extends SplitSegmentCommitter { + public PeerSchemeSplitSegmentCommitter(Logger segmentLogger, ServerSegmentCompletionProtocolHandler protocolHandler, + SegmentCompletionProtocol.Request.Params params, SegmentUploader segmentUploader) { + super(segmentLogger, protocolHandler, params, segmentUploader); + } + + // Always return a uri string even if the segment upload fails and returns a null uri. + // If the segment upload fails, put peer:///segment_name in the segment location to notify the controller it is a + // peer download scheme. 
+ protected String uploadSegment(File segmentTarFile, SegmentUploader segmentUploader, + SegmentCompletionProtocol.Request.Params params) { + URI segmentLocation = segmentUploader.uploadSegment(segmentTarFile, new LLCSegmentName(params.getSegmentName())); + if (segmentLocation == null) { + return StringUtil.join("/", CommonConstants.Segment.PEER_SEGMENT_DOWNLOAD_SCHEME, params.getSegmentName()); + } + return segmentLocation.toString(); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java index 5dae4e4..75a7fff 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java @@ -40,6 +40,8 @@ import org.slf4j.LoggerFactory; */ public class PinotFSSegmentUploader implements SegmentUploader { private Logger LOGGER = LoggerFactory.getLogger(PinotFSSegmentUploader.class); + public static final int DEFAULT_SEGMENT_UPLOAD_TIMEOUT_MILLIS = 10 * 1000; + private String _segmentStoreUriStr; private ExecutorService _executorService = Executors.newCachedThreadPool(); private int _timeoutInMs; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java index df0f6b8..2d8154e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java @@ -18,8 +18,12 @@ */ package org.apache.pinot.core.data.manager.realtime; +import java.net.URISyntaxException; +import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.protocols.SegmentCompletionProtocol; +import 
org.apache.pinot.core.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; +import org.apache.pinot.spi.config.table.TableConfig; import org.slf4j.Logger; @@ -29,18 +33,37 @@ import org.slf4j.Logger; public class SegmentCommitterFactory { private static Logger LOGGER; private final ServerSegmentCompletionProtocolHandler _protocolHandler; + private final TableConfig _tableConfig; + private final ServerMetrics _serverMetrics; + private final IndexLoadingConfig _indexLoadingConfig; - public SegmentCommitterFactory(Logger segmentLogger, ServerSegmentCompletionProtocolHandler protocolHandler) { + public SegmentCommitterFactory(Logger segmentLogger, ServerSegmentCompletionProtocolHandler protocolHandler, + TableConfig tableConfig, IndexLoadingConfig indexLoadingConfig, ServerMetrics serverMetrics) { LOGGER = segmentLogger; _protocolHandler = protocolHandler; + _tableConfig = tableConfig; + _indexLoadingConfig = indexLoadingConfig; + _serverMetrics = serverMetrics; } - public SegmentCommitter createSplitSegmentCommitter(SegmentCompletionProtocol.Request.Params params, - SegmentUploader segmentUploader) { - return new SplitSegmentCommitter(LOGGER, _protocolHandler, params, segmentUploader); - } + public SegmentCommitter createSegmentCommitter(boolean isSplitCommit, SegmentCompletionProtocol.Request.Params params, + String controllerVipUrl) + throws URISyntaxException { + if (!isSplitCommit) { + return new DefaultSegmentCommitter(LOGGER, _protocolHandler, params); + } + SegmentUploader segmentUploader; + // TODO Instead of using a peer segment download scheme to control how the servers do split commit, we should use + // other configs such as server or controller configs or controller responses to the servers. 
+ if (_tableConfig.getValidationConfig().getPeerSegmentDownloadScheme() != null) { + segmentUploader = new PinotFSSegmentUploader(_indexLoadingConfig.getSegmentStoreURI(), + PinotFSSegmentUploader.DEFAULT_SEGMENT_UPLOAD_TIMEOUT_MILLIS); + return new PeerSchemeSplitSegmentCommitter(LOGGER, _protocolHandler, params, segmentUploader); + } - public SegmentCommitter createDefaultSegmentCommitter(SegmentCompletionProtocol.Request.Params params) { - return new DefaultSegmentCommitter(LOGGER, _protocolHandler, params); + segmentUploader = new Server2ControllerSegmentUploader(LOGGER, _protocolHandler.getFileUploadDownloadClient(), + _protocolHandler.getSegmentCommitUploadURL(params, controllerVipUrl), params.getSegmentName(), + ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), _serverMetrics); + return new SplitSegmentCommitter(LOGGER, _protocolHandler, params, segmentUploader); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java index 8d73498..33b2ac0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java @@ -56,11 +56,11 @@ public class SplitSegmentCommitter implements SegmentCommitter { return SegmentCompletionProtocol.RESP_FAILED; } - URI segmentLocation = _segmentUploader.uploadSegment(segmentTarFile, new LLCSegmentName(_params.getSegmentName())); + String segmentLocation = uploadSegment(segmentTarFile, _segmentUploader, _params); if (segmentLocation == null) { return SegmentCompletionProtocol.RESP_FAILED; } - _params.withSegmentLocation(segmentLocation.toString()); + _params.withSegmentLocation(segmentLocation); SegmentCompletionProtocol.Response commitEndResponse = _protocolHandler.segmentCommitEndWithMetadata(_params, 
segmentBuildDescriptor.getMetadataFiles()); @@ -71,4 +71,14 @@ public class SplitSegmentCommitter implements SegmentCommitter { } return commitEndResponse; } + + // Return null iff the segment upload fails. + protected String uploadSegment(File segmentTarFile, SegmentUploader segmentUploader, + SegmentCompletionProtocol.Request.Params params) { + URI segmentLocation = segmentUploader.uploadSegment(segmentTarFile, new LLCSegmentName(params.getSegmentName())); + if (segmentLocation != null) { + return segmentLocation.toString(); + } + return null; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java index 6ad0f3f..3c3fb7e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/loader/IndexLoadingConfig.java @@ -41,6 +41,7 @@ import org.apache.pinot.spi.config.table.TableConfig; */ public class IndexLoadingConfig { private static final int DEFAULT_REALTIME_AVG_MULTI_VALUE_COUNT = 2; + private static final String SEGMENT_STORE_URI = "segment.store.uri"; private ReadMode _readMode = ReadMode.DEFAULT_MODE; private List<String> _sortedColumns = Collections.emptyList(); @@ -62,6 +63,7 @@ public class IndexLoadingConfig { private boolean _isRealtimeOffheapAllocation; private boolean _isDirectRealtimeOffheapAllocation; private boolean _enableSplitCommitEndWithMetadata; + private String _segmentStoreURI; // constructed from FieldConfig private Map<String, Map<String, String>> _columnProperties = new HashMap<>(); @@ -185,6 +187,7 @@ public class IndexLoadingConfig { _realtimeAvgMultiValueCount = Integer.valueOf(avgMultiValueCount); } _enableSplitCommitEndWithMetadata = instanceDataManagerConfig.isEnableSplitCommitEndWithMetadata(); + _segmentStoreURI = 
instanceDataManagerConfig.getConfig().getProperty(SEGMENT_STORE_URI); } /** @@ -332,6 +335,10 @@ public class IndexLoadingConfig { return _columnMinMaxValueGeneratorMode; } + + public String getSegmentStoreURI() { return _segmentStoreURI; } + + /** * For tests only. */ diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java index 646ef0a..44af74f 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java @@ -44,6 +44,7 @@ import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamMessageDecoder; import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.LongMsgOffsetFactory; import org.apache.pinot.spi.stream.PermanentConsumerException; @@ -782,6 +783,7 @@ public class LLRealtimeSegmentDataManagerTest { when(dataManagerConfig.getSegmentFormatVersion()).thenReturn(null); when(dataManagerConfig.isEnableSplitCommit()).thenReturn(false); when(dataManagerConfig.isRealtimeOffHeapAllocation()).thenReturn(false); + when(dataManagerConfig.getConfig()).thenReturn(new PinotConfiguration()); return dataManagerConfig; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org