klsince commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1630365372


##########
pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java:
##########
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+
+/**
+ * Class to represent segment names like: 
{prefix}__{tableName}__{partitionId}__{creationTime}__{suffix}
+ *
+ * <p>This naming convention is adopted to represent a segment uploaded to a 
realtime table. The naming
+ * convention has been kept semantically similar to {@link LLCSegmentName} but 
differs in following ways:
+ *
+ * <li> prefix to quickly identify the type/source of segment e.g. 
"uploaded"/"minion"
+ * <li> tableName to be same as the table name of segment
+ * <li> partitionId to identify the right partition for upsert table segment 
table assignment.
+ * <li> creationTime creation time of segment of the format yyyyMMdd'T'HHmm'Z'
+ * <li> suffix to deduplicate segment names created at the same time
+ *
+ * Use {@link 
org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator} 
to generate segment names.
+ */
+public class UploadedRealtimeSegmentName implements 
Comparable<UploadedRealtimeSegmentName> {
+
+  private static final String SEPARATOR = "__";
+  private static final String DATE_FORMAT = "yyyyMMdd'T'HHmm'Z'";
+  private static final DateTimeFormatter DATE_FORMATTER = 
DateTimeFormat.forPattern(DATE_FORMAT).withZoneUTC();
+  private final String _prefix;
+  private final String _tableName;
+  private final int _partitionId;
+  private final String _creationTime;
+  private final String _segmentName;
+  private final String _suffix;
+
+  public UploadedRealtimeSegmentName(String segmentName) {
+
+    // split the segment name by the separator and get creation time, sequence 
id, partition id and table name from
+    // the end and validate segment name starts with prefix uploaded_
+    try {
+      String[] parts = StringUtils.splitByWholeSeparator(segmentName, 
SEPARATOR);
+      Preconditions.checkState(parts.length == 5,
+          "Uploaded segment name must be of the format 
{prefix}__{tableName}__{partitionId}__{creationTime}__{suffix}");
+      _prefix = parts[0];
+      _tableName = parts[1];
+      _partitionId = Integer.parseInt(parts[2]);
+      _creationTime = parts[3];
+      _suffix = parts[4];
+      _segmentName = segmentName;
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException("Invalid segment name: " + 
segmentName, e);
+    }
+  }
+
+  /**
+   * Constructor for UploadedRealtimeSegmentName.
+   * @param tableName
+   * @param partitionId
+   * @param msSinceEpoch
+   * @param prefix
+   * @param suffix
+   */
+  public UploadedRealtimeSegmentName(String tableName, int partitionId, long 
msSinceEpoch, String prefix,
+      String suffix) {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(tableName) && StringUtils.isNotBlank(prefix) && 
StringUtils.isNotBlank(suffix),

Review Comment:
   As in LLCSegmentName, we can also check that tableName/suffix/prefix don't 
contain SEPARATOR.



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/UploadedRealtimeSegmentName.java:
##########
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+
+/**
+ * Class to represent segment names like: 
{prefix}__{tableName}__{partitionId}__{creationTime}__{suffix}
+ *
+ * <p>This naming convention is adopted to represent a segment uploaded to a 
realtime table. The naming
+ * convention has been kept semantically similar to {@link LLCSegmentName} but 
differs in following ways:
+ *
+ * <li> prefix to quickly identify the type/source of segment e.g. 
"uploaded"/"minion"
+ * <li> tableName to be same as the table name of segment
+ * <li> partitionId to identify the right partition for upsert table segment 
table assignment.
+ * <li> creationTime creation time of segment of the format yyyyMMdd'T'HHmm'Z'
+ * <li> suffix to deduplicate segment names created at the same time
+ *
+ * Use {@link 
org.apache.pinot.segment.spi.creator.name.UploadedRealtimeSegmentNameGenerator} 
to generate segment names.
+ */
+public class UploadedRealtimeSegmentName implements 
Comparable<UploadedRealtimeSegmentName> {
+
+  private static final String SEPARATOR = "__";
+  private static final String DATE_FORMAT = "yyyyMMdd'T'HHmm'Z'";
+  private static final DateTimeFormatter DATE_FORMATTER = 
DateTimeFormat.forPattern(DATE_FORMAT).withZoneUTC();
+  private final String _prefix;
+  private final String _tableName;
+  private final int _partitionId;
+  private final String _creationTime;
+  private final String _segmentName;
+  private final String _suffix;
+
+  public UploadedRealtimeSegmentName(String segmentName) {
+
+    // split the segment name by the separator and get creation time, sequence 
id, partition id and table name from
+    // the end and validate segment name starts with prefix uploaded_
+    try {
+      String[] parts = StringUtils.splitByWholeSeparator(segmentName, 
SEPARATOR);
+      Preconditions.checkState(parts.length == 5,
+          "Uploaded segment name must be of the format 
{prefix}__{tableName}__{partitionId}__{creationTime}__{suffix}");
+      _prefix = parts[0];
+      _tableName = parts[1];
+      _partitionId = Integer.parseInt(parts[2]);
+      _creationTime = parts[3];
+      _suffix = parts[4];
+      _segmentName = segmentName;
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException("Invalid segment name: " + 
segmentName, e);
+    }
+  }
+
+  /**
+   * Constructor for UploadedRealtimeSegmentName.
+   * @param tableName
+   * @param partitionId
+   * @param msSinceEpoch
+   * @param prefix
+   * @param suffix
+   */
+  public UploadedRealtimeSegmentName(String tableName, int partitionId, long 
msSinceEpoch, String prefix,
+      String suffix) {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(tableName) && StringUtils.isNotBlank(prefix) && 
StringUtils.isNotBlank(suffix),
+        "tableName, prefix and suffix must be non-null and non-empty");
+    _tableName = tableName;
+    _partitionId = partitionId;
+    _creationTime = DATE_FORMATTER.print(msSinceEpoch);
+    _prefix = prefix;
+    _suffix = suffix;
+    _segmentName = Joiner.on(SEPARATOR).join(prefix, tableName, partitionId, 
_creationTime, suffix);
+  }
+
+  /**
+   * Returns true if the segment name is of the format: 
{prefix}__{tableName}__{partitionId}__{creationTime}__{suffix}
+   * @param segmentName
+   * @return boolean true if the segment name is of the format: 
{prefix}__{tableName}__{partitionId}__{creationTime}
+   * __{suffix}
+   */
+  public static boolean isUploadedRealtimeSegmentName(String segmentName) {
+    int numSeparators = 0;
+    int index = 0;
+    while ((index = segmentName.indexOf(SEPARATOR, index)) != -1) {
+      numSeparators++;
+      index += 2; // SEPARATOR.length()
+    }
+    return numSeparators == 4;
+  }
+
+  @Nullable
+  public static UploadedRealtimeSegmentName of(String segmentName) {
+    try {
+      return new UploadedRealtimeSegmentName(segmentName);
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  public String getTableName() {
+    return _tableName;
+  }
+
+  public int getPartitionId() {
+    return _partitionId;
+  }
+
+  /**
+   * Returns the creation time in the format yyyyMMdd'T'HHmm'Z'
+   * To be used for only human readability and not for any computation
+   * @return
+   */
+  public String getCreationTime() {
+    return _creationTime;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
+  @Nullable

Review Comment:
   remove this annotation? as we enforce suffix now



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/UploadedRealtimeSegmentNameGenerator.java:
##########
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator.name;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+
+
+/**
+ * Implementation for generating segment names of the format 
UploadedRealtimeSegmentName:
+ * 
uploaded__{tableName}__{partitionId}__{sequenceId}__{creationTime}__{optionalSuffix}
+ *
+ * <p>This naming convention is adopted to represent uploaded segments to a 
realtime table. The semantic is similar
+ * to LLCSegmentName. Scenarios where this naming convention can be preferred 
is:
+ * <li> Generating segments from a batch workload
+ * <li> Minion based segment transformations
+ */
+public class UploadedRealtimeSegmentNameGenerator implements 
SegmentNameGenerator {
+
+  private static final String DELIMITER = "__";
+  private final String _tableName;
+  private final int _partitionId;
+  // creation time must be in long and milliseconds since epoch to be 
consistent with creation.meta time for valid
+  // comparison in segment replace flow.
+  private final long _creationTimeMillis;
+  private final String _prefix;
+
+  // if suffix is not set then sequenceId is used as segment name suffix
+  @Nullable
+  private final String _suffix;
+
+  /**
+   * Creates a UploadedRealtimeSegmentNameGenerator
+   * @param tableName
+   * @param partitionId
+   * @param creationTimeMillis
+   * @param prefix
+   * @param suffix optional field for generator, if not specified then 
sequenceId is used as suffix
+   */
+  public UploadedRealtimeSegmentNameGenerator(String tableName, int 
partitionId, long creationTimeMillis,
+      @Nullable String prefix, String suffix) {
+    Preconditions.checkArgument(
+        creationTimeMillis > 0 && StringUtils.isNotBlank(tableName) && 
StringUtils.isNotBlank(prefix),
+        "Invalid arguments for UploadedRealtimeSegmentNameGenerator");
+    Preconditions.checkState(creationTimeMillis > 0, "Creation time must be 
positive");
+    Preconditions.checkNotNull(tableName, "Table name cannot be null");
+    _tableName = tableName;
+    _partitionId = partitionId;
+    _creationTimeMillis = creationTimeMillis;
+    _prefix = prefix;
+    _suffix = suffix;
+  }
+
+  @Override
+  public String generateSegmentName(int sequenceId, @Nullable Object 
minTimeValue, @Nullable Object maxTimeValue) {
+    return Joiner.on(DELIMITER).skipNulls().join(_prefix, _tableName, 
_partitionId, _creationTimeMillis,

Review Comment:
   no need for `.skipNulls()`? 



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -158,6 +158,45 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl 
segment, ThreadSafeMutab
     }
   }
 
+  /**
+   * <li> When the replacing segment and current segment are of {@link 
LLCSegmentName} then the PK should resolve to
+   * row in segment with higher sequence id.
+   * <li> When the replacing segment and current segment are of {@link 
UploadedRealtimeSegmentName} then the PK
+   * should resolve to row in segment with higher creation time
+   * <li> For other cases resolve based on creation time of segment. In case 
the creation time is same, give
+   * preference to an uploaded segment. A segment which is not LLCSegment can 
be assumed to be uploaded segment and
+   * is given preference.
+   *
+   * @param segmentName replacing segment name
+   * @param currentSegmentName current segment name having the record for the 
given primary key
+   * @param segmentCreationTimeMs replacing segment creation time
+   * @param currentSegmentCreationTimeMs current segment creation time
+   * @return true if the record in replacing segment should replace the record 
in current segment
+   */
+  protected boolean shouldReplaceOnComparisonTie(String segmentName, String 
currentSegmentName,
+      long segmentCreationTimeMs, long currentSegmentCreationTimeMs) {
+
+    LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+    LLCSegmentName currentLLCSegmentName = 
LLCSegmentName.of(currentSegmentName);
+    if (llcSegmentName != null && currentLLCSegmentName != null) {
+      return llcSegmentName.getSequenceNumber() > 
currentLLCSegmentName.getSequenceNumber();
+    }
+
+    int creationTimeComparisonRes = Long.compare(segmentCreationTimeMs, 
currentSegmentCreationTimeMs);

Review Comment:
   looks like we can do
   ```
   ...
   if (llcSegmentName != null && currentLLCSegmentName != null) {
     return llcSegmentName.getSequenceNumber() > 
currentLLCSegmentName.getSequenceNumber();
   }
   // If any one is not LLC, then compare ctime
   // most likely the ctimes differ (comparison result != 0), so we save the 
parsing cost of UploadedRealtimeSegmentName
   if (creationTimeComparisonRes != 0) {
     return creationTimeComparisonRes > 0;
   }
   // their ctimes are same, then favor uploaded segment, no need to parse just 
yet
   if (currentLLCSegmentName != null) {
     return  true; 
   }
   
   // both segments are uploaded, then prefer a well formatted name
   // also favor the first writer in case both are well formatted
   if (UploadedRealtimeSegmentName.of(currentSegmentName) != null) {
     return false;
   }
   if (UploadedRealtimeSegmentName.of(segmentName) != null) {
     return true;
   }
   // favor the first writer
   return false;
   ```
   wdyt?



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/UploadedRealtimeSegmentNameGenerator.java:
##########
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator.name;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+
+
+/**
+ * Implementation for generating segment names of the format 
UploadedRealtimeSegmentName:
+ * 
uploaded__{tableName}__{partitionId}__{sequenceId}__{creationTime}__{optionalSuffix}

Review Comment:
   forgot to change this comment?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to