Jackie-Jiang commented on code in PR #13837:
URL: https://github.com/apache/pinot/pull/13837#discussion_r1739400460


##########
pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java:
##########
@@ -124,12 +135,26 @@ public void init(TableConfig tableConfig, Schema schema, Map<String, String> bat
         "batchConfigMaps must contain only 1 BatchConfig for table: %s", _tableNameWithType);
 
     Map<String, String> batchConfigMap = _batchIngestionConfig.getBatchConfigMaps().get(0);
+    batchConfigMap.put(BatchConfigProperties.UPLOADED_REALTIME_PARTITION_ID, Integer.toString(_indexOfSubtask));
+    batchConfigMap.put(BatchConfigProperties.SEGMENT_UPLOAD_TIME_MS, String.valueOf(_segmentUploadTimeMs));
+    batchConfigMap.computeIfAbsent(
+        BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX + "." + BatchConfigProperties.SEGMENT_NAME_PREFIX,
+        key -> StringUtils.isNotBlank(_segmentNamePrefix) ? _segmentNamePrefix
+            : DEFAULT_UPLOADED_REALTIME_SEGMENT_PREFIX);
+
+    // generate segment name for simple segment name generator type(non upsert tables)
     String segmentNamePostfixProp = String.format("%s.%s", BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX,
         BatchConfigProperties.SEGMENT_NAME_POSTFIX);
     String segmentSuffix = batchConfigMap.get(segmentNamePostfixProp);
     segmentSuffix = segmentSuffix == null ? String.valueOf(_indexOfSubtask) : segmentSuffix + "_" + _indexOfSubtask;
     batchConfigMap.put(segmentNamePostfixProp, segmentSuffix);
 
+    // For upsert tables must use the UploadedRealtimeSegmentName for right assignment of segments
+    if (_tableConfig.getTableType().equals(TableType.REALTIME)) {

Review Comment:
   (nit)
   ```suggestion
       if (_tableConfig.getTableType() == TableType.REALTIME) {
   ```
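   For context on the nit, a standalone illustration (using a stand-in enum, not Pinot's actual `TableType`) of why `==` is generally preferred over `equals()` for enum comparison: it is null-safe and type-checked at compile time.
   ```java
   // Stand-in enum for illustration only, not org.apache.pinot.spi.config.table.TableType
   enum TableType {
     OFFLINE, REALTIME
   }

   class EnumComparisonDemo {
     public static void main(String[] args) {
       TableType type = null;
       // '==' on enums is null-safe and rejects mismatched types at compile time
       System.out.println(type == TableType.REALTIME); // prints "false"
       // type.equals(TableType.REALTIME) would throw a NullPointerException here
     }
   }
   ```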



##########
pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java:
##########
@@ -78,11 +84,20 @@ public PinotSinkFunction(PinotGenericRowConverter<T> recordConverter, TableConfi
     _executorPoolSize = executorPoolSize;
   }
 
+  public PinotSinkFunction(PinotGenericRowConverter<T> recordConverter, TableConfig tableConfig, Schema schema,
+      long segmentFlushMaxNumRecords, int executorPoolSize, String segmentNamePrefix,

Review Comment:
   Is `segmentNamePrefix` also nullable?
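   If it is, one conventional way to signal that in the signature is `javax.annotation.Nullable` plus a documented fallback. A minimal sketch, assuming callers may pass null or blank (the class and default constant below are hypothetical, not the connector's code):
   ```java
   import javax.annotation.Nullable;
   import org.apache.commons.lang3.StringUtils;

   public class SegmentNamePrefixHolder {
     // Placeholder default; the connector would use its own constant
     private static final String DEFAULT_SEGMENT_NAME_PREFIX = "uploaded";

     private final String _segmentNamePrefix;

     public SegmentNamePrefixHolder(@Nullable String segmentNamePrefix) {
       // Fall back to the default when the caller passes null or blank
       _segmentNamePrefix = StringUtils.isNotBlank(segmentNamePrefix) ? segmentNamePrefix : DEFAULT_SEGMENT_NAME_PREFIX;
     }

     public String getSegmentNamePrefix() {
       return _segmentNamePrefix;
     }
   }
   ```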



##########
pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/BatchConfig.java:
##########
@@ -52,7 +52,9 @@ public class BatchConfig {
   private final boolean _excludeSequenceId;
   private final boolean _appendUUIDToSegmentName;
   private final boolean _excludeTimeInSegmentName;
+  private final String _uploadedRealtimePartitionId;

Review Comment:
   Should we directly read it as `int`?
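   Reading it as an `int` up front would keep the parsing in one place. A rough sketch, assuming the value arrives in the batch config map as a string (the key constant and `-1` sentinel below are placeholders, not the actual `BatchConfig` code):
   ```java
   import java.util.Map;

   public class UploadedRealtimePartitionIdExample {
     // Placeholder key; Pinot would use BatchConfigProperties.UPLOADED_REALTIME_PARTITION_ID
     private static final String UPLOADED_REALTIME_PARTITION_ID = "uploadedRealtimePartitionId";

     private final int _uploadedRealtimePartitionId;

     public UploadedRealtimePartitionIdExample(Map<String, String> batchConfigMap) {
       String partitionId = batchConfigMap.get(UPLOADED_REALTIME_PARTITION_ID);
       // Parse once at construction; -1 is a sentinel meaning "not set"
       _uploadedRealtimePartitionId = partitionId != null ? Integer.parseInt(partitionId) : -1;
     }

     public int getUploadedRealtimePartitionId() {
       return _uploadedRealtimePartitionId;
     }
   }
   ```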



##########
pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/PinotSinkFunction.java:
##########
@@ -78,11 +84,20 @@ public PinotSinkFunction(PinotGenericRowConverter<T> recordConverter, TableConfi
     _executorPoolSize = executorPoolSize;
   }
 
+  public PinotSinkFunction(PinotGenericRowConverter<T> recordConverter, TableConfig tableConfig, Schema schema,
+      long segmentFlushMaxNumRecords, int executorPoolSize, String segmentNamePrefix,

Review Comment:
   Make the other constructor call this one, then we can change all member variables to `final`.
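   The suggested shape, as a simplified sketch (hypothetical class and fields standing in for `PinotSinkFunction`): the existing constructor delegates to the new one, so each field is assigned exactly once and can be declared `final`.
   ```java
   public class SinkFunctionDelegationSketch {
     private final long _segmentFlushMaxNumRecords;
     private final int _executorPoolSize;
     private final String _segmentNamePrefix; // may be null; a default is applied downstream

     // Existing constructor kept for current callers, delegating with a null prefix
     public SinkFunctionDelegationSketch(long segmentFlushMaxNumRecords, int executorPoolSize) {
       this(segmentFlushMaxNumRecords, executorPoolSize, null);
     }

     // New constructor is the single place where the final fields are assigned
     public SinkFunctionDelegationSketch(long segmentFlushMaxNumRecords, int executorPoolSize, String segmentNamePrefix) {
       _segmentFlushMaxNumRecords = segmentFlushMaxNumRecords;
       _executorPoolSize = executorPoolSize;
       _segmentNamePrefix = segmentNamePrefix;
     }
   }
   ```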



##########
pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java:
##########
@@ -124,12 +135,26 @@ public void init(TableConfig tableConfig, Schema schema, Map<String, String> bat
         "batchConfigMaps must contain only 1 BatchConfig for table: %s", _tableNameWithType);
 
     Map<String, String> batchConfigMap = _batchIngestionConfig.getBatchConfigMaps().get(0);
+    batchConfigMap.put(BatchConfigProperties.UPLOADED_REALTIME_PARTITION_ID, Integer.toString(_indexOfSubtask));
+    batchConfigMap.put(BatchConfigProperties.SEGMENT_UPLOAD_TIME_MS, String.valueOf(_segmentUploadTimeMs));

Review Comment:
   (nit)
   ```suggestion
       batchConfigMap.put(BatchConfigProperties.SEGMENT_UPLOAD_TIME_MS, Long.toString(_segmentUploadTimeMs));
   ```



##########
pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java:
##########
@@ -124,12 +135,26 @@ public void init(TableConfig tableConfig, Schema schema, Map<String, String> bat
         "batchConfigMaps must contain only 1 BatchConfig for table: %s", _tableNameWithType);
 
     Map<String, String> batchConfigMap = _batchIngestionConfig.getBatchConfigMaps().get(0);
+    batchConfigMap.put(BatchConfigProperties.UPLOADED_REALTIME_PARTITION_ID, Integer.toString(_indexOfSubtask));

Review Comment:
   How do we ensure this value is the correct partition id?
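   One way to make the assumption explicit, sketched with hypothetical inputs (where the expected partition count comes from, e.g. the table's partition/upsert config, is not shown): fail fast when the sink parallelism does not match the table's partition count, since the subtask index is only a valid partition id when the two line up.
   ```java
   import com.google.common.base.Preconditions;

   public final class PartitionAlignmentCheck {
     private PartitionAlignmentCheck() {
     }

     // numSubtasks: Flink sink parallelism; numPartitions: the table's expected partition count
     public static void validate(int numSubtasks, int numPartitions) {
       Preconditions.checkState(numSubtasks == numPartitions,
           "Sink parallelism (%s) must equal the table's partition count (%s) for the subtask index "
               + "to be a valid uploaded-segment partition id", numSubtasks, numPartitions);
     }
   }
   ```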



##########
pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java:
##########
@@ -95,8 +100,12 @@ public class FlinkSegmentWriter implements SegmentWriter {
   private transient Counter _processedRecords;
   private transient volatile long _lastRecordProcessingTimeMs = 0;
 
-  public FlinkSegmentWriter(int indexOfSubtask, MetricGroup metricGroup) {
+  public FlinkSegmentWriter(int indexOfSubtask, MetricGroup metricGroup, String segmentNamePrefix,

Review Comment:
   Suggest keeping the original constructor for backward compatibility. I think 
`segmentNamePrefix` is also nullable
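   Roughly this shape, as a simplified sketch (names are stand-ins for the actual `FlinkSegmentWriter`; `MetricGroup` and the rest of the writer state are omitted):
   ```java
   import javax.annotation.Nullable;

   public class SegmentWriterCompatSketch {
     private final int _indexOfSubtask;
     @Nullable
     private final String _segmentNamePrefix;

     // Original signature preserved so existing callers keep compiling
     public SegmentWriterCompatSketch(int indexOfSubtask) {
       this(indexOfSubtask, null);
     }

     public SegmentWriterCompatSketch(int indexOfSubtask, @Nullable String segmentNamePrefix) {
       _indexOfSubtask = indexOfSubtask;
       _segmentNamePrefix = segmentNamePrefix;
     }
   }
   ```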



##########
pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java:
##########
@@ -124,12 +135,26 @@ public void init(TableConfig tableConfig, Schema schema, Map<String, String> bat
         "batchConfigMaps must contain only 1 BatchConfig for table: %s", _tableNameWithType);
 
     Map<String, String> batchConfigMap = _batchIngestionConfig.getBatchConfigMaps().get(0);
+    batchConfigMap.put(BatchConfigProperties.UPLOADED_REALTIME_PARTITION_ID, Integer.toString(_indexOfSubtask));
+    batchConfigMap.put(BatchConfigProperties.SEGMENT_UPLOAD_TIME_MS, String.valueOf(_segmentUploadTimeMs));
+    batchConfigMap.computeIfAbsent(
+        BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX + "." + BatchConfigProperties.SEGMENT_NAME_PREFIX,

Review Comment:
   Should this always come from batch config? This will be a behavior change if 
we always prefix it with `flink`
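   One alternative that avoids the behavior change (a sketch only; the helper and its wiring are hypothetical) is to inject a prefix into the batch config only when the caller explicitly supplied one, leaving unconfigured tables on the segment name generator's existing default:
   ```java
   import java.util.Map;

   import javax.annotation.Nullable;

   import org.apache.commons.lang3.StringUtils;

   public final class SegmentNamePrefixPolicy {
     private SegmentNamePrefixPolicy() {
     }

     /**
      * @param prefixProp the fully-qualified segment-name-prefix property key (built elsewhere from
      *                   the BatchConfigProperties constants)
      */
     public static void applyExplicitPrefixOnly(Map<String, String> batchConfigMap, String prefixProp,
         @Nullable String segmentNamePrefix) {
       if (StringUtils.isNotBlank(segmentNamePrefix)) {
         // An explicit, caller-provided prefix wins unless the batch config already sets one
         batchConfigMap.putIfAbsent(prefixProp, segmentNamePrefix);
       }
       // Otherwise leave the map untouched so the existing default segment naming applies
     }
   }
   ```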



##########
pinot-connectors/pinot-flink-connector/README.md:
##########
@@ -39,9 +44,34 @@ srcDs.addSink(new PinotSinkFunction<>(new PinotRowRecordConverter(TEST_TYPE_INFO
 execEnv.execute();
 ```
 
+## Quick start for realtime(upsert) table backfill
+```java
+// Set up flink env and data source
+StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+execEnv.setParallelism(2); // mandatory for upsert tables wi
+DataStream<Row> srcDs = execEnv.fromCollection(data).returns(TEST_TYPE_INFO)
+
+// Create a ControllerRequestClient to fetch Pinot schema and table config
+HttpClient httpClient = HttpClient.getInstance();
+ControllerRequestClient client = new ControllerRequestClient(
+    ControllerRequestURLBuilder.baseUrl(DEFAULT_CONTROLLER_URL), httpClient);
+
+// fetch Pinot schema
+Schema schema = PinotConnectionUtils.getSchema(client, "starbucksStores");
+// fetch Pinot table config
+TableConfig tableConfig = PinotConnectionUtils.getTableConfig(client, "starbucksStores", "OFFLINE");

Review Comment:
   Should this be real-time?
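   If so, the quick start would presumably fetch the real-time table config instead, reusing the client and table name from the snippet above (shown here only as a hedged one-line variant, not a confirmed README change):
   ```java
   // Variant of the README line above, fetching the REALTIME table config instead of OFFLINE
   TableConfig tableConfig = PinotConnectionUtils.getTableConfig(client, "starbucksStores", "REALTIME");
   ```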


