jackjlli commented on code in PR #9295: URL: https://github.com/apache/pinot/pull/9295#discussion_r959022101
########## pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java: ########## @@ -747,6 +751,54 @@ public SimpleHttpResponse uploadSegment(URI uri, String segmentName, InputStream return uploadSegment(uri, segmentName, inputStream, null, parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS); } + /** + * Returns a map from a given tableType to a list of segments for that given tableType (OFFLINE or REALTIME) + * If tableType is left unspecified, both OFFLINE and REALTIME segments will be returned in the map. + */ + public Map<String, List<String>> getSegments(URI uri, String rawTableName, @Nullable String tableType, + boolean excludeReplacedSegments) + throws URISyntaxException, IOException { + List<String> tableTypes; + if (tableType == null || tableType.isEmpty()) { + tableTypes = Arrays.asList(TableType.OFFLINE.toString(), TableType.REALTIME.toString()); + } else { + tableTypes = Arrays.asList(tableType); Review Comment: should we validate the correctness of the table type here if it's non-empty? ########## pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java: ########## @@ -747,6 +751,54 @@ public SimpleHttpResponse uploadSegment(URI uri, String segmentName, InputStream return uploadSegment(uri, segmentName, inputStream, null, parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS); } + /** + * Returns a map from a given tableType to a list of segments for that given tableType (OFFLINE or REALTIME) + * If tableType is left unspecified, both OFFLINE and REALTIME segments will be returned in the map. 
+ */ + public Map<String, List<String>> getSegments(URI uri, String rawTableName, @Nullable String tableType, + boolean excludeReplacedSegments) + throws URISyntaxException, IOException { + List<String> tableTypes; + if (tableType == null || tableType.isEmpty()) { + tableTypes = Arrays.asList(TableType.OFFLINE.toString(), TableType.REALTIME.toString()); + } else { + tableTypes = Arrays.asList(tableType); + } + ControllerRequestURLBuilder controllerRequestURLBuilder = ControllerRequestURLBuilder.baseUrl(uri.toString()); + Map<String, List<String>> tableTypeToSegments = new HashMap<>(); + for (String tableTypeToFilter : tableTypes) { + List<String> segments = new ArrayList<>(); + RequestBuilder requestBuilder = RequestBuilder.get( + controllerRequestURLBuilder.forSegmentListAPIWithTableTypeAndExcludeReplacedSegments(rawTableName, + tableTypeToFilter, excludeReplacedSegments)).setVersion(HttpVersion.HTTP_1_1); + HttpClient.setTimeout(requestBuilder, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS); + SimpleHttpResponse response; + try { + response = HttpClient.wrapAndThrowHttpException(_httpClient.sendRequest(requestBuilder.build())); + } catch (HttpErrorStatusException e) { + tableTypeToSegments.put(tableTypeToFilter, segments); + continue; + } + String responseString = response.getResponse(); + JsonNode responseJsonNode = JsonUtils.stringToJsonNode(responseString); + Iterator<JsonNode> responseElements = responseJsonNode.elements(); + while (responseElements.hasNext()) { Review Comment: Can we encapsulate the logic of fetching the segment names into a method, or is there any existing logic doing that right now? 
########## pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java: ########## @@ -747,6 +751,54 @@ public SimpleHttpResponse uploadSegment(URI uri, String segmentName, InputStream return uploadSegment(uri, segmentName, inputStream, null, parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS); } + /** + * Returns a map from a given tableType to a list of segments for that given tableType (OFFLINE or REALTIME) + * If tableType is left unspecified, both OFFLINE and REALTIME segments will be returned in the map. + */ + public Map<String, List<String>> getSegments(URI uri, String rawTableName, @Nullable String tableType, + boolean excludeReplacedSegments) + throws URISyntaxException, IOException { + List<String> tableTypes; + if (tableType == null || tableType.isEmpty()) { + tableTypes = Arrays.asList(TableType.OFFLINE.toString(), TableType.REALTIME.toString()); + } else { + tableTypes = Arrays.asList(tableType); + } + ControllerRequestURLBuilder controllerRequestURLBuilder = ControllerRequestURLBuilder.baseUrl(uri.toString()); + Map<String, List<String>> tableTypeToSegments = new HashMap<>(); + for (String tableTypeToFilter : tableTypes) { + List<String> segments = new ArrayList<>(); + RequestBuilder requestBuilder = RequestBuilder.get( + controllerRequestURLBuilder.forSegmentListAPIWithTableTypeAndExcludeReplacedSegments(rawTableName, + tableTypeToFilter, excludeReplacedSegments)).setVersion(HttpVersion.HTTP_1_1); + HttpClient.setTimeout(requestBuilder, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS); + SimpleHttpResponse response; + try { + response = HttpClient.wrapAndThrowHttpException(_httpClient.sendRequest(requestBuilder.build())); + } catch (HttpErrorStatusException e) { + tableTypeToSegments.put(tableTypeToFilter, segments); Review Comment: It'd be good to print out the exception message here instead of swallowing it silently. 
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java: ########## @@ -371,4 +372,36 @@ private static File generateSegmentMetadataFile(PinotFS fileSystem, URI tarFileU FileUtils.deleteQuietly(segmentMetadataDir); } } + + public static List<String> getSegmentNames(BatchConfigProperties.SegmentPushType pushMode, + Map<String, String> segmentsUriToTarPathMap) { + List<String> segmentNames = new ArrayList<>(); Review Comment: nit: you can instantiate the array list with the size of the `segmentsUriToTarPathMap` map, so that you don't have to worry about the resizing. ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java: ########## @@ -371,4 +372,36 @@ private static File generateSegmentMetadataFile(PinotFS fileSystem, URI tarFileU FileUtils.deleteQuietly(segmentMetadataDir); } } + + public static List<String> getSegmentNames(BatchConfigProperties.SegmentPushType pushMode, + Map<String, String> segmentsUriToTarPathMap) { + List<String> segmentNames = new ArrayList<>(); + if (pushMode.equals(BatchConfigProperties.SegmentPushType.TAR) || pushMode.equals( + BatchConfigProperties.SegmentPushType.METADATA)) { + for (String tarFilePath : segmentsUriToTarPathMap.values()) { + File tarFile = new File(tarFilePath); + String fileName = tarFile.getName(); + Preconditions.checkArgument(fileName.endsWith(Constants.TAR_GZ_FILE_EXT)); + String segmentName = getSegmentNameFromPath(fileName); + segmentNames.add(segmentName); + } + } + if (pushMode.equals(BatchConfigProperties.SegmentPushType.URI)) { + for (String segmentUri : segmentsUriToTarPathMap.keySet()) { + Preconditions.checkArgument(segmentUri.endsWith(Constants.TAR_GZ_FILE_EXT)); + String segmentName = getSegmentNameFromPath(segmentUri); + segmentNames.add(segmentName); + } + } + return segmentNames; + } + + /** + * Obtain segment name given filePath by reading from after the last slash (if present) up to and before the tar + * 
extension. + */ + public static String getSegmentNameFromPath(String filePath) { + int startIndex = filePath.contains("/") ? filePath.lastIndexOf("/") + 1 : 0; Review Comment: nit: `lastIndexOf("/")` returns -1 when the string contains no "/", so `lastIndexOf("/") + 1` already evaluates to 0 in that case and you don't have to check whether the string `contains` "/" in the first place. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org