ankitsultana commented on code in PR #13837:
URL: https://github.com/apache/pinot/pull/13837#discussion_r1733264000

##########
pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java:
##########
@@ -185,6 +189,14 @@ public boolean isExcludeSequenceId() {
     return _excludeSequenceId;
   }
 
+  public String getPartitionId() {
+    return _partitionId;
+  }
+
+  public long getSegmentUploadTimeMs() {

Review Comment:
   nit: getter order should match declaration order: _partitionId > sequenceId > segmentUploadTimeMs


##########
pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java:
##########
@@ -52,7 +52,9 @@ public class BatchConfig {
   private final boolean _excludeSequenceId;
   private final boolean _appendUUIDToSegmentName;
   private final boolean _excludeTimeInSegmentName;
+  private final String _partitionId;

Review Comment:
   partition is quite overloaded.. this is the "uploadedRealtimePartitionId"?


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java:
##########
@@ -169,6 +170,9 @@ private static SegmentNameGenerator getSegmentNameGenerator(BatchConfig batchCon
       case BatchConfigProperties.SegmentNameGeneratorType.SIMPLE:
         return new SimpleSegmentNameGenerator(rawTableName, batchConfig.getSegmentNamePostfix(),
             batchConfig.isAppendUUIDToSegmentName(), batchConfig.isExcludeTimeInSegmentName());
+      case BatchConfigProperties.SegmentNameGeneratorType.UPLOADED_REALTIME:
+        return new UploadedRealtimeSegmentNameGenerator(rawTableName, Integer.parseInt(batchConfig.getPartitionId()),

Review Comment:
   Can we throw a graceful error when integer parse fails? The current error message will not be that intuitive and users will have to look at the code to debug.
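
   To make the parse-failure request concrete, here is a minimal sketch of one way to surface a clearer error. `PartitionIdParser` and `parsePartitionId` are hypothetical names used only for illustration; they are not part of the PR or of Pinot, and the exact message wording is an assumption.

```java
// Hypothetical helper, not part of the PR: wraps Integer.parseInt so that a
// bad partition id produces a message that names the offending value.
final class PartitionIdParser {
  private PartitionIdParser() {
  }

  static int parsePartitionId(String rawPartitionId) {
    if (rawPartitionId == null) {
      throw new IllegalArgumentException(
          "Partition id must be set when using the UPLOADED_REALTIME segment name generator");
    }
    try {
      return Integer.parseInt(rawPartitionId);
    } catch (NumberFormatException e) {
      // Keep the original exception as the cause so the stack trace is preserved.
      throw new IllegalArgumentException(
          "Invalid partition id '" + rawPartitionId + "': expected an integer", e);
    }
  }
}
```

   With something like this, the call site would pass `batchConfig.getPartitionId()` through the helper instead of calling `Integer.parseInt` directly, and a non-numeric value would fail with a message naming the value rather than a bare `NumberFormatException`.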


##########
pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java:
##########
@@ -95,8 +100,12 @@ public class FlinkSegmentWriter implements SegmentWriter {
   private transient Counter _processedRecords;
   private transient volatile long _lastRecordProcessingTimeMs = 0;
 
-  public FlinkSegmentWriter(int indexOfSubtask, MetricGroup metricGroup) {
+  public FlinkSegmentWriter(int indexOfSubtask, MetricGroup metricGroup, String segmentNamePrefix,

Review Comment:
   A backward-incompatible change is not a concern here? I guess callers only call the sink function?


##########
pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java:
##########
@@ -119,7 +139,7 @@ public void invoke(T value, Context context) throws Exception {
     _segmentWriter.collect(_recordConverter.convertToRow(value));
     _segmentNumRecord++;
-    if (_segmentNumRecord > _segmentFlushMaxNumRecords) {
+    if (_segmentNumRecord >= _segmentFlushMaxNumRecords) {

Review Comment:
   Curious, why is this required?


##########
pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java:
##########
@@ -76,13 +84,25 @@ public PinotSinkFunction(PinotGenericRowConverter<T> recordConverter, TableConfi
     _schema = schema;
     _segmentFlushMaxNumRecords = segmentFlushMaxNumRecords;
     _executorPoolSize = executorPoolSize;
+    _segmentNamePrefix = DEFAULT_UPLOADED_REALTIME_SEGMENT_PREFIX;
+  }
+
+  public PinotSinkFunction(PinotGenericRowConverter<T> recordConverter, TableConfig tableConfig, Schema schema,
+      long segmentFlushMaxNumRecords, int executorPoolSize, String segmentNamePrefix,
+      @Nullable Long segmentUploadTimeMs) {
+    this(recordConverter, tableConfig, schema, segmentFlushMaxNumRecords, executorPoolSize);
+    if (!segmentNamePrefix.isBlank()) {

Review Comment:
   This means we enforce a non-empty segment name prefix? Could there be a need to support no prefix?
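
   For the `>` to `>=` change in `PinotSinkFunction.invoke` questioned above, the observable difference can be sketched with a small simulation. This is not the PR author's stated reason, only an illustration of how the two comparisons behave. `FlushThresholdDemo` and `segmentSizes` are hypothetical names, and the sketch assumes the record counter is reset to zero after each flush, which the quoted hunk does not show.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation, not Pinot code: counts how many records end up in
// each flushed segment under the old (>) and new (>=) comparison.
final class FlushThresholdDemo {
  private FlushThresholdDemo() {
  }

  static List<Integer> segmentSizes(int totalRecords, long maxRecords, boolean inclusive) {
    List<Integer> sizes = new ArrayList<>();
    int segmentNumRecord = 0;
    for (int i = 0; i < totalRecords; i++) {
      // Mirrors the order in invoke(): collect the record, increment, then check.
      segmentNumRecord++;
      boolean shouldFlush = inclusive
          ? segmentNumRecord >= maxRecords
          : segmentNumRecord > maxRecords;
      if (shouldFlush) {
        sizes.add(segmentNumRecord);
        // Assumption: the counter restarts from zero after a flush.
        segmentNumRecord = 0;
      }
    }
    // Records still buffered at the end are not counted as a flushed segment.
    return sizes;
  }
}
```

   Under these assumptions, `segmentSizes(6, 3, true)` yields two segments of 3 records each, while `segmentSizes(6, 3, false)` yields a single segment of 4 records and leaves 2 records unflushed. In other words, with `>` each segment holds one record more than the configured maximum.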

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org