This is an automated email from the ASF dual-hosted git repository. jenniferdai pushed a commit to branch deletePush in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 618e475005abe42165e3260241193d87118a39a5 Author: Jennifer Dai <j...@linkedin.com> AuthorDate: Tue Jun 18 10:54:07 2019 -0700 Deleting extra segments after push --- .../common/utils/FileUploadDownloadClient.java | 39 +++++++++++++++ .../apache/pinot/hadoop/job/ControllerRestApi.java | 9 ++++ .../pinot/hadoop/job/DefaultControllerRestApi.java | 57 ++++++++++++++++++++++ .../apache/pinot/hadoop/job/SegmentTarPushJob.java | 7 +++ 4 files changed, 112 insertions(+) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java index de8c76a..4cf3210 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java @@ -26,8 +26,10 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.UnsupportedEncodingException; import java.net.URI; import java.net.URISyntaxException; +import java.net.URLEncoder; import java.util.List; import java.util.Map; import javax.annotation.Nullable; @@ -89,12 +91,14 @@ public class FileUploadDownloadClient implements Closeable { public static final int DEFAULT_SOCKET_TIMEOUT_MS = 600 * 1000; // 10 minutes public static final int GET_REQUEST_SOCKET_TIMEOUT_MS = 5 * 1000; // 5 seconds + public static final int DELETE_REQUEST_SOCKET_TIMEOUT_MS = 10 * 1000; // 10 seconds private static final String HTTP = "http"; private static final String HTTPS = "https"; private static final String SCHEMA_PATH = "/schemas"; private static final String OLD_SEGMENT_PATH = "/segments"; private static final String SEGMENT_PATH = "/v2/segments"; + private static final String DELETE_SEGMENT_PATH = "/segments"; private static final String SEGMENT_METADATA_PATH = "/segmentmetadata"; private static final String TABLES_PATH = "/tables"; private static final String TYPE_DELIMITER = "?type="; @@ -127,6 +131,16 @@ public class FileUploadDownloadClient implements Closeable { return getURI(HTTP, host, port, TABLES_PATH + "/" + rawTableName); } + public static URI getDeleteSegmentHttpUri(String host, int port, String rawTableName, String segmentName, + String tableType) + throws URISyntaxException, UnsupportedEncodingException { + return getURI(HTTP, host, port, DELETE_SEGMENT_PATH + "/" + rawTableName + "/" + URLEncoder.encode(segmentName, "UTF-8") + "?type=" + tableType); + } + + public static URI getRetrieveAllSegmentWithTableTypeHttpUri(String host, int port, String rawTableName, String tableType) throws URISyntaxException { + return getURI(HTTP, host, port, OLD_SEGMENT_PATH + "/" + rawTableName + "?type=" + tableType); + } + public static URI getRetrieveSchemaHttpURI(String host, int port, String schemaName) throws URISyntaxException { return getURI(HTTP, host, port, SCHEMA_PATH + "/" + schemaName); @@ -196,12 +210,32 @@ public class FileUploadDownloadClient implements Closeable { return requestBuilder.build(); } + private static HttpUriRequest getDeleteFileRequest(String method, URI uri, ContentBody contentBody, + @Nullable List<Header> headers, @Nullable List<NameValuePair> parameters, int socketTimeoutMs) { + // Build the Http entity + HttpEntity entity = MultipartEntityBuilder.create().setMode(HttpMultipartMode.BROWSER_COMPATIBLE) + .addPart(contentBody.getFilename(), contentBody).build(); + + // Build the request + RequestBuilder requestBuilder = + RequestBuilder.create(method).setVersion(HttpVersion.HTTP_1_1).setUri(uri).setEntity(entity); + addHeadersAndParameters(requestBuilder, headers, parameters); + setTimeout(requestBuilder, socketTimeoutMs); + return requestBuilder.build(); + } + private static HttpUriRequest constructGetRequest(URI uri) { RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1); setTimeout(requestBuilder, GET_REQUEST_SOCKET_TIMEOUT_MS); return requestBuilder.build(); } + private static HttpUriRequest constructDeleteRequest(URI uri) { + RequestBuilder requestBuilder = RequestBuilder.delete(uri).setVersion(HttpVersion.HTTP_1_1); + setTimeout(requestBuilder, DELETE_REQUEST_SOCKET_TIMEOUT_MS); + return requestBuilder.build(); + } + private static HttpUriRequest getAddSchemaRequest(URI uri, String schemaName, File schemaFile) { return getUploadFileRequest(HttpPost.METHOD_NAME, uri, getContentBody(schemaName, schemaFile), null, null, DEFAULT_SOCKET_TIMEOUT_MS); @@ -355,6 +389,11 @@ public class FileUploadDownloadClient implements Closeable { return sendRequest(constructGetRequest(uri)); } + public SimpleHttpResponse sendDeleteRequest(URI uri) + throws IOException, HttpErrorStatusException { + return sendRequest(constructDeleteRequest(uri)); + } + /** * Add schema. * diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java index 319b55b..79f00d5 100644 --- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java +++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/ControllerRestApi.java @@ -35,4 +35,13 @@ public interface ControllerRestApi extends Closeable { void pushSegments(FileSystem fileSystem, List<Path> tarFilePaths); void sendSegmentUris(List<String> segmentUris); + + /** + * Delete extra segments after push during REFRESH use cases. Also used in APPEND use cases where + * a day that has been re-pushed has extra segments. + * @param segmentUris + */ + void deleteExtraSegmentUris(List<String> segmentUris); + + List<String> getAllSegments(String tableType); } diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java index 089ea6e..e60dbfe 100644 --- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java +++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/DefaultControllerRestApi.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Preconditions; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; import java.util.List; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -135,8 +136,64 @@ public class DefaultControllerRestApi implements ControllerRestApi { } @Override + public void deleteExtraSegmentUris(List<String> segmentUris) { + LOGGER.info("Start deleting segment URIs: {} to locations: {}", segmentUris, _pushLocations); + for (String segmentUri : segmentUris) { + for (PushLocation pushLocation : _pushLocations) { + LOGGER.info("Sending deleting segment URI: {} to location: {}", segmentUri, pushLocation); + try { + SimpleHttpResponse response = _fileUploadDownloadClient.sendDeleteRequest( + FileUploadDownloadClient.getDeleteSegmentHttpUri(pushLocation.getHost(), pushLocation.getPort(), _rawTableName, + segmentUri, "OFFLINE")); + LOGGER.info("Response {}: {}", response.getStatusCode(), response.getResponse()); + } catch (Exception e) { + LOGGER.error("Caught exception while deleting segment URI: {} to location: {}", segmentUri, pushLocation, e); + throw new RuntimeException(e); + } + } + } + } + + @Override + public List<String> getAllSegments(String tableType) { + LOGGER.info("Getting all segments"); + for (PushLocation pushLocation : _pushLocations) { + try { + SimpleHttpResponse response = _fileUploadDownloadClient.sendGetRequest( + FileUploadDownloadClient.getRetrieveAllSegmentWithTableTypeHttpUri(pushLocation.getHost(), pushLocation.getPort(), + _rawTableName, tableType)); + JsonNode segmentList = getSegmentsFromJsonSegmentAPI(response.getResponse(), tableType); + return segmentList.findValuesAsText(tableType); + } catch (Exception e) { + LOGGER.warn("Caught exception while getting all {} segments for table: {} from push location: {}", tableType, _rawTableName, + pushLocation, e); + } + } + String errorMessage = + String.format("Failed to get a list of all segments from push locations: %s for table: %s", _pushLocations, + _rawTableName); + LOGGER.error(errorMessage); + throw new RuntimeException(errorMessage); + + } + + @Override public void close() throws IOException { _fileUploadDownloadClient.close(); } + + private JsonNode getSegmentsFromJsonSegmentAPI(String json, String type) + throws Exception { + return JsonUtils.stringToJsonNode(json).get(0).get(type); + } + + public static void main (String[] args) { + List<PushLocation> pushLocations = new ArrayList<>(); + PushLocation pushLocation = new PushLocation("lva1-app011", 11984); + pushLocations.add(pushLocation); + + DefaultControllerRestApi defaultControllerRestApi = new DefaultControllerRestApi(pushLocations, "myTable"); + defaultControllerRestApi.getAllSegments("OFFLINE"); + } } diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java index 7c71fd8..e8d650e 100644 --- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java +++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/SegmentTarPushJob.java @@ -31,6 +31,8 @@ public class SegmentTarPushJob extends BaseSegmentJob { private final Path _segmentPattern; private final List<PushLocation> _pushLocations; + private static final String TABLE_TYPE = "OFFLINE"; + public SegmentTarPushJob(Properties properties) { super(properties); _segmentPattern = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_OUTPUT)); @@ -48,7 +50,12 @@ public class SegmentTarPushJob extends BaseSegmentJob { throws Exception { FileSystem fileSystem = FileSystem.get(_conf); try (ControllerRestApi controllerRestApi = getControllerRestApi()) { + List<String> segmentsBeforePush = controllerRestApi.getAllSegments(TABLE_TYPE); controllerRestApi.pushSegments(fileSystem, getDataFilePaths(_segmentPattern)); + List<String> segmentsAfterPush = controllerRestApi.getAllSegments(TABLE_TYPE); + + segmentsBeforePush.removeAll(segmentsAfterPush); + controllerRestApi.deleteExtraSegmentUris(segmentsBeforePush); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org