This is an automated email from the ASF dual-hosted git repository. mcvsubbu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 31c55af [Part 3.1] Deepstore by-pass: Add a new best effort segment uploader with bounded upload time and d… (#5314) 31c55af is described below commit 31c55afdb6a40f98189308ce6292587ead9d0dec Author: Ting Chen <tingc...@uber.com> AuthorDate: Tue May 5 14:20:20 2020 -0700 [Part 3.1] Deepstore by-pass: Add a new best effort segment uploader with bounded upload time and d… (#5314) * Add a new best effort segment uploader with bounded upload time and default segment location when upload fails. * Update pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/BestEffortSegmentUploader.java Co-authored-by: Subbu Subramaniam <mcvsu...@users.noreply.github.com> * Update pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java Co-authored-by: Subbu Subramaniam <mcvsu...@users.noreply.github.com> * Update pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/BestEffortSegmentUploader.java Co-authored-by: Subbu Subramaniam <mcvsu...@users.noreply.github.com> * Revised based on comments. * Change comments. * Revised the splitcommiter. * Update pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java Co-authored-by: Subbu Subramaniam <mcvsu...@users.noreply.github.com> * Further revise. Co-authored-by: Subbu Subramaniam <mcvsu...@users.noreply.github.com> --- .../realtime/LLRealtimeSegmentDataManager.java | 2 +- .../manager/realtime/PinotFSSegmentUploader.java | 86 ++++++++++ .../manager/realtime/SegmentCommitterFactory.java | 13 +- .../data/manager/realtime/SegmentUploader.java | 3 +- .../realtime/Server2ControllerSegmentUploader.java | 3 +- .../manager/realtime/SplitSegmentCommitter.java | 20 +-- .../realtime/PinotFSSegmentUploaderTest.java | 187 +++++++++++++++++++++ .../Server2ControllerSegmentUploaderTest.java | 8 +- .../DefaultCommitterRealtimeIntegrationTest.java | 6 +- 9 files changed, 296 insertions(+), 32 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index dcad570..033a220 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -1231,7 +1231,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { _consumeEndTime = now + minConsumeTimeMillis; } - _segmentCommitterFactory = new SegmentCommitterFactory(segmentLogger, _indexLoadingConfig, _protocolHandler); + _segmentCommitterFactory = new SegmentCommitterFactory(segmentLogger, _protocolHandler); segmentLogger .info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", _llcSegmentName, diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java new file mode 100644 index 0000000..5dae4e4 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.data.manager.realtime; + +import java.io.File; +import java.net.URI; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.StringUtil; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A segment uploader which does segment upload to a segment store (with store root dir configured as + * _segmentStoreUriStr) using PinotFS within a configurable timeout period. The final segment location would be in the + * URI _segmentStoreUriStr/_tableNameWithType/segmentName if successful. + */ +public class PinotFSSegmentUploader implements SegmentUploader { + private Logger LOGGER = LoggerFactory.getLogger(PinotFSSegmentUploader.class); + private String _segmentStoreUriStr; + private ExecutorService _executorService = Executors.newCachedThreadPool(); + private int _timeoutInMs; + + public PinotFSSegmentUploader(String segmentStoreDirUri, int timeoutMillis) { + _segmentStoreUriStr = segmentStoreDirUri; + _timeoutInMs = timeoutMillis; + } + + public URI uploadSegment(File segmentFile, LLCSegmentName segmentName) { + if (_segmentStoreUriStr == null || _segmentStoreUriStr.isEmpty()) { + return null; + } + Callable<URI> uploadTask = () -> { + URI destUri = new URI(StringUtil + .join(File.separator, _segmentStoreUriStr, segmentName.getTableName(), segmentName.getSegmentName())); + try { + PinotFS pinotFS = PinotFSFactory.create(new URI(_segmentStoreUriStr).getScheme()); + // Check and delete any existing segment file. + if (pinotFS.exists(destUri)) { + pinotFS.delete(destUri, true); + } + pinotFS.copyFromLocalFile(segmentFile, destUri); + return destUri; + } catch (Exception e) { + LOGGER.warn("Failed copy segment tar file {} to segment store {}: {}", segmentFile.getName(), destUri, e); + } + return null; + }; + Future<URI> future = _executorService.submit(uploadTask); + try { + URI segmentLocation = future.get(_timeoutInMs, TimeUnit.MILLISECONDS); + return segmentLocation; + } catch (InterruptedException e) { + LOGGER.info("Interrupted while waiting for segment upload of {} to {}.", segmentName, _segmentStoreUriStr); + Thread.currentThread().interrupt(); + } catch (Exception e) { + LOGGER + .warn("Failed to upload file {} of segment {} for table {} ", segmentFile.getAbsolutePath(), segmentName, e); + } + + return null; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java index 84e9c6e..df0f6b8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java @@ -19,7 +19,6 @@ package org.apache.pinot.core.data.manager.realtime; import org.apache.pinot.common.protocols.SegmentCompletionProtocol; -import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; import org.slf4j.Logger; @@ -29,18 +28,16 @@ import org.slf4j.Logger; */ public class SegmentCommitterFactory { private static Logger LOGGER; - private final IndexLoadingConfig _indexLoadingConfig; private final ServerSegmentCompletionProtocolHandler _protocolHandler; - public SegmentCommitterFactory(Logger segmentLogger, IndexLoadingConfig indexLoadingConfig, - ServerSegmentCompletionProtocolHandler protocolHandler) { + public SegmentCommitterFactory(Logger segmentLogger, ServerSegmentCompletionProtocolHandler protocolHandler) { LOGGER = segmentLogger; - _indexLoadingConfig = indexLoadingConfig; _protocolHandler = protocolHandler; } - - public SegmentCommitter createSplitSegmentCommitter(SegmentCompletionProtocol.Request.Params params, SegmentUploader segmentUploader) { - return new SplitSegmentCommitter(LOGGER, _protocolHandler, _indexLoadingConfig, params, segmentUploader); + + public SegmentCommitter createSplitSegmentCommitter(SegmentCompletionProtocol.Request.Params params, + SegmentUploader segmentUploader) { + return new SplitSegmentCommitter(LOGGER, _protocolHandler, params, segmentUploader); } public SegmentCommitter createDefaultSegmentCommitter(SegmentCompletionProtocol.Request.Params params) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentUploader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentUploader.java index 7cefdee..44d0a36 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentUploader.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentUploader.java @@ -20,9 +20,10 @@ package org.apache.pinot.core.data.manager.realtime; import java.io.File; import java.net.URI; +import org.apache.pinot.common.utils.LLCSegmentName; public interface SegmentUploader { // Returns the URI of the uploaded segment. null if the upload fails. - URI uploadSegment(File segmentFile); + URI uploadSegment(File segmentFile, LLCSegmentName segmentName); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java index 91b177d..35084aa 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java @@ -24,6 +24,7 @@ import java.net.URISyntaxException; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.protocols.SegmentCompletionProtocol; import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.core.util.SegmentCompletionProtocolUtils; import org.apache.pinot.server.realtime.ControllerLeaderLocator; import org.slf4j.Logger; @@ -52,7 +53,7 @@ public class Server2ControllerSegmentUploader implements SegmentUploader { } @Override - public URI uploadSegment(File segmentFile) { + public URI uploadSegment(File segmentFile, LLCSegmentName segmentName) { SegmentCompletionProtocol.Response response = uploadSegmentToController(segmentFile); if (response.getStatus() == SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS) { try { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java index 60938ef..53bc82d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java @@ -21,7 +21,7 @@ package org.apache.pinot.core.data.manager.realtime; import java.io.File; import java.net.URI; import org.apache.pinot.common.protocols.SegmentCompletionProtocol; -import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; import org.slf4j.Logger; @@ -33,16 +33,13 @@ import org.slf4j.Logger; public class SplitSegmentCommitter implements SegmentCommitter { private final SegmentCompletionProtocol.Request.Params _params; private final ServerSegmentCompletionProtocolHandler _protocolHandler; - private final IndexLoadingConfig _indexLoadingConfig; private final SegmentUploader _segmentUploader; - private final Logger _segmentLogger; public SplitSegmentCommitter(Logger segmentLogger, ServerSegmentCompletionProtocolHandler protocolHandler, - IndexLoadingConfig indexLoadingConfig, SegmentCompletionProtocol.Request.Params params, SegmentUploader segmentUploader) { + SegmentCompletionProtocol.Request.Params params, SegmentUploader segmentUploader) { _segmentLogger = segmentLogger; _protocolHandler = protocolHandler; - _indexLoadingConfig = indexLoadingConfig; _params = new SegmentCompletionProtocol.Request.Params(params); _segmentUploader = segmentUploader; } @@ -58,19 +55,14 @@ public class SplitSegmentCommitter implements SegmentCommitter { return SegmentCompletionProtocol.RESP_FAILED; } - URI segmentLocation = _segmentUploader.uploadSegment(segmentTarFile); + URI segmentLocation = _segmentUploader.uploadSegment(segmentTarFile, new LLCSegmentName(_params.getSegmentName())); if (segmentLocation == null) { - return SegmentCompletionProtocol.RESP_FAILED; + return SegmentCompletionProtocol.RESP_FAILED; } _params.withSegmentLocation(segmentLocation.toString()); - SegmentCompletionProtocol.Response commitEndResponse; - if (_indexLoadingConfig.isEnableSplitCommitEndWithMetadata()) { - commitEndResponse = - _protocolHandler.segmentCommitEndWithMetadata(_params, segmentBuildDescriptor.getMetadataFiles()); - } else { - commitEndResponse = _protocolHandler.segmentCommitEnd(_params); - } + SegmentCompletionProtocol.Response commitEndResponse = + _protocolHandler.segmentCommitEndWithMetadata(_params, segmentBuildDescriptor.getMetadataFiles()); if (!commitEndResponse.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS)) { _segmentLogger.warn("CommitEnd failed with response {}", commitEndResponse.toJsonString()); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java new file mode 100644 index 0000000..7d1cd2d --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.data.manager.realtime; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.UUID; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.exception.HttpErrorStatusException; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.common.utils.StringUtil; +import org.apache.pinot.spi.filesystem.PinotFS; +import org.apache.pinot.spi.filesystem.PinotFSFactory; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class PinotFSSegmentUploaderTest { + private static final int TIMEOUT_IN_MS = 100; + private File _file; + private LLCSegmentName _llcSegmentName; + @BeforeClass + public void setUp() + throws URISyntaxException, IOException, HttpErrorStatusException { + Configuration fsConfig = new PropertiesConfiguration(); + fsConfig.setProperty("class.hdfs", "org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploaderTest$AlwaysSucceedPinotFS"); + fsConfig.setProperty("class.timeout", "org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploaderTest$AlwaysTimeoutPinotFS"); + fsConfig.setProperty("class.existing", "org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploaderTest$AlwaysExistPinotFS"); + PinotFSFactory.init(fsConfig); + _file = FileUtils.getFile(FileUtils.getTempDirectory(), UUID.randomUUID().toString()); + _file.deleteOnExit(); + _llcSegmentName = new LLCSegmentName("test_REALTIME", 1, 0, System.currentTimeMillis()); + } + + @Test + public void testSuccessfulUpload() { + SegmentUploader segmentUploader = new PinotFSSegmentUploader("hdfs://root", TIMEOUT_IN_MS); + URI segmentURI = segmentUploader.uploadSegment(_file, _llcSegmentName); + Assert.assertEquals(segmentURI.toString(), StringUtil.join(File.separator,"hdfs://root", _llcSegmentName.getTableName(), _llcSegmentName.getSegmentName())); + } + + @Test + public void testSegmentAlreadyExist() { + SegmentUploader segmentUploader = new PinotFSSegmentUploader("existing://root", TIMEOUT_IN_MS); + URI segmentURI = segmentUploader.uploadSegment(_file, _llcSegmentName); + Assert.assertEquals(segmentURI.toString(), StringUtil.join(File.separator,"existing://root", _llcSegmentName.getTableName(), _llcSegmentName.getSegmentName())); + } + + @Test + public void testUploadTimeOut() { + SegmentUploader segmentUploader = new PinotFSSegmentUploader("timeout://root", TIMEOUT_IN_MS); + URI segmentURI = segmentUploader.uploadSegment(_file, _llcSegmentName); + Assert.assertNull(segmentURI); + } + + @Test + public void testNoSegmentStoreConfigured() { + SegmentUploader segmentUploader = new PinotFSSegmentUploader("", TIMEOUT_IN_MS); + URI segmentURI = segmentUploader.uploadSegment(_file, _llcSegmentName); + Assert.assertNull(segmentURI); + } + + public static class AlwaysSucceedPinotFS extends PinotFS { + + @Override + public void init(Configuration config) { + + } + + @Override + public boolean mkdir(URI uri) + throws IOException { + return false; + } + + @Override + public boolean delete(URI segmentUri, boolean forceDelete) + throws IOException { + return false; + } + + @Override + public boolean doMove(URI srcUri, URI dstUri) + throws IOException { + return false; + } + + @Override + public boolean copy(URI srcUri, URI dstUri) + throws IOException { + return false; + } + + @Override + public boolean exists(URI fileUri) + throws IOException { + return false; + } + + @Override + public long length(URI fileUri) + throws IOException { + return 0; + } + + @Override + public String[] listFiles(URI fileUri, boolean recursive) + throws IOException { + return new String[0]; + } + + @Override + public void copyToLocalFile(URI srcUri, File dstFile) + throws Exception { + } + + @Override + public void copyFromLocalFile(File srcFile, URI dstUri) + throws Exception { + } + + @Override + public boolean isDirectory(URI uri) + throws IOException { + return false; + } + + @Override + public long lastModified(URI uri) + throws IOException { + return 0; + } + + @Override + public boolean touch(URI uri) + throws IOException { + return false; + } + + @Override + public InputStream open(URI uri) + throws IOException { + return null; + } + } + + public static class AlwaysTimeoutPinotFS extends AlwaysSucceedPinotFS { + @Override + public void copyFromLocalFile(File srcFile, URI dstUri) + throws Exception { + // Make sure the sleep time > the timeout threshold of uploader. + Thread.sleep(TIMEOUT_IN_MS * 1000); + } + } + + public static class AlwaysExistPinotFS extends AlwaysSucceedPinotFS { + @Override + public boolean exists(URI fileUri) + throws IOException { + return true; + } + } + +} + diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java index 4bad294..85f84f1 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java @@ -28,6 +28,7 @@ import org.apache.pinot.common.exception.HttpErrorStatusException; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.protocols.SegmentCompletionProtocol; import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.common.utils.SimpleHttpResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +52,7 @@ public class Server2ControllerSegmentUploaderTest { private static Logger _logger = LoggerFactory.getLogger(Server2ControllerSegmentUploaderTest.class); private FileUploadDownloadClient _fileUploadDownloadClient; private File _file; + private LLCSegmentName _llcSegmentName; @BeforeClass public void setUp() @@ -75,6 +77,8 @@ public class Server2ControllerSegmentUploaderTest { _file = FileUtils.getFile(FileUtils.getTempDirectory(), UUID.randomUUID().toString()); _file.deleteOnExit(); + + _llcSegmentName = new LLCSegmentName("test_REALTIME", 1, 0, System.currentTimeMillis()); } @AfterClass @@ -88,7 +92,7 @@ public class Server2ControllerSegmentUploaderTest { Server2ControllerSegmentUploader uploader = new Server2ControllerSegmentUploader(_logger, _fileUploadDownloadClient, GOOD_CONTROLLER_VIP, "segmentName", 10000, mock(ServerMetrics.class)); - URI segmentURI = uploader.uploadSegment(_file); + URI segmentURI = uploader.uploadSegment(_file, _llcSegmentName); Assert.assertEquals(segmentURI.toString(), SEGMENT_LOCATION); } @@ -98,7 +102,7 @@ public class Server2ControllerSegmentUploaderTest { Server2ControllerSegmentUploader uploader = new Server2ControllerSegmentUploader(_logger, _fileUploadDownloadClient, BAD_CONTROLLER_VIP, "segmentName", 10000, mock(ServerMetrics.class)); - URI segmentURI = uploader.uploadSegment(_file); + URI segmentURI = uploader.uploadSegment(_file, _llcSegmentName); Assert.assertNull(segmentURI); } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DefaultCommitterRealtimeIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DefaultCommitterRealtimeIntegrationTest.java index 30c8d4b..adcb014 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DefaultCommitterRealtimeIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DefaultCommitterRealtimeIntegrationTest.java @@ -33,7 +33,6 @@ import org.apache.pinot.core.data.manager.realtime.SegmentCommitter; import org.apache.pinot.core.data.manager.realtime.SegmentCommitterFactory; import org.apache.pinot.core.data.readers.GenericRowRecordReader; import org.apache.pinot.core.data.readers.PinotSegmentUtil; -import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.server.realtime.ControllerLeaderLocator; import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; import org.apache.pinot.spi.config.table.TableConfig; @@ -96,12 +95,10 @@ public class DefaultCommitterRealtimeIntegrationTest extends RealtimeClusterInte @Test public void testDefaultCommitter() throws Exception { - IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(); ServerMetrics serverMetrics = new ServerMetrics(new MetricsRegistry()); ServerSegmentCompletionProtocolHandler protocolHandler = new ServerSegmentCompletionProtocolHandler(serverMetrics, getTableName()); - SegmentCompletionProtocol.Response prevResponse = new SegmentCompletionProtocol.Response(); LLRealtimeSegmentDataManager.SegmentBuildDescriptor segmentBuildDescriptor = mock(LLRealtimeSegmentDataManager.SegmentBuildDescriptor.class); @@ -134,8 +131,7 @@ public class DefaultCommitterRealtimeIntegrationTest extends RealtimeClusterInte sendGetRequest("http://localhost:" + DEFAULT_CONTROLLER_PORT + "/segmentConsumed?instance=" + instanceId + "&name=" + segmentName + "&offset=" + END_OFFSET); - SegmentCommitterFactory segmentCommitterFactory = - new SegmentCommitterFactory(LOGGER, indexLoadingConfig, protocolHandler); + SegmentCommitterFactory segmentCommitterFactory = new SegmentCommitterFactory(LOGGER, protocolHandler); SegmentCommitter segmentCommitter = segmentCommitterFactory.createDefaultSegmentCommitter(params); segmentCommitter.commit(END_OFFSET, 3, segmentBuildDescriptor); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org