Jackie-Jiang commented on code in PR #13837: URL: https://github.com/apache/pinot/pull/13837#discussion_r1739400460
########## pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java: ########## @@ -124,12 +135,26 @@ public void init(TableConfig tableConfig, Schema schema, Map<String, String> bat "batchConfigMaps must contain only 1 BatchConfig for table: %s", _tableNameWithType); Map<String, String> batchConfigMap = _batchIngestionConfig.getBatchConfigMaps().get(0); + batchConfigMap.put(BatchConfigProperties.UPLOADED_REALTIME_PARTITION_ID, Integer.toString(_indexOfSubtask)); + batchConfigMap.put(BatchConfigProperties.SEGMENT_UPLOAD_TIME_MS, String.valueOf(_segmentUploadTimeMs)); + batchConfigMap.computeIfAbsent( + BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX + "." + BatchConfigProperties.SEGMENT_NAME_PREFIX, + key -> StringUtils.isNotBlank(_segmentNamePrefix) ? _segmentNamePrefix + : DEFAULT_UPLOADED_REALTIME_SEGMENT_PREFIX); + + // generate segment name for simple segment name generator type(non upsert tables) String segmentNamePostfixProp = String.format("%s.%s", BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX, BatchConfigProperties.SEGMENT_NAME_POSTFIX); String segmentSuffix = batchConfigMap.get(segmentNamePostfixProp); segmentSuffix = segmentSuffix == null ? 
String.valueOf(_indexOfSubtask) : segmentSuffix + "_" + _indexOfSubtask; batchConfigMap.put(segmentNamePostfixProp, segmentSuffix); + // For upsert tables must use the UploadedRealtimeSegmentName for right assignment of segments + if (_tableConfig.getTableType().equals(TableType.REALTIME)) { Review Comment: (nit) ```suggestion if (_tableConfig.getTableType() == TableType.REALTIME) { ``` ########## pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java: ########## @@ -78,11 +84,20 @@ public PinotSinkFunction(PinotGenericRowConverter<T> recordConverter, TableConfi _executorPoolSize = executorPoolSize; } + public PinotSinkFunction(PinotGenericRowConverter<T> recordConverter, TableConfig tableConfig, Schema schema, + long segmentFlushMaxNumRecords, int executorPoolSize, String segmentNamePrefix, Review Comment: Is `segmentNamePrefix` also nullable? ########## pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java: ########## @@ -52,7 +52,9 @@ public class BatchConfig { private final boolean _excludeSequenceId; private final boolean _appendUUIDToSegmentName; private final boolean _excludeTimeInSegmentName; + private final String _uploadedRealtimePartitionId; Review Comment: Should we directly read it as `int`? 
########## pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java: ########## @@ -78,11 +84,20 @@ public PinotSinkFunction(PinotGenericRowConverter<T> recordConverter, TableConfi _executorPoolSize = executorPoolSize; } + public PinotSinkFunction(PinotGenericRowConverter<T> recordConverter, TableConfig tableConfig, Schema schema, + long segmentFlushMaxNumRecords, int executorPoolSize, String segmentNamePrefix, Review Comment: Make the other constructor call this one, and then we can change all member variables to `final` ########## pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java: ########## @@ -124,12 +135,26 @@ public void init(TableConfig tableConfig, Schema schema, Map<String, String> bat "batchConfigMaps must contain only 1 BatchConfig for table: %s", _tableNameWithType); Map<String, String> batchConfigMap = _batchIngestionConfig.getBatchConfigMaps().get(0); + batchConfigMap.put(BatchConfigProperties.UPLOADED_REALTIME_PARTITION_ID, Integer.toString(_indexOfSubtask)); + batchConfigMap.put(BatchConfigProperties.SEGMENT_UPLOAD_TIME_MS, String.valueOf(_segmentUploadTimeMs)); Review Comment: (nit) ```suggestion batchConfigMap.put(BatchConfigProperties.SEGMENT_UPLOAD_TIME_MS, Long.toString(_segmentUploadTimeMs)); ``` ########## pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java: ########## @@ -124,12 +135,26 @@ public void init(TableConfig tableConfig, Schema schema, Map<String, String> bat "batchConfigMaps must contain only 1 BatchConfig for table: %s", _tableNameWithType); Map<String, String> batchConfigMap = _batchIngestionConfig.getBatchConfigMaps().get(0); + batchConfigMap.put(BatchConfigProperties.UPLOADED_REALTIME_PARTITION_ID, Integer.toString(_indexOfSubtask)); Review Comment: How do we ensure this value is the correct partition id?
########## pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java: ########## @@ -95,8 +100,12 @@ public class FlinkSegmentWriter implements SegmentWriter { private transient Counter _processedRecords; private transient volatile long _lastRecordProcessingTimeMs = 0; - public FlinkSegmentWriter(int indexOfSubtask, MetricGroup metricGroup) { + public FlinkSegmentWriter(int indexOfSubtask, MetricGroup metricGroup, String segmentNamePrefix, Review Comment: Suggest keeping the original constructor for backward compatibility. I think `segmentNamePrefix` is also nullable ########## pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java: ########## @@ -124,12 +135,26 @@ public void init(TableConfig tableConfig, Schema schema, Map<String, String> bat "batchConfigMaps must contain only 1 BatchConfig for table: %s", _tableNameWithType); Map<String, String> batchConfigMap = _batchIngestionConfig.getBatchConfigMaps().get(0); + batchConfigMap.put(BatchConfigProperties.UPLOADED_REALTIME_PARTITION_ID, Integer.toString(_indexOfSubtask)); + batchConfigMap.put(BatchConfigProperties.SEGMENT_UPLOAD_TIME_MS, String.valueOf(_segmentUploadTimeMs)); + batchConfigMap.computeIfAbsent( + BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX + "." + BatchConfigProperties.SEGMENT_NAME_PREFIX, Review Comment: Should this always come from batch config? 
This will be a behavior change if we always prefix it with `flink` ########## pinot-connectors/pinot-flink-connector/README.md: ########## @@ -39,9 +44,34 @@ srcDs.addSink(new PinotSinkFunction<>(new PinotRowRecordConverter(TEST_TYPE_INFO execEnv.execute(); ``` +## Quick start for realtime(upsert) table backfill +```java +// Set up flink env and data source +StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); +execEnv.setParallelism(2); // mandatory for upsert tables wi +DataStream<Row> srcDs = execEnv.fromCollection(data).returns(TEST_TYPE_INFO) + +// Create a ControllerRequestClient to fetch Pinot schema and table config +HttpClient httpClient = HttpClient.getInstance(); +ControllerRequestClient client = new ControllerRequestClient( +ControllerRequestURLBuilder.baseUrl(DEFAULT_CONTROLLER_URL), httpClient); + +// fetch Pinot schema +Schema schema = PinotConnectionUtils.getSchema(client, "starbucksStores"); +// fetch Pinot table config +TableConfig tableConfig = PinotConnectionUtils.getTableConfig(client, "starbucksStores", "OFFLINE"); Review Comment: Should this be real-time? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org