mcvsubbu commented on a change in pull request #6567:
URL: https://github.com/apache/incubator-pinot/pull/6567#discussion_r606353425



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
##########
@@ -383,7 +383,12 @@
   public static class Segment {
     public static class Realtime {
       public enum Status {
-        IN_PROGRESS, DONE
+        // Means the segment is not completed and still consuming stream data

Review comment:
       ```suggestion
           // Means the segment is in CONSUMING state
   ```

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
##########
@@ -383,7 +383,12 @@
   public static class Segment {
     public static class Realtime {
       public enum Status {
-        IN_PROGRESS, DONE
+        // Means the segment is not completed and still consuming stream data
+        IN_PROGRESS,
+        // Means the segment is completed and sealed by some Pinot server and uploaded
+        DONE,
+        // Means the segment is uploaded to a Pinot controller by an external party
+        UPLOADED

Review comment:
       Should we take this opportunity to add two more enums, CONSUMING and COMPLETED (these are the terms used elsewhere), and slowly deprecate IN_PROGRESS and DONE? For now, the old values would be treated as equivalent to their new counterparts.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
##########
@@ -57,41 +61,44 @@ public ZKOperator(PinotHelixResourceManager pinotHelixResourceManager, Controlle
     _controllerMetrics = controllerMetrics;
   }
 
-  public void completeSegmentOperations(String rawTableName, SegmentMetadata segmentMetadata,
+  public void completeSegmentOperations(String tableNameWithType, SegmentMetadata segmentMetadata,
       URI finalSegmentLocationURI, File currentSegmentLocation, boolean enableParallelPushProtection,
       HttpHeaders headers, String zkDownloadURI, boolean moveSegmentToFinalLocation, String crypter)
       throws Exception {
-    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
     String segmentName = segmentMetadata.getName();
-
-    // Brand new segment, not refresh, directly add the segment
-    ZNRecord segmentMetadataZnRecord =
-        _pinotHelixResourceManager.getSegmentMetadataZnRecord(offlineTableName, segmentName);
+    ZNRecord segmentMetadataZnRecord = _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
     if (segmentMetadataZnRecord == null) {
-      LOGGER.info("Adding new segment {} from table {}", segmentName, rawTableName);
+      LOGGER.info("Adding new segment {} from table {}", segmentName, tableNameWithType);
       processNewSegment(segmentMetadata, finalSegmentLocationURI, currentSegmentLocation, zkDownloadURI, crypter,
-          rawTableName, segmentName, moveSegmentToFinalLocation);
+          tableNameWithType, segmentName, moveSegmentToFinalLocation);
       return;
     }
 
-    LOGGER.info("Segment {} from table {} already exists, refreshing if necessary", segmentName, rawTableName);
+    LOGGER.info("Segment {} from table {} already exists, refreshing if necessary", segmentName, tableNameWithType);

Review comment:
       Move this log message below the `if` statement; otherwise the log is misleading.
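
A hypothetical illustration of the log placement being asked for: the "refreshing" message is emitted only after the check that decides whether a refresh happens. The CRC comparison and the `refreshIfNecessary`/`isRefreshNeeded` methods are assumptions made for this sketch (the actual condition in `ZKOperator` is not visible in this hunk), and `java.util.logging` stands in for SLF4J so the snippet is self-contained.

```java
import java.util.logging.Logger;

// Hypothetical sketch, not the ZKOperator code: log only on the branch that actually refreshes.
class SegmentRefreshSketch {
  private static final Logger LOGGER = Logger.getLogger(SegmentRefreshSketch.class.getName());

  // Hypothetical condition standing in for whatever check follows the log in the real code
  static boolean isRefreshNeeded(long existingCrc, long newCrc) {
    return existingCrc != newCrc;
  }

  /** Returns true if the existing segment was refreshed. */
  static boolean refreshIfNecessary(String segmentName, String tableNameWithType, long existingCrc, long newCrc) {
    if (!isRefreshNeeded(existingCrc, newCrc)) {
      LOGGER.info(String.format("Segment %s from table %s already exists with the same CRC, skipping refresh",
          segmentName, tableNameWithType));
      return false;
    }
    // Logged after the check, so it only appears when a refresh really happens
    LOGGER.info(String.format("Refreshing existing segment %s from table %s", segmentName, tableNameWithType));
    // ... refresh logic would go here ...
    return true;
  }
}
```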

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/SegmentUtils.java
##########
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util;

Review comment:
       Could this class live in pinot/common/utils instead, alongside classes like `LLCSegmentName`?

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
##########
@@ -383,7 +383,12 @@
   public static class Segment {
     public static class Realtime {
       public enum Status {
-        IN_PROGRESS, DONE
+        // Means the segment is not completed and still consuming stream data
+        IN_PROGRESS,
+        // Means the segment is completed and sealed by some Pinot server and uploaded

Review comment:
       ```suggestion
           // Means the segment is in ONLINE state (segment completed consuming and has been saved in segment store).
   ```

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1649,63 +1649,81 @@ public void addNewSegment(String tableName, SegmentMetadata segmentMetadata, Str
   public void addNewSegment(String tableName, SegmentMetadata segmentMetadata, String downloadUrl,
       @Nullable String crypter) {
     String segmentName = segmentMetadata.getName();
-    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
-
+    String tableNameWithType;
+    InstancePartitionsType instancePartitionsType;
     // NOTE: must first set the segment ZK metadata before assigning segment to instances because segment assignment
     // might need them to determine the partition of the segment, and server will need them to download the segment
     OfflineSegmentZKMetadata offlineSegmentZKMetadata = new OfflineSegmentZKMetadata();
     ZKMetadataUtils.updateSegmentMetadata(offlineSegmentZKMetadata, segmentMetadata);
     offlineSegmentZKMetadata.setDownloadUrl(downloadUrl);
     offlineSegmentZKMetadata.setCrypterName(crypter);
     offlineSegmentZKMetadata.setPushTime(System.currentTimeMillis());
+
+    if (isRealtimeOnlyTable(tableName)) {
+      tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+      instancePartitionsType = InstancePartitionsType.CONSUMING;

Review comment:
       I agree with Jackie that this is counter-intuitive. Add a comment here explaining why it is set to CONSUMING, so that if we later enable uploading realtime segments to non-upsert tables, we will at least know what needs to be fixed and why.
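
A sketch of the kind of comment being requested. `TableType` and `InstancePartitionsType` below are local stand-ins rather than the Pinot classes, `instancePartitionsTypeForUpload` is a hypothetical helper, and the stated rationale (upsert tables need every segment of a stream partition on the servers consuming that partition) is an assumed reading of why CONSUMING is used here that should be confirmed before being copied into the code.

```java
// Local stand-ins for the Pinot types referenced in the diff (not the real classes).
enum TableType { OFFLINE, REALTIME }

enum InstancePartitionsType { OFFLINE, CONSUMING, COMPLETED }

class UploadedSegmentAssignmentSketch {
  // Hypothetical helper: picks the instance partitions used to assign an uploaded segment.
  static InstancePartitionsType instancePartitionsTypeForUpload(TableType tableType) {
    if (tableType == TableType.OFFLINE) {
      return InstancePartitionsType.OFFLINE;
    }
    // NOTE (assumed rationale): uploads to realtime tables are currently only allowed for upsert
    // tables. Upsert keeps its metadata per stream partition on the servers consuming that
    // partition, so an uploaded segment must land on those same servers, i.e. the CONSUMING
    // instance partitions. Revisit this choice if uploads to non-upsert realtime tables are enabled.
    return InstancePartitionsType.CONSUMING;
  }
}
```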

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
##########
@@ -247,24 +248,35 @@ private SuccessResponse uploadSegment(@Nullable String tableName, FormDataMultiP
         LOGGER.info("Uploading a segment {} to table: {}, push type {}, (Derived from segment metadata)", segmentName, tableName, uploadType);
       }
 
-      String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
+      String tableNameWithType;
+      if (tableType == TableType.OFFLINE) {
+        tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
+      } else {
+        if (!_pinotHelixResourceManager.isUpsertTable(rawTableName)) {
+          throw new UnsupportedOperationException(
+              "Upload segment to non-upsert realtime table is not supported " + rawTableName);
+        }
+        tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(rawTableName);
+      }
+
       String clientAddress = InetAddress.getByName(request.getRemoteAddr()).getHostName();
       LOGGER.info("Processing upload request for segment: {} of table: {} from client: {}, ingestion descriptor: {}",
-          segmentName, offlineTableName, clientAddress, ingestionDescriptor);
+          segmentName, tableNameWithType, clientAddress, ingestionDescriptor);
 
-      // Skip segment validation if upload only segment metadata
-      if (uploadType != FileUploadDownloadClient.FileUploadType.METADATA) {
+      // Skip segment validation if upload is to an offline table and only segment metadata.

Review comment:
       Add a comment explaining why we don't need validation for realtime segments.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/util/SegmentUtils.java
##########
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util;
+
+import com.google.common.base.Preconditions;
+import java.util.Set;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.utils.LLCSegmentName;
+
+
+// Util functions related to segments.
+public class SegmentUtils {
+  // Returns the partition id of a realtime segment based segment name and segment metadata info retrieved via Helix.
+  public static int getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName,

Review comment:
       Add a note in the Javadoc saying that this method must NOT be called during query processing. I am being a bit paranoid that someone may change the broker at some point without realizing what is going on.
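
One possible shape for the requested Javadoc. The method below is a simplified, hypothetical stand-in for `SegmentUtils.getRealtimeSegmentPartitionId`: it parses the partition id from LLC segment names (`tableName__partitionId__sequenceNumber__creationTime`) and falls back to a plain `Map`, which replaces the Helix/ZooKeeper metadata lookup the real method performs.

```java
import java.util.Map;

// Hypothetical stand-in for SegmentUtils, used only to illustrate the Javadoc warning.
class SegmentUtilsSketch {
  private SegmentUtilsSketch() {
  }

  /**
   * Returns the partition id of a realtime segment, based on the segment name and, for segments
   * whose names do not encode a partition, on the segment metadata.
   *
   * <p><b>NOTE:</b> This method must NOT be called during query processing (e.g. on the broker).
   * The real implementation reads segment metadata from ZooKeeper via Helix, which should stay
   * off the query path.
   *
   * @param segmentName name of the segment
   * @param partitionIdsFromMetadata hypothetical stand-in for the ZK segment metadata lookup
   */
  static int getRealtimeSegmentPartitionId(String segmentName, Map<String, Integer> partitionIdsFromMetadata) {
    // LLC segment names look like tableName__partitionId__sequenceNumber__creationTime
    String[] parts = segmentName.split("__");
    if (parts.length == 4) {
      return Integer.parseInt(parts[1]);
    }
    // Names that do not follow the LLC convention (e.g. possibly uploaded segments) use metadata
    Integer partitionId = partitionIdsFromMetadata.get(segmentName);
    if (partitionId == null) {
      throw new IllegalStateException("Cannot determine partition id for segment: " + segmentName);
    }
    return partitionId;
  }
}
```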




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


