ankitsultana commented on code in PR #13837:
URL: https://github.com/apache/pinot/pull/13837#discussion_r1733264000


##########
pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java:
##########
@@ -185,6 +189,14 @@ public boolean isExcludeSequenceId() {
     return _excludeSequenceId;
   }
 
+  public String getPartitionId() {
+    return _partitionId;
+  }
+
+  public long getSegmentUploadTimeMs() {

Review Comment:
   nit: getter order should match declaration order: _partitionId > sequenceId > segmentUploadTimeMs



##########
pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java:
##########
@@ -52,7 +52,9 @@ public class BatchConfig {
   private final boolean _excludeSequenceId;
   private final boolean _appendUUIDToSegmentName;
   private final boolean _excludeTimeInSegmentName;
+  private final String _partitionId;

Review Comment:
   "partition" is quite overloaded. Is this the "uploadedRealtimePartitionId"?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java:
##########
@@ -169,6 +170,9 @@ private static SegmentNameGenerator getSegmentNameGenerator(BatchConfig batchCon
       case BatchConfigProperties.SegmentNameGeneratorType.SIMPLE:
         return new SimpleSegmentNameGenerator(rawTableName, batchConfig.getSegmentNamePostfix(),
             batchConfig.isAppendUUIDToSegmentName(), batchConfig.isExcludeTimeInSegmentName());
+      case BatchConfigProperties.SegmentNameGeneratorType.UPLOADED_REALTIME:
+        return new UploadedRealtimeSegmentNameGenerator(rawTableName, Integer.parseInt(batchConfig.getPartitionId()),

Review Comment:
   Can we throw a more descriptive error when the integer parse fails? The default
NumberFormatException message is not very intuitive, and users will have to read
the code to debug it.
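   A minimal sketch of the kind of check being asked for, assuming the partition ID
arrives as a String from BatchConfig#getPartitionId(). The class PartitionIdParser,
the method parsePartitionId, and the message wording are hypothetical, not existing
Pinot code.

```java
// Hypothetical helper: wraps Integer.parseInt so a bad partition ID produces an error
// that names the offending value and the generator type, instead of a bare NumberFormatException.
final class PartitionIdParser {
  private PartitionIdParser() {
  }

  static int parsePartitionId(String partitionId) {
    if (partitionId == null || partitionId.isBlank()) {
      throw new IllegalArgumentException(
          "A partition ID must be set when using the UPLOADED_REALTIME segment name generator");
    }
    try {
      return Integer.parseInt(partitionId);
    } catch (NumberFormatException e) {
      // Keep the original exception as the cause so the stack trace is not lost.
      throw new IllegalArgumentException("Invalid partition ID '" + partitionId
          + "' for the UPLOADED_REALTIME segment name generator: expected an integer", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(parsePartitionId("7"));
    try {
      parsePartitionId("seven");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Keeping the NumberFormatException as the cause preserves the low-level detail while
putting the user-facing explanation first.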



##########
pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java:
##########
@@ -95,8 +100,12 @@ public class FlinkSegmentWriter implements SegmentWriter {
   private transient Counter _processedRecords;
   private transient volatile long _lastRecordProcessingTimeMs = 0;
 
-  public FlinkSegmentWriter(int indexOfSubtask, MetricGroup metricGroup) {
+  public FlinkSegmentWriter(int indexOfSubtask, MetricGroup metricGroup, String segmentNamePrefix,

Review Comment:
   Is the backward-incompatible change not a concern here? I guess callers only go
through the sink function?
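   If external code does construct FlinkSegmentWriter directly, one common way to avoid
breaking it is to keep the old two-argument constructor and have it delegate to the
new one with defaults. The sketch below uses a stand-in class, SegmentWriterSketch,
with Flink's MetricGroup replaced by a String so it compiles without Flink; the default
prefix value and the trailing segmentUploadTimeMs parameter are assumptions, since the
diff truncates the new signature after segmentNamePrefix.

```java
// Stand-in for FlinkSegmentWriter; MetricGroup is replaced by a String so this compiles on its own.
final class SegmentWriterSketch {
  // Hypothetical default; the real default prefix value is not shown in this diff.
  static final String DEFAULT_SEGMENT_NAME_PREFIX = "default-prefix";

  private final int _indexOfSubtask;
  private final String _metricGroup;
  private final String _segmentNamePrefix;
  private final Long _segmentUploadTimeMs; // null means "not set"

  // Old signature, kept so existing callers keep compiling; it fills in defaults.
  SegmentWriterSketch(int indexOfSubtask, String metricGroup) {
    this(indexOfSubtask, metricGroup, DEFAULT_SEGMENT_NAME_PREFIX, null);
  }

  // New signature carrying the extra segment-naming parameters.
  SegmentWriterSketch(int indexOfSubtask, String metricGroup, String segmentNamePrefix,
      Long segmentUploadTimeMs) {
    _indexOfSubtask = indexOfSubtask;
    _metricGroup = metricGroup;
    _segmentNamePrefix = segmentNamePrefix;
    _segmentUploadTimeMs = segmentUploadTimeMs;
  }

  int indexOfSubtask() {
    return _indexOfSubtask;
  }

  String metricGroup() {
    return _metricGroup;
  }

  String segmentNamePrefix() {
    return _segmentNamePrefix;
  }

  Long segmentUploadTimeMs() {
    return _segmentUploadTimeMs;
  }

  public static void main(String[] args) {
    SegmentWriterSketch legacy = new SegmentWriterSketch(0, "metrics");
    System.out.println(legacy.segmentNamePrefix() + " " + legacy.segmentUploadTimeMs());
  }
}
```

This is the same overload-and-delegate shape the diff already uses for PinotSinkFunction,
where the original constructor is kept alongside the new one.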



##########
pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java:
##########
@@ -119,7 +139,7 @@ public void invoke(T value, Context context)
       throws Exception {
     _segmentWriter.collect(_recordConverter.convertToRow(value));
     _segmentNumRecord++;
-    if (_segmentNumRecord > _segmentFlushMaxNumRecords) {
+    if (_segmentNumRecord >= _segmentFlushMaxNumRecords) {

Review Comment:
   Curious: why is this change required?
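   One plausible motivation, assuming the counter starts at 0 and is incremented before the
check as in the hunk above: with `>` the first flush fires only after
segmentFlushMaxNumRecords + 1 records, while `>=` makes the limit exact. The small
simulation below (class and method names are hypothetical) shows the difference.

```java
// Simulates the flush check in PinotSinkFunction#invoke: increment after each collected
// record, then compare against the flush threshold.
final class FlushThresholdDemo {
  // Returns how many records have been collected when the first flush fires.
  static long recordsAtFirstFlush(long maxNumRecords, boolean inclusive) {
    long segmentNumRecord = 0;
    while (true) {
      segmentNumRecord++; // one record collected
      boolean flush = inclusive ? segmentNumRecord >= maxNumRecords : segmentNumRecord > maxNumRecords;
      if (flush) {
        return segmentNumRecord;
      }
    }
  }

  public static void main(String[] args) {
    System.out.println(">  : " + recordsAtFirstFlush(1000, false)); // 1001
    System.out.println(">= : " + recordsAtFirstFlush(1000, true)); // 1000
  }
}
```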



##########
pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java:
##########
@@ -76,13 +84,25 @@ public PinotSinkFunction(PinotGenericRowConverter<T> recordConverter, TableConfi
     _schema = schema;
     _segmentFlushMaxNumRecords = segmentFlushMaxNumRecords;
     _executorPoolSize = executorPoolSize;
+    _segmentNamePrefix = DEFAULT_UPLOADED_REALTIME_SEGMENT_PREFIX;
+  }
+
+  public PinotSinkFunction(PinotGenericRowConverter<T> recordConverter, TableConfig tableConfig, Schema schema,
+      long segmentFlushMaxNumRecords, int executorPoolSize, String segmentNamePrefix,
+      @Nullable Long segmentUploadTimeMs) {
+    this(recordConverter, tableConfig, schema, segmentFlushMaxNumRecords, executorPoolSize);
+    if (!segmentNamePrefix.isBlank()) {

Review Comment:
   Does this mean we enforce a non-empty segment name prefix? Could there be a need
to support no prefix?
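   To make the question concrete: with the isBlank() check, a blank prefix silently falls
back to the default, so "no prefix" cannot be requested. A sketch of one alternative is
to let only null select the default. The class, method names, and default value below
are hypothetical; the real value of DEFAULT_UPLOADED_REALTIME_SEGMENT_PREFIX is not shown
in the diff, and String#isBlank requires Java 11 or later.

```java
// Contrasts two ways of handling a caller-supplied segment name prefix.
final class SegmentPrefixResolver {
  // Hypothetical placeholder for DEFAULT_UPLOADED_REALTIME_SEGMENT_PREFIX.
  static final String DEFAULT_PREFIX = "default-prefix";

  // Mirrors the check under review: "" or "  " is ignored and the default is kept.
  static String blankMeansDefault(String segmentNamePrefix) {
    return segmentNamePrefix.isBlank() ? DEFAULT_PREFIX : segmentNamePrefix;
  }

  // Alternative: only null selects the default, so an empty string can mean "no prefix".
  static String nullMeansDefault(String segmentNamePrefix) {
    return segmentNamePrefix == null ? DEFAULT_PREFIX : segmentNamePrefix;
  }

  public static void main(String[] args) {
    System.out.println("[" + blankMeansDefault("") + "]"); // [default-prefix]
    System.out.println("[" + nullMeansDefault("") + "]"); // []
    System.out.println("[" + nullMeansDefault(null) + "]"); // [default-prefix]
  }
}
```

Whether the uploaded-realtime segment name generator accepts an empty prefix would still
need to be checked separately.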



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: commits-h...@pinot.apache.org