This is an automated email from the ASF dual-hosted git repository. jongyoul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 239f8bf8b4 [ZEPPELIN-5655] OSSNotebookRepo support version control. (#4308) 239f8bf8b4 is described below commit 239f8bf8b409c6d94d9e38643e4051d685af6b5b Author: Guanhua Li <guanhua...@foxmail.com> AuthorDate: Mon Apr 25 11:55:15 2022 +0800 [ZEPPELIN-5655] OSSNotebookRepo support version control. (#4308) * OSSNotebookRepo support version control. * format * OSSNotebokRepo remove dependenc of ossclient * Some optimization * add test * ADD MOCK service to mock OSSOperator in localFileSystem * fix bug of rootFolder path in OSSNotebookRepo * Update for reviews. * Insert the license header into MockStorageOperator.java and RemoteStorageOperator.java. * update doc * configuration to disable version control in OSSNoteBookRepo * update for reviews * Update zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/OSSNotebookRepo.java To key a naming convention. Co-authored-by: Jongyoul Lee <jongy...@gmail.com> Co-authored-by: Jongyoul Lee <jongy...@gmail.com> --- docs/setup/storage/storage.md | 10 + .../zeppelin/conf/ZeppelinConfiguration.java | 5 + .../zeppelin/notebook/repo/OSSNotebookRepo.java | 290 +++++++++++++-------- .../zeppelin/notebook/repo/RevisionsInfo.java | 39 +++ .../notebook/repo/storage/OSSOperator.java | 153 +++++++++++ .../repo/storage/RemoteStorageOperator.java | 48 ++++ .../notebook/repo/MockStorageOperator.java | 122 +++++++++ .../notebook/repo/OSSNotebookRepoTest.java | 243 +++++++++++++++++ 8 files changed, 799 insertions(+), 111 deletions(-) diff --git a/docs/setup/storage/storage.md b/docs/setup/storage/storage.md index dc85cba219..bbc9583728 100644 --- a/docs/setup/storage/storage.md +++ b/docs/setup/storage/storage.md @@ -454,6 +454,16 @@ And you should configure oss related properties in file **zeppelin-site.xml**. <description>Access key secret for your OSS account</description> </property> +<property> + <name>zeppelin.notebook.oss.version.max</name> + <value>30</value> + <description> + Max num of note versions in OSSNoteBookRepo. + It's not mandatory, the default value is 30. + A value of 0 means that version control in OSSNoteBookRepo is disabled. + </description> +</property> + ``` Uncomment the next property for use OSSNotebookRepo class: diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 8021e20ad5..96a4b10de8 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -500,6 +500,10 @@ public class ZeppelinConfiguration { return getString(ConfVars.ZEPPELIN_NOTEBOOK_OSS_ACCESSKEYSECRET); } + public int getOSSNoteMaxVersionNum(){ + return getInt(ConfVars.ZEPPELIN_NOTEBOOK_OSS_VERSION_MAX); + } + public String getMongoUri() { return getString(ConfVars.ZEPPELIN_NOTEBOOK_MONGO_URI); } @@ -973,6 +977,7 @@ public class ZeppelinConfiguration { ZEPPELIN_NOTEBOOK_OSS_ENDPOINT("zeppelin.notebook.oss.endpoint", "http://oss-cn-hangzhou.aliyuncs.com"), ZEPPELIN_NOTEBOOK_OSS_ACCESSKEYID("zeppelin.notebook.oss.accesskeyid", null), ZEPPELIN_NOTEBOOK_OSS_ACCESSKEYSECRET("zeppelin.notebook.oss.accesskeysecret", null), + ZEPPELIN_NOTEBOOK_OSS_VERSION_MAX("zeppelin.notebook.oss.version.max", 30), ZEPPELIN_NOTEBOOK_AZURE_CONNECTION_STRING("zeppelin.notebook.azure.connectionString", null), ZEPPELIN_NOTEBOOK_AZURE_SHARE("zeppelin.notebook.azure.share", "zeppelin"), ZEPPELIN_NOTEBOOK_AZURE_USER("zeppelin.notebook.azure.user", "user"), diff --git a/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/OSSNotebookRepo.java b/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/OSSNotebookRepo.java index 81096dbc6f..5fddf9904b 100644 --- a/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/OSSNotebookRepo.java +++ b/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/OSSNotebookRepo.java @@ -17,43 +17,33 @@ package org.apache.zeppelin.notebook.repo; -import com.aliyun.oss.OSS; -import com.aliyun.oss.OSSClientBuilder; -import com.aliyun.oss.model.CopyObjectRequest; -import com.aliyun.oss.model.DeleteObjectsRequest; -import com.aliyun.oss.model.ListObjectsRequest; -import com.aliyun.oss.model.OSSObject; -import com.aliyun.oss.model.OSSObjectSummary; -import com.aliyun.oss.model.ObjectListing; -import com.aliyun.oss.model.PutObjectRequest; -import org.apache.commons.io.IOUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.NoteInfo; +import org.apache.zeppelin.notebook.repo.storage.OSSOperator; +import org.apache.zeppelin.notebook.repo.storage.RemoteStorageOperator; import org.apache.zeppelin.user.AuthenticationInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.ByteArrayInputStream; +import java.io.File; import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; /** * NotebookRepo for Aliyun OSS (https://cn.aliyun.com/product/oss) */ -public class OSSNotebookRepo implements NotebookRepo { +public class OSSNotebookRepo implements NotebookRepoWithVersionControl { private static final Logger LOGGER = LoggerFactory.getLogger(OSSNotebookRepo.class); - private OSS ossClient; private String bucketName; private String rootFolder; + private int maxVersionNumber; + + // Use ossOperator instead of ossClient directly + private RemoteStorageOperator ossOperator; public OSSNotebookRepo() { } @@ -63,144 +53,132 @@ public class OSSNotebookRepo implements NotebookRepo { String endpoint = conf.getOSSEndpoint(); bucketName = conf.getOSSBucketName(); rootFolder = conf.getNotebookDir(); - // rootFolder is part of OSS key which doesn't start with '/' - if (rootFolder.startsWith("/")) { - rootFolder = rootFolder.substring(1); - } + maxVersionNumber = conf.getOSSNoteMaxVersionNum(); + // rootFolder is part of OSS key + rootFolder = formatPath(rootFolder); String accessKeyId = conf.getOSSAccessKeyId(); String accessKeySecret = conf.getOSSAccessKeySecret(); - this.ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret); + this.ossOperator = new OSSOperator(endpoint, accessKeyId, accessKeySecret); + } + + private static String formatPath(String path) { + // The path should not start with '/' or './' or './/' + // because it is not accepted by OSS service. + if (path.startsWith("/")) { + path = path.substring(1); + } + path = new File(path).getPath(); + if (path.startsWith("./")) { + path = path.substring(2); + } + return path; + } + + public void setOssOperator(RemoteStorageOperator ossOperator) { + this.ossOperator = ossOperator; } @Override public Map<String, NoteInfo> list(AuthenticationInfo subject) throws IOException { Map<String, NoteInfo> notesInfo = new HashMap<>(); - final int maxKeys = 200; - String nextMarker = null; - ObjectListing objectListing = null; - do { - objectListing = ossClient.listObjects( - new ListObjectsRequest(bucketName) - .withPrefix(rootFolder + "/") - .withMarker(nextMarker) - .withMaxKeys(maxKeys)); - List<OSSObjectSummary> sums = objectListing.getObjectSummaries(); - for (OSSObjectSummary s : sums) { - if (s.getKey().endsWith(".zpln")) { - try { - String noteId = getNoteId(s.getKey()); - String notePath = getNotePath(rootFolder, s.getKey()); - notesInfo.put(noteId, new NoteInfo(noteId, notePath)); - } catch (IOException e) { - LOGGER.warn(e.getMessage()); - } - } else { - LOGGER.debug("Skip invalid note file: {}", s.getKey()); + List<String> objectKeys = ossOperator.listDirObjects(bucketName, rootFolder + "/"); + for (String key : objectKeys) { + if (key.endsWith(".zpln")) { + try { + String noteId = getNoteId(key); + String notePath = getNotePath(rootFolder, key); + notesInfo.put(noteId, new NoteInfo(noteId, notePath)); + } catch (IOException e) { + LOGGER.warn(e.getMessage()); } + } else { + LOGGER.debug("Skip invalid note file: {}", key); } - nextMarker = objectListing.getNextMarker(); - } while (objectListing.isTruncated()); - + } return notesInfo; } + public Note getByOSSPath(String noteId, String ossPath) throws IOException { + String noteText = ossOperator.getTextObject(bucketName, ossPath); + return Note.fromJson(noteId, noteText); + } + + @Override public Note get(String noteId, String notePath, AuthenticationInfo subject) throws IOException { - OSSObject ossObject = ossClient.getObject(bucketName, - rootFolder + "/" + buildNoteFileName(noteId, notePath)); - InputStream in = null; - try { - in = ossObject.getObjectContent(); - return Note.fromJson(noteId, IOUtils.toString(in, StandardCharsets.UTF_8)); - } finally { - if (in != null) { - in.close(); - } - } + return getByOSSPath(noteId, rootFolder + "/" + buildNoteFileName(noteId, notePath)); } @Override public void save(Note note, AuthenticationInfo subject) throws IOException { String content = note.toJson(); - PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, + ossOperator.putTextObject(bucketName, rootFolder + "/" + buildNoteFileName(note.getId(), note.getPath()), new ByteArrayInputStream(content.getBytes())); - ossClient.putObject(putObjectRequest); } @Override public void move(String noteId, String notePath, String newNotePath, AuthenticationInfo subject) throws IOException { - String sourceKey = rootFolder + "/" + buildNoteFileName(noteId, notePath); - String destKey = rootFolder + "/" + buildNoteFileName(noteId, newNotePath); - CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucketName, - sourceKey, bucketName, destKey); - ossClient.copyObject(copyObjectRequest); - ossClient.deleteObject(bucketName, sourceKey); + String noteSourceKey = rootFolder + "/" + buildNoteFileName(noteId, notePath); + String noteDestKey = rootFolder + "/" + buildNoteFileName(noteId, newNotePath); + ossOperator.moveObject(bucketName, noteSourceKey, noteDestKey); + String revisionSourceDirKey = rootFolder + "/" + buildRevisionsDirName(noteId, notePath); + String revisionDestDirKey = rootFolder + "/" + buildRevisionsDirName(noteId, newNotePath); + ossOperator.moveDir(bucketName, revisionSourceDirKey, revisionDestDirKey); } @Override public void move(String folderPath, String newFolderPath, AuthenticationInfo subject) { - final int maxKeys = 200; - String nextMarker = null; - ObjectListing objectListing = null; - do { - objectListing = ossClient.listObjects( - new ListObjectsRequest(bucketName) - .withPrefix(rootFolder + folderPath + "/") - .withMarker(nextMarker) - .withMaxKeys(maxKeys)); - List<OSSObjectSummary> sums = objectListing.getObjectSummaries(); - for (OSSObjectSummary s : sums) { - if (s.getKey().endsWith(".zpln")) { - try { - String noteId = getNoteId(s.getKey()); - String notePath = getNotePath(rootFolder, s.getKey()); - String newNotePath = newFolderPath + notePath.substring(folderPath.length()); - move(noteId, notePath, newNotePath, subject); - } catch (IOException e) { - LOGGER.warn(e.getMessage()); - } - } else { - LOGGER.debug("Skip invalid note file: {}", s.getKey()); + List<String> objectKeys = ossOperator.listDirObjects(bucketName, rootFolder + folderPath + "/"); + for (String key : objectKeys) { + if (key.endsWith(".zpln")) { + try { + String noteId = getNoteId(key); + String notePath = getNotePath(rootFolder, key); + String newNotePath = newFolderPath + notePath.substring(folderPath.length()); + move(noteId, notePath, newNotePath, subject); + } catch (IOException e) { + LOGGER.warn(e.getMessage()); } + } else { + LOGGER.debug("Skip invalid note file: {}", key); } - nextMarker = objectListing.getNextMarker(); - } while (objectListing.isTruncated()); + } } @Override public void remove(String noteId, String notePath, AuthenticationInfo subject) - throws IOException { - ossClient.deleteObject(bucketName, rootFolder + "/" + buildNoteFileName(noteId, notePath)); + throws IOException { + ossOperator.deleteFile(bucketName, rootFolder + "/" + buildNoteFileName(noteId, notePath)); + // if there is no file under revisonInfoPath, deleleDir() would do nothing + ossOperator.deleteDir(bucketName, rootFolder + "/" + buildRevisionsDirName(noteId, notePath)); } @Override - public void remove(String folderPath, AuthenticationInfo subject) { - String nextMarker = null; - ObjectListing objectListing = null; - do { - ListObjectsRequest listObjectsRequest = new ListObjectsRequest(bucketName) - .withPrefix(rootFolder + folderPath + "/") - .withMarker(nextMarker); - objectListing = ossClient.listObjects(listObjectsRequest); - if (!objectListing.getObjectSummaries().isEmpty()) { - List<String> keys = new ArrayList<>(); - for (OSSObjectSummary s : objectListing.getObjectSummaries()) { - keys.add(s.getKey()); + public void remove(String folderPath, AuthenticationInfo subject) throws IOException { + List<String> objectKeys = ossOperator.listDirObjects(bucketName, rootFolder + folderPath + "/"); + for (String key : objectKeys) { + if (key.endsWith(".zpln")) { + try { + String noteId = getNoteId(key); + String notePath = getNotePath(rootFolder, key); + // delete note revision file + ossOperator.deleteDir(bucketName, rootFolder + "/" + buildRevisionsDirName(noteId, notePath)); + } catch (IOException e) { + LOGGER.warn(e.getMessage()); } - DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(bucketName).withKeys(keys); - ossClient.deleteObjects(deleteObjectsRequest); } - - nextMarker = objectListing.getNextMarker(); - } while (objectListing.isTruncated()); + } + // delete note file + ossOperator.deleteFiles(bucketName, objectKeys); } + @Override public void close() { - ossClient.shutdown(); + ossOperator.shutdown(); } @Override @@ -214,4 +192,94 @@ public class OSSNotebookRepo implements NotebookRepo { LOGGER.warn("Method not implemented"); } + + private static String buildRevisionsDirName(String noteId, String notePath) throws IOException { + if (!notePath.startsWith("/")) { + throw new IOException("Invalid notePath: " + notePath); + } + return ".checkpoint/" + (notePath + "_" + noteId).substring(1); + } + + private String buildRevisionsInfoAbsolutePath(String noteId, String notePath) throws IOException { + return rootFolder + "/" + buildRevisionsDirName(noteId, notePath) + "/" + ".revision-info"; + } + + private String buildRevisionsFileAbsolutePath(String noteId, String notePath, String revisionId) throws IOException { + return rootFolder + "/" + buildRevisionsDirName(noteId, notePath) + "/" + revisionId; + } + + + @Override + public Revision checkpoint(String noteId, String notePath, String checkpointMsg, AuthenticationInfo subject) throws IOException { + if (maxVersionNumber <= 0) { + throw new IOException("Version control is closed because the value of zeppelin.notebook.oss.version.max is set to 0"); + } + + Note note = get(noteId, notePath, subject); + + //1 Write note content to revision file + String revisionId = UUID.randomUUID().toString().replace("-", ""); + String noteContent = note.toJson(); + ossOperator.putTextObject(bucketName, + buildRevisionsFileAbsolutePath(noteId, notePath, revisionId), + new ByteArrayInputStream(noteContent.getBytes())); + + //2 Append revision info + Revision revision = new Revision(revisionId, checkpointMsg, (int) (System.currentTimeMillis() / 1000L)); + // check revision info file if existed + RevisionsInfo revisionsHistory = new RevisionsInfo(); + String revisonInfoPath = buildRevisionsInfoAbsolutePath(noteId, notePath); + boolean found = ossOperator.doesObjectExist(bucketName, revisonInfoPath); + if (found) { + String existedRevisionsInfoText = ossOperator.getTextObject(bucketName, revisonInfoPath); + revisionsHistory = RevisionsInfo.fromText(existedRevisionsInfoText); + // control the num of revison files, clean the oldest one if it exceeds. + if (revisionsHistory.size() >= maxVersionNumber) { + Revision deletedRevision = revisionsHistory.removeLast(); + ossOperator.deleteFile(bucketName, buildRevisionsFileAbsolutePath(noteId, notePath, deletedRevision.id)); + } + } + revisionsHistory.addFirst(revision); + + ossOperator.putTextObject(bucketName, + buildRevisionsInfoAbsolutePath(noteId, notePath), + new ByteArrayInputStream(revisionsHistory.toText().getBytes())); + + return revision; + } + + @Override + public Note get(String noteId, String notePath, String revId, AuthenticationInfo subject) throws IOException { + Note note = getByOSSPath(noteId, buildRevisionsFileAbsolutePath(noteId, notePath, revId)); + if (note != null) { + note.setPath(notePath); + } + return note; + } + + @Override + public List<Revision> revisionHistory(String noteId, String notePath, AuthenticationInfo subject) throws IOException { + if (maxVersionNumber <= 0) { + return new ArrayList<>(); + } + + List<Revision> revisions = new LinkedList<>(); + String revisonInfoPath = buildRevisionsInfoAbsolutePath(noteId, notePath); + boolean found = ossOperator.doesObjectExist(bucketName, revisonInfoPath); + if (!found) { + return revisions; + } + String revisionsText = ossOperator.getTextObject(bucketName, revisonInfoPath); + + return RevisionsInfo.fromText(revisionsText); + } + + @Override + public Note setNoteRevision(String noteId, String notePath, String revId, AuthenticationInfo subject) throws IOException { + Note revisionNote = get(noteId, notePath, revId, subject); + if (revisionNote != null) { + save(revisionNote, subject); + } + return revisionNote; + } } diff --git a/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/RevisionsInfo.java b/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/RevisionsInfo.java new file mode 100644 index 0000000000..44b2aaa9cc --- /dev/null +++ b/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/RevisionsInfo.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.repo; + +import com.google.gson.Gson; + +import java.util.LinkedList; + +public class RevisionsInfo extends LinkedList<NotebookRepoWithVersionControl.Revision> { + + private static Gson GSON = new Gson(); + + public static RevisionsInfo fromText(String revisionsText) { + RevisionsInfo revisionsInfo = GSON.fromJson(revisionsText, RevisionsInfo.class); + if (revisionsInfo == null) { + return new RevisionsInfo(); + } + return revisionsInfo; + } + + public String toText() { + return GSON.toJson(this); + } +} diff --git a/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/storage/OSSOperator.java b/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/storage/OSSOperator.java new file mode 100644 index 0000000000..c04fa8df3c --- /dev/null +++ b/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/storage/OSSOperator.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.repo.storage; + +import com.aliyun.oss.OSS; +import com.aliyun.oss.OSSClientBuilder; +import com.aliyun.oss.model.*; +import org.apache.commons.io.IOUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + + +/** + * OSSOperator is a higher-level encapsulation of OSSClient, + * which makes OSSNotebookRepo shield from specific OSS operations + * or complex combinations of them. + */ +public class OSSOperator implements RemoteStorageOperator { + private OSS ossClient; + + public OSSOperator(String endpoint, String accessKeyId, String accessKeySecret) { + this.ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret); + } + + + @Override + public void createBucket(String bucketName) { + ossClient.createBucket(bucketName); + } + + + @Override + public void deleteBucket(String bucketName) { + ossClient.deleteBucket(bucketName); + } + + @Override + public boolean doesObjectExist(String bucketName, String key) throws IOException { + return ossClient.doesObjectExist(bucketName, key); + } + + + @Override + public String getTextObject(String bucketName, String key) throws IOException { + if (!doesObjectExist(bucketName, key)) { + throw new IOException("Note or its revision not found"); + } + OSSObject ossObject = ossClient.getObject(bucketName, key); + InputStream in = null; + try { + in = ossObject.getObjectContent(); + return IOUtils.toString(in, StandardCharsets.UTF_8); + } finally { + if (in != null) { + in.close(); + } + } + } + + + @Override + public void putTextObject(String bucketName, String key, InputStream inputStream) { + PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, key, inputStream); + ossClient.putObject(putObjectRequest); + } + + + @Override + public void moveObject(String bucketName, String sourceKey, String destKey) throws IOException { + if (!doesObjectExist(bucketName, sourceKey)) { + throw new IOException("Note or its revision not found"); + } + CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucketName, + sourceKey, bucketName, destKey); + ossClient.copyObject(copyObjectRequest); + ossClient.deleteObject(bucketName, sourceKey); + } + + @Override + public void moveDir(String bucketName, String sourceDir, String destDir) throws IOException { + List<String> objectKeys = listDirObjects(bucketName, sourceDir); + for (String key : objectKeys) { + moveObject(bucketName, key, destDir + key.substring(sourceDir.length())); + } + } + + + @Override + public void deleteDir(String bucketName, String dirname) { + List<String> keys = listDirObjects(bucketName, dirname); + deleteFiles(bucketName, keys); + } + + @Override + public void deleteFile(String bucketName, String objectKey) throws IOException { + deleteFiles(bucketName, Arrays.asList(objectKey)); + } + + @Override + public void deleteFiles(String bucketName, List<String> objectKeys) { + if (objectKeys != null && objectKeys.size() > 0) { + DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(bucketName).withKeys(objectKeys); + ossClient.deleteObjects(deleteObjectsRequest); + } + } + + + @Override + public List<String> listDirObjects(String bucketName, String dirname) { + String nextMarker = null; + ObjectListing objectListing = null; + List<String> keys = new ArrayList<>(); + do { + ListObjectsRequest listObjectsRequest = new ListObjectsRequest(bucketName) + .withPrefix(dirname) + .withMarker(nextMarker); + objectListing = ossClient.listObjects(listObjectsRequest); + if (!objectListing.getObjectSummaries().isEmpty()) { + for (OSSObjectSummary s : objectListing.getObjectSummaries()) { + keys.add(s.getKey()); + } + } + + nextMarker = objectListing.getNextMarker(); + } while (objectListing.isTruncated()); + return keys; + } + + @Override + public void shutdown() { + ossClient.shutdown(); + } +} diff --git a/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/storage/RemoteStorageOperator.java b/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/storage/RemoteStorageOperator.java new file mode 100644 index 0000000000..e3dfedcd07 --- /dev/null +++ b/zeppelin-plugins/notebookrepo/oss/src/main/java/org/apache/zeppelin/notebook/repo/storage/RemoteStorageOperator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.repo.storage; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +public interface RemoteStorageOperator { + void createBucket(String bucketName) throws IOException; + + void deleteBucket(String bucketName) throws IOException; + + boolean doesObjectExist(String bucketName, String key) throws IOException; + + String getTextObject(String bucketName, String key) throws IOException; + + void putTextObject(String bucketName, String key, InputStream inputStream) throws IOException; + + void moveObject(String bucketName, String sourceKey, String destKey) throws IOException; + + void moveDir(String bucketName, String sourceDir, String destDir) throws IOException; + + void deleteDir(String bucketName, String dirname) throws IOException; + + void deleteFile(String bucketName, String objectKey) throws IOException; + + void deleteFiles(String bucketName, List<String> objectKeys) throws IOException; + + List<String> listDirObjects(String bucketName, String dirname); + + void shutdown(); +} diff --git a/zeppelin-plugins/notebookrepo/oss/src/test/java/org/apache/zeppelin/notebook/repo/MockStorageOperator.java b/zeppelin-plugins/notebookrepo/oss/src/test/java/org/apache/zeppelin/notebook/repo/MockStorageOperator.java new file mode 100644 index 0000000000..3aa8d6d196 --- /dev/null +++ b/zeppelin-plugins/notebookrepo/oss/src/test/java/org/apache/zeppelin/notebook/repo/MockStorageOperator.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.repo; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.filefilter.TrueFileFilter; +import org.apache.zeppelin.notebook.repo.storage.RemoteStorageOperator; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; + +public class MockStorageOperator implements RemoteStorageOperator { + + private String mockRootFolder; + + public MockStorageOperator() throws IOException { + Path tempDirectory = Files.createTempDirectory("zeppelin_mock_storage_dir_"); + mockRootFolder = tempDirectory.toString() + "/"; + } + + @Override + public void createBucket(String bucketName) throws IOException { + FileUtils.forceMkdir(new File(mockRootFolder + bucketName)); + } + + @Override + public void deleteBucket(String bucketName) throws IOException { + FileUtils.deleteDirectory(new File(mockRootFolder + bucketName)); + } + + @Override + public boolean doesObjectExist(String bucketName, String key) throws IOException { + File file = new File(mockRootFolder + bucketName + "/" + key); + return file.exists() && !file.isDirectory(); + } + + @Override + public String getTextObject(String bucketName, String key) throws IOException { + if (!doesObjectExist(bucketName, key)) { + throw new IOException("Note or its revision not found"); + } + return FileUtils.readFileToString(new File(mockRootFolder + bucketName + "/" + key), "UTF-8"); + } + + @Override + public void putTextObject(String bucketName, String key, InputStream inputStream) throws IOException { + File destination = new File(mockRootFolder + bucketName + "/" + key); + destination.getParentFile().mkdirs(); + FileUtils.copyInputStreamToFile(inputStream, destination); + } + + @Override + public void moveObject(String bucketName, String sourceKey, String destKey) throws IOException { + FileUtils.moveFile(new File(mockRootFolder + bucketName + "/" + sourceKey), + new File(mockRootFolder + bucketName + "/" + destKey)); + } + + @Override + public void moveDir(String bucketName, String sourceDir, String destDir) throws IOException { + List<String> objectKeys = listDirObjects(bucketName, sourceDir); + for (String key : objectKeys) { + moveObject(bucketName, key, destDir + key.substring(sourceDir.length())); + } + } + + @Override + public void deleteDir(String bucketName, String dirname) throws IOException { + List<String> keys = listDirObjects(bucketName, dirname); + deleteFiles(bucketName, keys); + } + + @Override + public void deleteFile(String bucketName, String objectKey) throws IOException { + FileUtils.forceDelete(new File(mockRootFolder + bucketName + "/" + objectKey)); + } + + @Override + public void deleteFiles(String bucketName, List<String> objectKeys) throws IOException { + if (objectKeys != null && objectKeys.size() > 0) { + for (String objectKey : objectKeys) { + deleteFile(bucketName, objectKey); + } + } + } + + @Override + public List<String> listDirObjects(String bucketName, String dirname) { + File directory = new File(mockRootFolder + bucketName + "/" + dirname); + if (!directory.isDirectory()) { + return new ArrayList<>(); + } + Collection<File> files = FileUtils.listFiles(directory, TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE); + return files.stream().map(file -> file.getPath().substring((mockRootFolder + bucketName + "/").length())).collect(Collectors.toList()); + } + + @Override + public void shutdown() { + + } +} diff --git a/zeppelin-plugins/notebookrepo/oss/src/test/java/org/apache/zeppelin/notebook/repo/OSSNotebookRepoTest.java b/zeppelin-plugins/notebookrepo/oss/src/test/java/org/apache/zeppelin/notebook/repo/OSSNotebookRepoTest.java new file mode 100644 index 0000000000..a84598d0aa --- /dev/null +++ b/zeppelin-plugins/notebookrepo/oss/src/test/java/org/apache/zeppelin/notebook/repo/OSSNotebookRepoTest.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.repo; + +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.NoteInfo; +import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.notebook.repo.storage.RemoteStorageOperator; +import org.apache.zeppelin.scheduler.Job; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class OSSNotebookRepoTest { + + private AuthenticationInfo anonymous = AuthenticationInfo.ANONYMOUS; + private OSSNotebookRepo notebookRepo; + private RemoteStorageOperator ossOperator; + private String bucket; + private static int OSS_VERSION_MAX = 30; + + + + @Before + public void setUp() throws IOException { + bucket = "zeppelin-test-bucket"; + String endpoint = "yourEndpoint"; + String accessKeyId = "yourAccessKeyId"; + String accessKeySecret = "yourAccessKeySecret"; + ossOperator = new MockStorageOperator(); + ossOperator.createBucket(bucket); + notebookRepo = new OSSNotebookRepo(); + ZeppelinConfiguration conf = ZeppelinConfiguration.create(); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_OSS_ENDPOINT.getVarName(), + endpoint); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_OSS_BUCKET.getVarName(), + bucket); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_OSS_ACCESSKEYID.getVarName(), + accessKeyId); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_OSS_ACCESSKEYSECRET.getVarName(), + accessKeySecret); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_OSS_VERSION_MAX.getVarName(), + OSS_VERSION_MAX + ""); + notebookRepo.init(conf); + notebookRepo.setOssOperator(ossOperator); + } + + @After + public void tearDown() throws InterruptedException, IOException { + ossOperator.deleteDir(bucket, ""); + ossOperator.deleteBucket(bucket); + // The delete operations on OSS Service above has a delay. + // And it would affect setup of next test case if we do not wait for them to end. + Thread.sleep(1000); + + // notebookRepo.close() would call ossOperator.shutdown() + if (notebookRepo != null) { + notebookRepo.close(); + } + } + + @Test + public void testNotebookRepo() throws IOException { + Map<String, NoteInfo> notesInfo = notebookRepo.list(anonymous); + assertEquals(0, notesInfo.size()); + + // create Note note1 + Note note1 = new Note(); + note1.setPath("/spark/note_1"); + notebookRepo.save(note1, anonymous); + + // + for (int i = 1; i <= OSS_VERSION_MAX + 3; i++) { + Paragraph p = new Paragraph(note1, null); + p.setText("text" + i); + p.setStatus(Job.Status.RUNNING); + p.setAuthenticationInfo(new AuthenticationInfo("anonymous", (String) null, "anonymous")); + note1.addParagraph(p); + notebookRepo.save(note1, anonymous); + notebookRepo.checkpoint(note1.getId(), note1.getPath(), "commit " + i, anonymous); + } + + notesInfo = notebookRepo.list(anonymous); + assertEquals(1, notesInfo.size()); + assertEquals("/spark/note_1", notesInfo.get(note1.getId()).getPath()); + + // Get note1 + Note noteFromRepo = notebookRepo.get(note1.getId(), note1.getPath(), anonymous); + assertEquals(note1.getName(), noteFromRepo.getName()); + + // Get non-existed note + try { + notebookRepo.get("invalid_id", "/invalid_path", anonymous); + fail("Should fail to get non-existed note1"); + } catch (IOException e) { + assertEquals(e.getMessage(), "Note or its revision not found"); + } + + // create another Note note2 + Note note2 = new Note(); + note2.setPath("/spark/note_2"); + notebookRepo.save(note2, anonymous); + + notesInfo = notebookRepo.list(anonymous); + assertEquals(2, notesInfo.size()); + assertEquals("/spark/note_1", notesInfo.get(note1.getId()).getPath()); + assertEquals("/spark/note_2", notesInfo.get(note2.getId()).getPath()); + + // move note1 + notebookRepo.move(note1.getId(), note1.getPath(), "/spark2/note_1", anonymous); + + notesInfo = notebookRepo.list(anonymous); + assertEquals(2, notesInfo.size()); + assertEquals("/spark2/note_1", notesInfo.get(note1.getId()).getPath()); + assertEquals("/spark/note_2", notesInfo.get(note2.getId()).getPath()); + + // move folder + notebookRepo.move("/spark2", "/spark3", anonymous); + + notesInfo = notebookRepo.list(anonymous); + assertEquals(2, notesInfo.size()); + assertEquals("/spark3/note_1", notesInfo.get(note1.getId()).getPath()); + assertEquals("/spark/note_2", notesInfo.get(note2.getId()).getPath()); + + // delete note + notebookRepo.remove(note1.getId(), notesInfo.get(note1.getId()).getPath(), anonymous); + notesInfo = notebookRepo.list(anonymous); + assertEquals(1, notesInfo.size()); + assertEquals("/spark/note_2", notesInfo.get(note2.getId()).getPath()); + + // delete folder + notebookRepo.remove("/spark", anonymous); + notesInfo = notebookRepo.list(anonymous); + assertEquals(0, notesInfo.size()); + } + + + @Test + public void testNotebookRepoWithVersionControl() throws IOException { + Map<String, NoteInfo> notesInfo = notebookRepo.list(anonymous); + assertEquals(0, notesInfo.size()); + + // create Note note1 + Note note1 = new Note(); + note1.setPath("/version_control/note_1"); + + List<NotebookRepoWithVersionControl.Revision> revisionList = new ArrayList<>(); + + for (int i = 1; i <= OSS_VERSION_MAX + 3; i++) { + Paragraph p = new Paragraph(note1, null); + p.setText("text" + i); + p.setStatus(Job.Status.RUNNING); + p.setAuthenticationInfo(new AuthenticationInfo("anonymous", (String) null, "anonymous")); + note1.addParagraph(p); + notebookRepo.save(note1, anonymous); + + // checkpoint + NotebookRepoWithVersionControl.Revision revision = notebookRepo.checkpoint(note1.getId(), note1.getPath(), "commit " + i, anonymous); + revisionList.add(revision); + + List<NotebookRepoWithVersionControl.Revision> revisionsHistory = notebookRepo.revisionHistory(note1.getId(), note1.getPath(), anonymous); + // verify OSS_VERSION_MAX control + if (i <= OSS_VERSION_MAX) { + assertEquals(i, revisionsHistory.size()); + } else { + assertEquals(OSS_VERSION_MAX, revisionsHistory.size()); + } + } + + // get note by non-existed revisionId + for (int i = 1; i <= 3; i++) { + try { + notebookRepo.get(note1.getId(), note1.getPath(), revisionList.get(i - 1).id, anonymous); + fail("Should fail to get non-existed note1"); + } catch (IOException e) { + assertEquals(e.getMessage(), "Note or its revision not found"); + } + } + + // get note by existed revisionId + for (int i = 4; i <= OSS_VERSION_MAX + 3; i++) { + Note note = notebookRepo.get(note1.getId(), note1.getPath(), revisionList.get(i - 1).id, anonymous); + assertEquals(i, note.getParagraphs().size()); + } + + // revisionsHistory + List<NotebookRepoWithVersionControl.Revision> revisionsHistory = notebookRepo.revisionHistory(note1.getId(), note1.getPath(), anonymous); + for (int i = 0; i < revisionsHistory.size(); i++) { + assertEquals(revisionsHistory.get(i).id, revisionList.get(revisionList.size() - i - 1).id); + assertEquals(revisionsHistory.get(i).message, revisionList.get(revisionList.size() - i - 1).message); + assertEquals(revisionsHistory.get(i).time, revisionList.get(revisionList.size() - i - 1).time); + } + + + // Modify note to distinguish itself with last version + Paragraph p = new Paragraph(note1, null); + p.setText("text" + OSS_VERSION_MAX + 4); + p.setStatus(Job.Status.RUNNING); + p.setAuthenticationInfo(new AuthenticationInfo("anonymous", (String) null, "anonymous")); + note1.addParagraph(p); + notebookRepo.save(note1, anonymous); + + assertEquals(notebookRepo.get(note1.getId(), note1.getPath(), anonymous).getParagraphs().size(), OSS_VERSION_MAX + 4); + + // Assume OSS_VERSION_MAX = 30 + // revert note to revision 31 , then to revision 32, then to revision 33, finally to revision 31 + for (int i = OSS_VERSION_MAX + 1; i <= OSS_VERSION_MAX + 3; i++) { + notebookRepo.setNoteRevision(note1.getId(), note1.getPath(), revisionList.get(i - 1).id, anonymous); + assertEquals(notebookRepo.get(note1.getId(), note1.getPath(), anonymous).getParagraphs().size(), i); + } + + // finally revert note to revision 31 + notebookRepo.setNoteRevision(note1.getId(), note1.getPath(), revisionList.get(OSS_VERSION_MAX).id, anonymous); + assertEquals(notebookRepo.get(note1.getId(), note1.getPath(), anonymous).getParagraphs().size(), OSS_VERSION_MAX + 1); + + notebookRepo.remove("/version_control", anonymous); + } +}