This is an automated email from the ASF dual-hosted git repository. jenniferdai pushed a commit to branch rt in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 830e4437e3ee8080af3e9304bdd235f9ceeba30e Author: Jennifer Dai <j...@linkedin.com> AuthorDate: Mon Nov 26 10:38:11 2018 -0800 Adding pluggable storage support for realtime upload --- .../resources/LLCSegmentCompletionHandlers.java | 58 ++++++++++++++++------ 1 file changed, 43 insertions(+), 15 deletions(-) diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java index 38c0dd7..6832b85 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java @@ -18,9 +18,13 @@ package com.linkedin.pinot.controller.api.resources; import com.google.common.annotations.VisibleForTesting; import com.linkedin.pinot.common.protocols.SegmentCompletionProtocol; import com.linkedin.pinot.common.utils.LLCSegmentName; +import com.linkedin.pinot.common.utils.StringUtil; import com.linkedin.pinot.controller.ControllerConf; import com.linkedin.pinot.controller.helix.core.realtime.SegmentCompletionManager; import com.linkedin.pinot.controller.util.SegmentCompletionUtils; +import com.linkedin.pinot.filesystem.LocalPinotFS; +import com.linkedin.pinot.filesystem.PinotFS; +import com.linkedin.pinot.filesystem.PinotFSFactory; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; @@ -39,7 +43,6 @@ import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.MediaType; import org.apache.commons.httpclient.URI; -import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.glassfish.jersey.media.multipart.FormDataBodyPart; import org.glassfish.jersey.media.multipart.FormDataMultiPart; @@ -300,27 +303,48 @@ public class LLCSegmentCompletionHandlers { FormDataBodyPart bodyPart = map.get(name).get(0); FileUploadPathProvider provider = new FileUploadPathProvider(_controllerConf); - File tmpFile = new File(provider.getFileUploadTmpDir(), name + "." + UUID.randomUUID().toString()); - tmpFile.deleteOnExit(); + String tmpFilePath = StringUtil.join("/", provider.getFileUploadTmpDirURI().toString(), name + "." + UUID.randomUUID().toString()); + java.net.URI tmpFileURI = ControllerConf.getUriFromPath(tmpFilePath); + + PinotFS pinotFS = PinotFSFactory.create(tmpFileURI.getScheme()); + + File localTmpFile = new File(provider.getFileUploadTmpDir(), name + "." + UUID.randomUUID().toString()); + localTmpFile.deleteOnExit(); + + // Copy multipart to local try (InputStream inputStream = bodyPart.getValueAs(InputStream.class); - OutputStream outputStream = new FileOutputStream(tmpFile)) { + OutputStream outputStream = new FileOutputStream(localTmpFile)) { IOUtils.copyLarge(inputStream, outputStream); } + // If remote, will need to copy tmp file to remote storage + try { + if ((!(pinotFS instanceof LocalPinotFS))) { + pinotFS.copyFromLocalFile(localTmpFile, tmpFileURI); + } + } catch (Exception e) { + pinotFS.delete(tmpFileURI, true); + LOGGER.error("Could not copy from local to remote storage"); + } + LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName); final String rawTableName = llcSegmentName.getTableName(); - final File tableDir = new File(provider.getBaseDataDir(), rawTableName); - File segmentFile; + final java.net.URI tableDirURI = ControllerConf.getUriFromPath(StringUtil.join("/", provider.getBaseDataDirURI().toString(), rawTableName)); + java.net.URI segmentFileURI; if (isSplitCommit) { String uniqueSegmentFileName = SegmentCompletionUtils.generateSegmentFileName(segmentName); - segmentFile = new File(tableDir, uniqueSegmentFileName); + segmentFileURI = ControllerConf.getUriFromPath(StringUtil.join("/", tableDirURI.toString(), uniqueSegmentFileName)); } else { - segmentFile = new File(tableDir, segmentName); + segmentFileURI = ControllerConf.getUriFromPath(StringUtil.join("/", tableDirURI.toString(), segmentName)); } if (isSplitCommit) { - FileUtils.moveFile(tmpFile, segmentFile); + try { + pinotFS.copyFromLocalFile(localTmpFile, segmentFileURI); + } catch (Exception e) { + LOGGER.error("Could not copy from {} to {}", localTmpFile.getAbsolutePath(), segmentFileURI.toString()); + } } else { // Multiple threads can reach this point at the same time, if the following scenario happens // The server that was asked to commit did so very slowly (due to network speeds). Meanwhile the FSM in @@ -337,16 +361,20 @@ public class LLCSegmentCompletionHandlers { // For now, we live with these corner cases. Once we have split-commit enabled and working, this code will no longer // be used. synchronized (SegmentCompletionManager.getInstance()) { - if (segmentFile.exists()) { - LOGGER.warn("Segment file {} exists. Replacing with upload from {}", segmentFile.getAbsolutePath(), + if (pinotFS.exists(segmentFileURI)) { + LOGGER.warn("Segment file {} exists. Replacing with upload from {}", segmentFileURI.toString(), instanceId); - FileUtils.deleteQuietly(segmentFile); + pinotFS.delete(segmentFileURI, true); + } + try { + pinotFS.copyFromLocalFile(localTmpFile, segmentFileURI); + } catch (Exception e) { + LOGGER.error("Could not copy from {} to {}", localTmpFile.getAbsolutePath(), segmentFileURI.toString()); } - FileUtils.moveFile(tmpFile, segmentFile); } } - LOGGER.info("Moved file {} to {}", tmpFile.getAbsolutePath(), segmentFile.getAbsolutePath()); - return new URI(SCHEME + segmentFile.getAbsolutePath(), /* boolean escaped */ false).toString(); + LOGGER.info("Moved file {} to {}", localTmpFile.getAbsolutePath(), segmentFileURI.toString()); + return new URI(SCHEME + segmentFileURI.toString(), /* boolean escaped */ false).toString(); } catch (InvalidControllerConfigException e) { LOGGER.error("Invalid controller config exception from instance {} for segment {}", instanceId, segmentName, e); return null; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org