mcvsubbu commented on a change in pull request #5314:
URL: https://github.com/apache/incubator-pinot/pull/5314#discussion_r418733908



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
##########
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import java.io.File;
+import java.net.URI;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+// A segment uploader which does segment upload to a segment store (with store root dir configured as

Review comment:
       Can you put this in javadoc format?
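
For illustration, the class comment in Javadoc form might look roughly like the sketch below. The wording is adapted from the existing `//` comment on this class; the class name `PinotFSSegmentUploaderDocSketch`, the getters, and the trimmed-down body are placeholders for this example only, not the code in the PR.

```java
/**
 * A segment uploader which uploads a segment to a segment store (with the store root dir
 * configured as {@code _segmentStoreUriStr}) using PinotFS within a configurable timeout period.
 * The final segment location is the URI
 * {@code _segmentStoreUriStr/_tableNameWithType/segmentName} if the upload succeeds.
 */
class PinotFSSegmentUploaderDocSketch {
  private final String _segmentStoreUriStr;
  private final int _timeoutInMs;

  /**
   * @param segmentStoreDirUri root dir URI of the segment store, may be null or empty
   * @param timeoutMillis upload timeout in milliseconds
   */
  PinotFSSegmentUploaderDocSketch(String segmentStoreDirUri, int timeoutMillis) {
    _segmentStoreUriStr = segmentStoreDirUri;
    _timeoutInMs = timeoutMillis;
  }

  /** Returns the configured segment store root dir URI string (placeholder accessor). */
  String getSegmentStoreUriStr() {
    return _segmentStoreUriStr;
  }

  /** Returns the configured upload timeout in milliseconds (placeholder accessor). */
  int getTimeoutInMs() {
    return _timeoutInMs;
  }
}
```

With a `/** ... */` block the description shows up in generated API docs and in IDE tooltips, which a `//` comment does not.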

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
##########
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import java.io.File;
+import java.net.URI;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+// A segment uploader which does segment upload to a segment store (with store root dir configured as
+// _segmentStoreUriStr) using PinotFS within a configurable timeout period. The final segment location would be in the
+// URI _segmentStoreUriStr/_tableNameWithType/segmentName if successful.
+public class PinotFSSegmentUploader implements SegmentUploader {
+  private Logger LOGGER = LoggerFactory.getLogger(PinotFSSegmentUploader.class);
+  private String _segmentStoreUriStr;
+  private ExecutorService _executorService = Executors.newCachedThreadPool();
+  private int _timeoutInMs;
+
+  public PinotFSSegmentUploader(String segmentStoreDirUri, int timeoutMillis) {
+    _segmentStoreUriStr = segmentStoreDirUri;
+    _timeoutInMs = timeoutMillis;
+  }
+
+  public URI uploadSegment(File segmentFile, LLCSegmentName segmentName) {
+    if (_segmentStoreUriStr == null || _segmentStoreUriStr.isEmpty()) {
+      return null;
+    }
+    Callable<URI> uploadTask = () -> {
+      try {
+        PinotFS pinotFS = PinotFSFactory.create(new URI(_segmentStoreUriStr).getScheme());
+        URI destUri = new URI(StringUtil
+            .join(File.separator, _segmentStoreUriStr, segmentName.getTableName(), segmentName.getSegmentName()));
+        // Check and delete any existing segment file.
+        if (pinotFS.exists(destUri)) {
+          pinotFS.delete(destUri, true);
+        }
+        pinotFS.copyFromLocalFile(segmentFile, destUri);
+        return destUri;
+      } catch (Exception e) {
+        LOGGER.warn("Failed copy segment tar file to segment store {}: {}", segmentFile.getName(), e);

Review comment:
       include segment destUri in the log
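
One thing to watch: in the hunk above `destUri` is declared inside the `try`, so it is not in scope in the `catch`. A rough sketch of one way to handle that follows. It is not the PR's code: it uses `java.util.logging` instead of SLF4J so that it compiles on its own, and `UploadLoggingSketch`, `failureMessage`, and the always-failing `copy` stand-in are hypothetical names made up for this example.

```java
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.logging.Level;
import java.util.logging.Logger;

class UploadLoggingSketch {
  private static final Logger LOGGER = Logger.getLogger(UploadLoggingSketch.class.getName());

  // Hypothetical helper: the message names both the local file and the destination URI.
  static String failureMessage(File segmentFile, URI destUri) {
    return "Failed to copy segment tar file " + segmentFile.getName() + " to segment store " + destUri;
  }

  static URI upload(File segmentFile, String segmentStoreUriStr, String tableName, String segmentName) {
    // Build destUri before the try block so the catch block can still refer to it.
    // Assumption: URI.create throws an unchecked IllegalArgumentException on a malformed URI,
    // which this sketch does not handle.
    URI destUri = URI.create(segmentStoreUriStr + "/" + tableName + "/" + segmentName);
    try {
      copy(segmentFile, destUri);
      return destUri;
    } catch (Exception e) {
      // Pass the exception as the last argument so the stack trace is logged as well.
      LOGGER.log(Level.WARNING, failureMessage(segmentFile, destUri), e);
      return null;
    }
  }

  // Stand-in for pinotFS.copyFromLocalFile(...); it always fails so the catch path is exercised.
  static void copy(File src, URI dest) throws IOException {
    throw new IOException("simulated copy failure");
  }
}
```

In the actual class the same idea would presumably be a `LOGGER.warn` call with placeholders for the file name and `destUri`, with the exception passed as the final argument.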

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SplitSegmentCommitter.java
##########
@@ -35,16 +38,34 @@
   private final ServerSegmentCompletionProtocolHandler _protocolHandler;
   private final IndexLoadingConfig _indexLoadingConfig;
   private final SegmentUploader _segmentUploader;
+  // The default segment location uri str, could be null.
+  private final String _defaultSegmentLocation;
 
   private final Logger _segmentLogger;
 
   public SplitSegmentCommitter(Logger segmentLogger, ServerSegmentCompletionProtocolHandler protocolHandler,
       IndexLoadingConfig indexLoadingConfig, SegmentCompletionProtocol.Request.Params params, SegmentUploader segmentUploader) {
+    this(segmentLogger, protocolHandler, indexLoadingConfig, params, segmentUploader, null);
+  }
+
+  /**
+   *
+   * @param segmentLogger
+   * @param protocolHandler
+   * @param indexLoadingConfig
+   * @param params
+   * @param segmentUploader
+   * @param defaultSegmentLocation The default segment location uri str, could be null.
+   */
+  public SplitSegmentCommitter(Logger segmentLogger, ServerSegmentCompletionProtocolHandler protocolHandler,
+      IndexLoadingConfig indexLoadingConfig, SegmentCompletionProtocol.Request.Params params, SegmentUploader segmentUploader,
+      String defaultSegmentLocation) {

Review comment:
       I think we can clean this up by dropping the extra argument and taking
TableConfig instead of indexLoadingConfig in the constructor. We can determine
from the table config whether to fall back to the default location (always a
peer), and construct the peer download URL in this class if the upload fails.
For now, you can comment out that failure handling code.
   
   Let us start a discussion on how the table should be configured for upload
failure. You can do it in a doc, via email, or in a discussion in the chat
channel (and summarize in email).
   
   A few options come to mind, but best to discuss with the team before moving 
forward.
   1. In the `SegmentValidationAndRetentionConfig`, set some variable to
indicate that we are enabling peer download for segments. The variable can just
indicate the scheme, e.g. "http". The rest of the URL can be constructed
easily since we know the values of the host, port, etc. I prefer this to having
a boolean and then another setting for the scheme somewhere else. Putting it
here can also help in the case when deep store is down and we want to download
offline table segments.
   2. Introduce something in the streamConfig section, but that restricts the 
peer download to realtime only.
   
   In the splitSegmentCommitter class, look at the table config to construct 
the URL. 
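
To make the idea concrete, here is a rough sketch of building a peer download URL from just the configured scheme. Everything specific in it is an assumption for illustration: the class and method names, the `/segments/<table>/<segment>` path layout, and the host and port values are made up, and the real layout would need to match whatever the server endpoint actually exposes.

```java
import java.net.URI;
import java.net.URISyntaxException;

class PeerDownloadUriSketch {
  /**
   * Builds a peer download URI from a scheme taken from the table config plus the
   * server's known host and port. The path layout used here is hypothetical.
   */
  static URI peerDownloadUri(String scheme, String host, int port, String tableNameWithType,
      String segmentName) {
    String path = "/segments/" + tableNameWithType + "/" + segmentName;
    try {
      // The multi-argument URI constructor quotes illegal characters in the path.
      return new URI(scheme, null, host, port, path, null, null);
    } catch (URISyntaxException e) {
      throw new IllegalArgumentException("Cannot build peer download URI", e);
    }
  }
}
```

With this shape the table config only needs to carry the scheme (option 1 above), and the committer fills in the rest when the upload to the segment store fails.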
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



