snleee commented on a change in pull request #6667:
URL: https://github.com/apache/incubator-pinot/pull/6667#discussion_r602142330



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
##########
@@ -53,4 +57,34 @@ default StreamPartitionMsgOffset fetchStreamPartitionOffset(@Nonnull OffsetCrite
     long offset = fetchPartitionOffset(offsetCriteria, timeoutMillis);
     return new LongMsgOffset(offset);
   }
+
+  /**
+   * Fetch the list of partition group info for the latest state of the stream.
+   * Default behavior is the one for the Kafka stream, where each partition group contains only one partition
+   * @param currentPartitionGroupsMetadata The list of metadata for the current partition groups
+   */
+  default List<PartitionGroupInfo> getPartitionGroupInfoList(String clientId, StreamConfig streamConfig,
+      List<PartitionGroupMetadata> currentPartitionGroupsMetadata, int timeoutMillis)
+      throws TimeoutException, IOException {
+    int partitionCount = fetchPartitionCount(timeoutMillis);
+    List<PartitionGroupInfo> newPartitionGroupInfoList = new ArrayList<>(partitionCount);
+
+    // Add a PartitionGroupInfo into the list foreach partition already present in current.
+    for (PartitionGroupMetadata currentPartitionGroupMetadata : currentPartitionGroupsMetadata) {
+      newPartitionGroupInfoList.add(new PartitionGroupInfo(currentPartitionGroupMetadata.getPartitionGroupId(),
+          currentPartitionGroupMetadata.getEndOffset()));

Review comment:
       It looks like we are using the `endOffset` of the current partition group
as the `startOffset` of the new partition group.
   
   Does this mean that either `startOffset` or `endOffset` is exclusive? If so,
please document that in `PartitionGroupMetadata`.
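
   To make the question concrete, here is a minimal sketch of the convention I
would expect, assuming `startOffset` is inclusive and `endOffset` is exclusive.
`OffsetRangeSketch` is a hypothetical stand-in, not the actual
`PartitionGroupMetadata`, and the real semantics in this PR may differ.

```java
// Hypothetical stand-in for a partition group; NOT Pinot's PartitionGroupMetadata.
final class OffsetRangeSketch {
  final int partitionGroupId;
  final long startOffset; // assumed inclusive: first offset consumed
  final long endOffset;   // assumed exclusive: first offset NOT consumed

  OffsetRangeSketch(int partitionGroupId, long startOffset, long endOffset) {
    this.partitionGroupId = partitionGroupId;
    this.startOffset = startOffset;
    this.endOffset = endOffset;
  }

  // The next range starts exactly where this one ended. No message is skipped
  // or read twice, but only because endOffset is treated as exclusive.
  OffsetRangeSketch next(long newEndOffset) {
    return new OffsetRangeSketch(partitionGroupId, endOffset, newEndOffset);
  }

  // Number of messages covered by the half-open range [startOffset, endOffset).
  long messageCount() {
    return endOffset - startOffset;
  }

  public static void main(String[] args) {
    OffsetRangeSketch first = new OffsetRangeSketch(0, 100L, 150L);
    OffsetRangeSketch second = first.next(180L);
    System.out.println("second starts at " + second.startOffset);
    System.out.println("total messages: " + (first.messageCount() + second.messageCount()));
  }
}
```

   If both offsets were inclusive instead, reusing `endOffset` as the next
`startOffset` would consume the boundary message twice, which is why the
Javadoc should state the convention.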

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
##########
@@ -190,10 +204,10 @@ public String getUrl(String hostPort, String protocol) {
           + (_params.getSegmentSizeBytes() <= 0 ? ""
           : ("&" + PARAM_SEGMENT_SIZE_BYTES + "=" + _params.getSegmentSizeBytes())) + (_params.getNumRows() <= 0 ? ""
           : ("&" + PARAM_ROW_COUNT + "=" + _params.getNumRows())) + (_params.getSegmentLocation() == null ? ""
-          : ("&" + PARAM_SEGMENT_LOCATION + "=" + _params.getSegmentLocation()))
-          + (_params.getStreamPartitionMsgOffset() == null ? ""
-          : ("&" + PARAM_STREAM_PARTITION_MSG_OFFSET + "=" + _params.getStreamPartitionMsgOffset()))
-          ;
+          : ("&" + PARAM_SEGMENT_LOCATION + "=" + _params.getSegmentLocation())) + (

Review comment:
       Can we improve this by using a URI builder? I understand that we were
already building the URL this way before this PR, but it would be great to
clean it up since we are touching this part.
   
   It would be cleaner to keep adding parameters to a `List<NameValuePair>`,
build the result with `new URI(...)`, and then convert it back to a String.
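
   A rough sketch of the shape I have in mind. To keep it dependency-free it
uses only the JDK (an ordered `Map` instead of `List<NameValuePair>`, and
`URLEncoder` plus `new URI(...)` for validation), so treat it as an
illustration rather than the exact API to adopt. `QueryUrlSketch`, the
`/segmentConsumed` path, and the parameter names are placeholders.

```java
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper; not part of SegmentCompletionProtocol.
final class QueryUrlSketch {

  // URL-encodes one key or value as UTF-8.
  static String encode(String value) {
    try {
      return URLEncoder.encode(value, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      throw new IllegalStateException("UTF-8 is always supported", e);
    }
  }

  // Collects the non-null parameters, encodes each one, and joins them with '&'.
  static String buildUrl(String protocol, String hostPort, String path, Map<String, String> params) {
    StringJoiner query = new StringJoiner("&");
    for (Map.Entry<String, String> entry : params.entrySet()) {
      if (entry.getValue() != null) { // optional parameters are simply skipped
        query.add(encode(entry.getKey()) + "=" + encode(entry.getValue()));
      }
    }
    String url = protocol + "://" + hostPort + path + (query.length() == 0 ? "" : "?" + query);
    try {
      // new URI(...) only validates the syntax here; it does not re-encode anything.
      return new URI(url).toString();
    } catch (URISyntaxException e) {
      throw new IllegalArgumentException("Malformed URL: " + url, e);
    }
  }

  public static void main(String[] args) {
    Map<String, String> params = new LinkedHashMap<>();
    params.put("name", "seg&1=x");
    params.put("location", null);
    params.put("streamPartitionMsgOffset", "{\"shard\":\"5\"}");
    System.out.println(buildUrl("http", "localhost:9000", "/segmentConsumed", params));
  }
}
```

   Each optional parameter then becomes a single `put` (or a skipped null)
instead of another nested ternary in one long string concatenation.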

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/protocols/SegmentCompletionProtocol.java
##########
@@ -180,6 +184,16 @@ private Request(Params params, String msgType) {
     }
 
     public String getUrl(String hostPort, String protocol) {
+      String streamPartitionMsgOffset;
+      try {
+        streamPartitionMsgOffset = _params.getStreamPartitionMsgOffset() == null ? null :
+            URLEncoder.encode(_params.getStreamPartitionMsgOffset(), StandardCharsets.UTF_8.toString());

Review comment:
       How do we deal with the segment name parameter? What happens if a
segment name contains characters that break parsing on the receiver side
because the value was not URL-encoded?
   
   IMO, we should URL-encode every parameter. Whatever value is set in
`_params`, the component that parses this request should read back exactly
the same value.
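
   To illustrate the failure mode, here is a small self-contained sketch using
only the JDK. `EncodingRoundTripSketch` and its naive `parseQuery` are
hypothetical and only stand in for whatever the receiver actually does.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sender/receiver pair; not the actual Pinot request parsing.
final class EncodingRoundTripSketch {

  static String encode(String value) {
    try {
      return URLEncoder.encode(value, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      throw new IllegalStateException("UTF-8 is always supported", e);
    }
  }

  static String decode(String value) {
    try {
      return URLDecoder.decode(value, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      throw new IllegalStateException("UTF-8 is always supported", e);
    }
  }

  // Naive receiver: split on '&', then on the first '=', then decode both sides.
  static Map<String, String> parseQuery(String query) {
    Map<String, String> result = new LinkedHashMap<>();
    for (String pair : query.split("&")) {
      int idx = pair.indexOf('=');
      String key = idx < 0 ? pair : pair.substring(0, idx);
      String value = idx < 0 ? "" : pair.substring(idx + 1);
      result.put(decode(key), decode(value));
    }
    return result;
  }

  public static void main(String[] args) {
    String segmentName = "a&b=c"; // contrived name containing reserved characters
    // Without encoding, the receiver sees two parameters and a truncated name.
    System.out.println(parseQuery("name=" + segmentName));
    // With encoding, the receiver reads back exactly what the sender provided.
    System.out.println(parseQuery("name=" + encode(segmentName)));
  }
}
```

   The same argument applies to every value in `_params`, not just the stream
partition offset.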




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


