This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 31c55af  [Part 3.1] Deepstore by-pass: Add a new best effort segment 
uploader with bounded upload time and d… (#5314)
31c55af is described below

commit 31c55afdb6a40f98189308ce6292587ead9d0dec
Author: Ting Chen <tingc...@uber.com>
AuthorDate: Tue May 5 14:20:20 2020 -0700

    [Part 3.1] Deepstore by-pass: Add a new best effort segment uploader with 
bounded upload time and d… (#5314)
    
    * Add a new best effort segment uploader with bounded upload time and 
default segment location when upload fails.
    
    * Update 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/BestEffortSegmentUploader.java
    
    Co-authored-by: Subbu Subramaniam <mcvsu...@users.noreply.github.com>
    
    * Update 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
    
    Co-authored-by: Subbu Subramaniam <mcvsu...@users.noreply.github.com>
    
    * Update 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/BestEffortSegmentUploader.java
    
    Co-authored-by: Subbu Subramaniam <mcvsu...@users.noreply.github.com>
    
    * Revised based on comments.
    
    * Change comments.
    
    * Revised the splitcommiter.
    
    * Update 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
    
    Co-authored-by: Subbu Subramaniam <mcvsu...@users.noreply.github.com>
    
    * Further revise.
    
    Co-authored-by: Subbu Subramaniam <mcvsu...@users.noreply.github.com>
---
 .../realtime/LLRealtimeSegmentDataManager.java     |   2 +-
 .../manager/realtime/PinotFSSegmentUploader.java   |  86 ++++++++++
 .../manager/realtime/SegmentCommitterFactory.java  |  13 +-
 .../data/manager/realtime/SegmentUploader.java     |   3 +-
 .../realtime/Server2ControllerSegmentUploader.java |   3 +-
 .../manager/realtime/SplitSegmentCommitter.java    |  20 +--
 .../realtime/PinotFSSegmentUploaderTest.java       | 187 +++++++++++++++++++++
 .../Server2ControllerSegmentUploaderTest.java      |   8 +-
 .../DefaultCommitterRealtimeIntegrationTest.java   |   6 +-
 9 files changed, 296 insertions(+), 32 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index dcad570..033a220 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1231,7 +1231,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
       _consumeEndTime = now + minConsumeTimeMillis;
     }
 
-    _segmentCommitterFactory = new SegmentCommitterFactory(segmentLogger, 
_indexLoadingConfig, _protocolHandler);
+    _segmentCommitterFactory = new SegmentCommitterFactory(segmentLogger, 
_protocolHandler);
 
     segmentLogger
         .info("Starting consumption on realtime consuming segment {} 
maxRowCount {} maxEndTime {}", _llcSegmentName,
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
new file mode 100644
index 0000000..5dae4e4
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import java.io.File;
+import java.net.URI;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A segment uploader which does segment upload to a segment store (with store 
root dir configured as
+ * _segmentStoreUriStr) using PinotFS within a configurable timeout period. 
The final segment location would be in the
+ * URI _segmentStoreUriStr/_tableNameWithType/segmentName if successful.
+ */
+public class PinotFSSegmentUploader implements SegmentUploader {
+  private Logger LOGGER = 
LoggerFactory.getLogger(PinotFSSegmentUploader.class);
+  private String _segmentStoreUriStr;
+  private ExecutorService _executorService = Executors.newCachedThreadPool();
+  private int _timeoutInMs;
+
+  public PinotFSSegmentUploader(String segmentStoreDirUri, int timeoutMillis) {
+    _segmentStoreUriStr = segmentStoreDirUri;
+    _timeoutInMs = timeoutMillis;
+  }
+
+  public URI uploadSegment(File segmentFile, LLCSegmentName segmentName) {
+    if (_segmentStoreUriStr == null || _segmentStoreUriStr.isEmpty()) {
+      return null;
+    }
+    Callable<URI> uploadTask = () -> {
+      URI destUri = new URI(StringUtil
+          .join(File.separator, _segmentStoreUriStr, 
segmentName.getTableName(), segmentName.getSegmentName()));
+      try {
+        PinotFS pinotFS = PinotFSFactory.create(new 
URI(_segmentStoreUriStr).getScheme());
+        // Check and delete any existing segment file.
+        if (pinotFS.exists(destUri)) {
+          pinotFS.delete(destUri, true);
+        }
+        pinotFS.copyFromLocalFile(segmentFile, destUri);
+        return destUri;
+      } catch (Exception e) {
+        LOGGER.warn("Failed copy segment tar file {} to segment store {}: {}", 
segmentFile.getName(), destUri, e);
+      }
+      return null;
+    };
+    Future<URI> future = _executorService.submit(uploadTask);
+    try {
+      URI segmentLocation = future.get(_timeoutInMs, TimeUnit.MILLISECONDS);
+      return segmentLocation;
+    } catch (InterruptedException e) {
+      LOGGER.info("Interrupted while waiting for segment upload of {} to {}.", 
segmentName, _segmentStoreUriStr);
+      Thread.currentThread().interrupt();
+    } catch (Exception e) {
+      LOGGER
+          .warn("Failed to upload file {} of segment {} for table {} ", 
segmentFile.getAbsolutePath(), segmentName, e);
+    }
+
+    return null;
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
index 84e9c6e..df0f6b8 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.core.data.manager.realtime;
 
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
-import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
 import org.slf4j.Logger;
 
@@ -29,18 +28,16 @@ import org.slf4j.Logger;
  */
 public class SegmentCommitterFactory {
   private static Logger LOGGER;
-  private final IndexLoadingConfig _indexLoadingConfig;
   private final ServerSegmentCompletionProtocolHandler _protocolHandler;
 
-  public SegmentCommitterFactory(Logger segmentLogger, IndexLoadingConfig 
indexLoadingConfig,
-      ServerSegmentCompletionProtocolHandler protocolHandler) {
+  public SegmentCommitterFactory(Logger segmentLogger, 
ServerSegmentCompletionProtocolHandler protocolHandler) {
     LOGGER = segmentLogger;
-    _indexLoadingConfig = indexLoadingConfig;
     _protocolHandler = protocolHandler;
   }
-  
-  public SegmentCommitter 
createSplitSegmentCommitter(SegmentCompletionProtocol.Request.Params params, 
SegmentUploader segmentUploader) {
-    return new SplitSegmentCommitter(LOGGER, _protocolHandler, 
_indexLoadingConfig, params, segmentUploader);
+
+  public SegmentCommitter 
createSplitSegmentCommitter(SegmentCompletionProtocol.Request.Params params,
+      SegmentUploader segmentUploader) {
+    return new SplitSegmentCommitter(LOGGER, _protocolHandler, params, 
segmentUploader);
   }
 
   public SegmentCommitter 
createDefaultSegmentCommitter(SegmentCompletionProtocol.Request.Params params) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentUploader.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentUploader.java
index 7cefdee..44d0a36 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentUploader.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentUploader.java
@@ -20,9 +20,10 @@ package org.apache.pinot.core.data.manager.realtime;
 
 import java.io.File;
 import java.net.URI;
+import org.apache.pinot.common.utils.LLCSegmentName;
 
 
 public interface SegmentUploader {
   // Returns the URI of the uploaded segment. null if the upload fails.
-  URI uploadSegment(File segmentFile);
+  URI uploadSegment(File segmentFile, LLCSegmentName segmentName);
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
index 91b177d..35084aa 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
@@ -24,6 +24,7 @@ import java.net.URISyntaxException;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.core.util.SegmentCompletionProtocolUtils;
 import org.apache.pinot.server.realtime.ControllerLeaderLocator;
 import org.slf4j.Logger;
@@ -52,7 +53,7 @@ public class Server2ControllerSegmentUploader implements 
SegmentUploader {
   }
 
   @Override
-  public URI uploadSegment(File segmentFile) {
+  public URI uploadSegment(File segmentFile,  LLCSegmentName segmentName) {
     SegmentCompletionProtocol.Response response = 
uploadSegmentToController(segmentFile);
     if (response.getStatus() == 
SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS) {
       try {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
index 60938ef..53bc82d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
@@ -21,7 +21,7 @@ package org.apache.pinot.core.data.manager.realtime;
 import java.io.File;
 import java.net.URI;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
-import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
 import org.slf4j.Logger;
 
@@ -33,16 +33,13 @@ import org.slf4j.Logger;
 public class SplitSegmentCommitter implements SegmentCommitter {
   private final SegmentCompletionProtocol.Request.Params _params;
   private final ServerSegmentCompletionProtocolHandler _protocolHandler;
-  private final IndexLoadingConfig _indexLoadingConfig;
   private final SegmentUploader _segmentUploader;
-
   private final Logger _segmentLogger;
 
   public SplitSegmentCommitter(Logger segmentLogger, 
ServerSegmentCompletionProtocolHandler protocolHandler,
-      IndexLoadingConfig indexLoadingConfig, 
SegmentCompletionProtocol.Request.Params params, SegmentUploader 
segmentUploader) {
+      SegmentCompletionProtocol.Request.Params params, SegmentUploader 
segmentUploader) {
     _segmentLogger = segmentLogger;
     _protocolHandler = protocolHandler;
-    _indexLoadingConfig = indexLoadingConfig;
     _params = new SegmentCompletionProtocol.Request.Params(params);
     _segmentUploader = segmentUploader;
   }
@@ -58,19 +55,14 @@ public class SplitSegmentCommitter implements 
SegmentCommitter {
       return SegmentCompletionProtocol.RESP_FAILED;
     }
 
-    URI segmentLocation = _segmentUploader.uploadSegment(segmentTarFile);
+    URI segmentLocation = _segmentUploader.uploadSegment(segmentTarFile, new 
LLCSegmentName(_params.getSegmentName()));
     if (segmentLocation == null) {
-        return SegmentCompletionProtocol.RESP_FAILED;
+      return SegmentCompletionProtocol.RESP_FAILED;
     }
     _params.withSegmentLocation(segmentLocation.toString());
 
-    SegmentCompletionProtocol.Response commitEndResponse;
-    if (_indexLoadingConfig.isEnableSplitCommitEndWithMetadata()) {
-      commitEndResponse =
-          _protocolHandler.segmentCommitEndWithMetadata(_params, 
segmentBuildDescriptor.getMetadataFiles());
-    } else {
-      commitEndResponse = _protocolHandler.segmentCommitEnd(_params);
-    }
+    SegmentCompletionProtocol.Response commitEndResponse =
+        _protocolHandler.segmentCommitEndWithMetadata(_params, 
segmentBuildDescriptor.getMetadataFiles());
 
     if 
(!commitEndResponse.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS))
 {
       _segmentLogger.warn("CommitEnd failed with response {}", 
commitEndResponse.toJsonString());
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java
new file mode 100644
index 0000000..7d1cd2d
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploaderTest.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.UUID;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class PinotFSSegmentUploaderTest {
+  private static final int TIMEOUT_IN_MS = 100;
+  private File _file;
+  private LLCSegmentName _llcSegmentName;
+  @BeforeClass
+  public void setUp()
+      throws URISyntaxException, IOException, HttpErrorStatusException {
+    Configuration fsConfig = new PropertiesConfiguration();
+    fsConfig.setProperty("class.hdfs", 
"org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploaderTest$AlwaysSucceedPinotFS");
+    fsConfig.setProperty("class.timeout", 
"org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploaderTest$AlwaysTimeoutPinotFS");
+    fsConfig.setProperty("class.existing", 
"org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploaderTest$AlwaysExistPinotFS");
+    PinotFSFactory.init(fsConfig);
+    _file = FileUtils.getFile(FileUtils.getTempDirectory(), 
UUID.randomUUID().toString());
+    _file.deleteOnExit();
+    _llcSegmentName = new LLCSegmentName("test_REALTIME", 1, 0, 
System.currentTimeMillis());
+  }
+
+  @Test
+  public void testSuccessfulUpload() {
+    SegmentUploader segmentUploader = new 
PinotFSSegmentUploader("hdfs://root", TIMEOUT_IN_MS);
+    URI segmentURI = segmentUploader.uploadSegment(_file, _llcSegmentName);
+    Assert.assertEquals(segmentURI.toString(), 
StringUtil.join(File.separator,"hdfs://root", _llcSegmentName.getTableName(), 
_llcSegmentName.getSegmentName()));
+  }
+
+  @Test
+  public void testSegmentAlreadyExist() {
+    SegmentUploader segmentUploader = new 
PinotFSSegmentUploader("existing://root", TIMEOUT_IN_MS);
+    URI segmentURI = segmentUploader.uploadSegment(_file, _llcSegmentName);
+    Assert.assertEquals(segmentURI.toString(), 
StringUtil.join(File.separator,"existing://root", 
_llcSegmentName.getTableName(), _llcSegmentName.getSegmentName()));
+  }
+
+  @Test
+  public void testUploadTimeOut() {
+    SegmentUploader segmentUploader = new 
PinotFSSegmentUploader("timeout://root", TIMEOUT_IN_MS);
+    URI segmentURI = segmentUploader.uploadSegment(_file, _llcSegmentName);
+    Assert.assertNull(segmentURI);
+  }
+
+  @Test
+  public void testNoSegmentStoreConfigured() {
+    SegmentUploader segmentUploader = new PinotFSSegmentUploader("", 
TIMEOUT_IN_MS);
+    URI segmentURI = segmentUploader.uploadSegment(_file, _llcSegmentName);
+    Assert.assertNull(segmentURI);
+  }
+
+  public static class AlwaysSucceedPinotFS extends PinotFS {
+
+    @Override
+    public void init(Configuration config) {
+
+    }
+
+    @Override
+    public boolean mkdir(URI uri)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public boolean delete(URI segmentUri, boolean forceDelete)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public boolean doMove(URI srcUri, URI dstUri)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public boolean copy(URI srcUri, URI dstUri)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public boolean exists(URI fileUri)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public long length(URI fileUri)
+        throws IOException {
+      return 0;
+    }
+
+    @Override
+    public String[] listFiles(URI fileUri, boolean recursive)
+        throws IOException {
+      return new String[0];
+    }
+
+    @Override
+    public void copyToLocalFile(URI srcUri, File dstFile)
+        throws Exception {
+    }
+
+    @Override
+    public void copyFromLocalFile(File srcFile, URI dstUri)
+        throws Exception {
+    }
+
+    @Override
+    public boolean isDirectory(URI uri)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public long lastModified(URI uri)
+        throws IOException {
+      return 0;
+    }
+
+    @Override
+    public boolean touch(URI uri)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public InputStream open(URI uri)
+        throws IOException {
+      return null;
+    }
+  }
+
+  public static class AlwaysTimeoutPinotFS extends AlwaysSucceedPinotFS {
+    @Override
+    public void copyFromLocalFile(File srcFile, URI dstUri)
+        throws Exception {
+      // Make sure the sleep time > the timeout threshold of uploader.
+      Thread.sleep(TIMEOUT_IN_MS * 1000);
+    }
+  }
+
+  public static class AlwaysExistPinotFS extends AlwaysSucceedPinotFS {
+    @Override
+    public boolean exists(URI fileUri)
+        throws IOException {
+      return true;
+    }
+  }
+
+}
+
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java
index 4bad294..85f84f1 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploaderTest.java
@@ -28,6 +28,7 @@ import 
org.apache.pinot.common.exception.HttpErrorStatusException;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.SimpleHttpResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,6 +52,7 @@ public class Server2ControllerSegmentUploaderTest {
   private static Logger _logger = 
LoggerFactory.getLogger(Server2ControllerSegmentUploaderTest.class);
   private FileUploadDownloadClient _fileUploadDownloadClient;
   private File _file;
+  private LLCSegmentName _llcSegmentName;
 
   @BeforeClass
   public void setUp()
@@ -75,6 +77,8 @@ public class Server2ControllerSegmentUploaderTest {
 
     _file = FileUtils.getFile(FileUtils.getTempDirectory(), 
UUID.randomUUID().toString());
     _file.deleteOnExit();
+
+    _llcSegmentName = new LLCSegmentName("test_REALTIME", 1, 0, 
System.currentTimeMillis());
   }
 
   @AfterClass
@@ -88,7 +92,7 @@ public class Server2ControllerSegmentUploaderTest {
     Server2ControllerSegmentUploader uploader =
         new Server2ControllerSegmentUploader(_logger, 
_fileUploadDownloadClient, GOOD_CONTROLLER_VIP, "segmentName",
             10000, mock(ServerMetrics.class));
-    URI segmentURI = uploader.uploadSegment(_file);
+    URI segmentURI = uploader.uploadSegment(_file, _llcSegmentName);
     Assert.assertEquals(segmentURI.toString(), SEGMENT_LOCATION);
   }
 
@@ -98,7 +102,7 @@ public class Server2ControllerSegmentUploaderTest {
     Server2ControllerSegmentUploader uploader =
         new Server2ControllerSegmentUploader(_logger, 
_fileUploadDownloadClient, BAD_CONTROLLER_VIP, "segmentName",
             10000, mock(ServerMetrics.class));
-    URI segmentURI = uploader.uploadSegment(_file);
+    URI segmentURI = uploader.uploadSegment(_file, _llcSegmentName);
     Assert.assertNull(segmentURI);
   }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DefaultCommitterRealtimeIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DefaultCommitterRealtimeIntegrationTest.java
index 30c8d4b..adcb014 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DefaultCommitterRealtimeIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DefaultCommitterRealtimeIntegrationTest.java
@@ -33,7 +33,6 @@ import 
org.apache.pinot.core.data.manager.realtime.SegmentCommitter;
 import org.apache.pinot.core.data.manager.realtime.SegmentCommitterFactory;
 import org.apache.pinot.core.data.readers.GenericRowRecordReader;
 import org.apache.pinot.core.data.readers.PinotSegmentUtil;
-import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.server.realtime.ControllerLeaderLocator;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -96,12 +95,10 @@ public class DefaultCommitterRealtimeIntegrationTest 
extends RealtimeClusterInte
   @Test
   public void testDefaultCommitter()
       throws Exception {
-    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
     ServerMetrics serverMetrics = new ServerMetrics(new MetricsRegistry());
     ServerSegmentCompletionProtocolHandler protocolHandler =
         new ServerSegmentCompletionProtocolHandler(serverMetrics, 
getTableName());
 
-    SegmentCompletionProtocol.Response prevResponse = new 
SegmentCompletionProtocol.Response();
     LLRealtimeSegmentDataManager.SegmentBuildDescriptor segmentBuildDescriptor 
=
         mock(LLRealtimeSegmentDataManager.SegmentBuildDescriptor.class);
 
@@ -134,8 +131,7 @@ public class DefaultCommitterRealtimeIntegrationTest 
extends RealtimeClusterInte
     sendGetRequest("http://localhost:" + DEFAULT_CONTROLLER_PORT + 
"/segmentConsumed?instance=" + instanceId + "&name="
         + segmentName + "&offset=" + END_OFFSET);
 
-    SegmentCommitterFactory segmentCommitterFactory =
-        new SegmentCommitterFactory(LOGGER, indexLoadingConfig, 
protocolHandler);
+    SegmentCommitterFactory segmentCommitterFactory = new 
SegmentCommitterFactory(LOGGER, protocolHandler);
     SegmentCommitter segmentCommitter = 
segmentCommitterFactory.createDefaultSegmentCommitter(params);
     segmentCommitter.commit(END_OFFSET, 3, segmentBuildDescriptor);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to