Repository: atlas
Updated Branches:
  refs/heads/master 06ff0752f -> 41997f648


ATLAS-2802: Atlas Client Update for Export and Import.


Project: http://git-wip-us.apache.org/repos/asf/atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/41997f64
Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/41997f64
Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/41997f64

Branch: refs/heads/master
Commit: 41997f648b27aa523bcf6c30b2184041e02898b4
Parents: 06ff075
Author: Ashutosh Mestry <[email protected]>
Authored: Thu Aug 2 09:56:39 2018 -0700
Committer: Ashutosh Mestry <[email protected]>
Committed: Mon Oct 8 15:47:11 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/atlas/AtlasBaseClient.java  |  72 +++++++++++++++-
 .../web/resources/AdminExportImportTestIT.java  |  85 +++++++++++++++++++
 .../test/resources/json/export-incremental.json |  11 +++
 webapp/src/test/resources/stocks-base.zip       | Bin 0 -> 13166 bytes
 4 files changed, 164 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/atlas/blob/41997f64/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java
----------------------------------------------------------------------
diff --git a/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java 
b/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java
index 7ca656d..df021ad 100644
--- a/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java
+++ b/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java
@@ -30,9 +30,14 @@ import com.sun.jersey.api.client.config.DefaultClientConfig;
 import com.sun.jersey.api.client.filter.HTTPBasicAuthFilter;
 import com.sun.jersey.api.json.JSONConfiguration;
 import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+import com.sun.jersey.multipart.BodyPart;
+import com.sun.jersey.multipart.FormDataBodyPart;
 import com.sun.jersey.multipart.FormDataMultiPart;
 import com.sun.jersey.multipart.MultiPart;
 import com.sun.jersey.multipart.file.FileDataBodyPart;
+import com.sun.jersey.multipart.file.StreamDataBodyPart;
+import com.sun.jersey.multipart.impl.MultiPartWriter;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
 import org.apache.atlas.model.impexp.AtlasImportRequest;
 import org.apache.atlas.model.impexp.AtlasImportResult;
 import org.apache.atlas.model.metrics.AtlasMetrics;
@@ -41,6 +46,7 @@ import org.apache.atlas.type.AtlasType;
 import org.apache.atlas.utils.AtlasJson;
 import org.apache.atlas.utils.AuthenticationUtil;
 import org.apache.commons.configuration.Configuration;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
@@ -52,8 +58,11 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriBuilder;
+import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.ConnectException;
 import java.net.URI;
 import java.nio.file.Paths;
@@ -69,6 +78,7 @@ public abstract class AtlasBaseClient {
     public static final String ADMIN_STATUS = "admin/status";
     public static final String ADMIN_METRICS = "admin/metrics";
     public static final String ADMIN_IMPORT = "admin/import";
+    public static final String ADMIN_EXPORT = "admin/export";
     public static final String HTTP_AUTHENTICATION_ENABLED = 
"atlas.http.authentication.enabled";
 
     public static final String QUERY = "query";
@@ -91,6 +101,10 @@ public abstract class AtlasBaseClient {
     static final int DEFAULT_SLEEP_BETWEEN_RETRIES_MS = 5000;
     private static final Logger LOG = 
LoggerFactory.getLogger(AtlasBaseClient.class);
     private static final API IMPORT = new API(BASE_URI + ADMIN_IMPORT, 
HttpMethod.POST, Response.Status.OK, MediaType.MULTIPART_FORM_DATA, 
MediaType.APPLICATION_JSON);
+    private static final API EXPORT = new API(BASE_URI + ADMIN_EXPORT, 
HttpMethod.POST, Response.Status.OK, MediaType.APPLICATION_JSON, 
MediaType.APPLICATION_OCTET_STREAM);
+    private static final String IMPORT_REQUEST_PARAMTER = "request";
+    private static final String IMPORT_DATA_PARAMETER = "data";
+
     protected WebResource service;
     protected Configuration configuration;
     private String basicAuthUser;
@@ -251,6 +265,8 @@ public abstract class AtlasBaseClient {
         // Enable POJO mapping feature
         config.getFeatures().put(JSONConfiguration.FEATURE_POJO_MAPPING, 
Boolean.TRUE);
         config.getClasses().add(JacksonJaxbJsonProvider.class);
+        config.getClasses().add(MultiPartWriter.class);
+
         int readTimeout = 
configuration.getInt("atlas.client.readTimeoutMSecs", 60000);
         int connectTimeout = 
configuration.getInt("atlas.client.connectTimeoutMSecs", 60000);
         if (configuration.getBoolean(TLS_ENABLED, false)) {
@@ -367,7 +383,9 @@ public abstract class AtlasBaseClient {
                     return null;
                 }
                 try {
-                    if (responseType.getRawClass().equals(ObjectNode.class)) {
+                    
if(api.getProduces().equals(MediaType.APPLICATION_OCTET_STREAM)) {
+                        return (T) 
IOUtils.toByteArray(clientResponse.getEntityInputStream());
+                    } else if 
(responseType.getRawClass().equals(ObjectNode.class)) {
                         String stringEntity = 
clientResponse.getEntity(String.class);
                         try {
                             JsonNode jsonObject = 
AtlasJson.parseToV1JsonNode(stringEntity);
@@ -385,6 +403,8 @@ public abstract class AtlasBaseClient {
                     }
                 } catch (ClientHandlerException e) {
                     throw new AtlasServiceException(api, e);
+                } catch (IOException e) {
+                    throw new AtlasServiceException(api, e);
                 }
             } else if (clientResponse.getStatus() != 
ClientResponse.Status.SERVICE_UNAVAILABLE.getStatusCode()) {
                 break;
@@ -414,7 +434,7 @@ public abstract class AtlasBaseClient {
         return getResource(service, api, queryParams);
     }
 
-    protected abstract API formatPathParameters(API api, String ... params);
+    protected abstract API formatPathParameters(API api, String... params);
 
     void initializeState(String[] baseUrls, UserGroupInformation ugi, String 
doAsUser) {
         initializeState(getClientProperties(), baseUrls, ugi, doAsUser);
@@ -446,15 +466,59 @@ public abstract class AtlasBaseClient {
         return 
configuration.getInt(AtlasBaseClient.ATLAS_CLIENT_HA_RETRIES_KEY, 
AtlasBaseClient.DEFAULT_NUM_RETRIES);
     }
 
+    /**
+     * Posts an export request to the server and returns the response body as bytes.
+     *
+     * The call goes through the EXPORT API, which is declared as consuming JSON and
+     * producing application/octet-stream.
+     *
+     * @param request export request sent as the call's request object
+     * @return the response payload
+     * @throws AtlasServiceException if the call fails; an AtlasServiceException raised by
+     *                               callAPI is rethrown as-is, any other failure is wrapped
+     */
+    public byte[] exportData(AtlasExportRequest request) throws AtlasServiceException {
+        try {
+            return (byte[]) callAPI(EXPORT, Object.class, request);
+        } catch (AtlasServiceException e) {
+            // Rethrow unchanged rather than nesting it inside a second AtlasServiceException.
+            LOG.error("error exporting data", e);
+            throw e;
+        } catch (Exception e) {
+            // No file is involved in this overload, so the message must not mention one.
+            LOG.error("error exporting data", e);
+            throw new AtlasServiceException(e);
+        }
+    }
+
+    /**
+     * Exports the data selected by {@code request} and writes the response body to a file.
+     *
+     * The export is fetched before the file is opened, so a failed export leaves the
+     * target path untouched.
+     *
+     * @param request      export request passed to {@link #exportData(AtlasExportRequest)}
+     * @param absolutePath path of the file that receives the exported bytes
+     * @throws AtlasServiceException if the export call fails, or if the file cannot be
+     *                               opened, written or closed
+     */
+    public void exportData(AtlasExportRequest request, String absolutePath) throws AtlasServiceException {
+        // Export failures are already logged and wrapped by the byte[] overload; let them
+        // propagate instead of re-logging them as a file-write error.
+        byte[] fileBytes = exportData(request);
+
+        // try-with-resources closes the stream on every path. A failure while closing is
+        // attached as a suppressed exception instead of replacing the write failure.
+        try (OutputStream fileOutputStream = new FileOutputStream(new File(absolutePath))) {
+            IOUtils.write(fileBytes, fileOutputStream);
+        } catch (IOException | RuntimeException e) {
+            // RuntimeException is kept so callers still only see AtlasServiceException
+            // (for example when absolutePath is null).
+            LOG.error("error writing to file", e);
+            throw new AtlasServiceException(e);
+        }
+    }
+
     public AtlasImportResult importData(AtlasImportRequest request, String 
absoluteFilePath) throws AtlasServiceException {
-        FileDataBodyPart filePart = new FileDataBodyPart("data", new 
File(absoluteFilePath));
+        return performImportData(getImportRequestBodyPart(request),
+                            new FileDataBodyPart(IMPORT_DATA_PARAMETER, new 
File(absoluteFilePath)));
+    }
+
+    public AtlasImportResult importData(AtlasImportRequest request, byte[] 
fileData) throws AtlasServiceException {
+        return performImportData(getImportRequestBodyPart(request),
+                                new StreamDataBodyPart(IMPORT_DATA_PARAMETER, 
new ByteArrayInputStream(fileData)));
+    }
+
+    private AtlasImportResult performImportData(BodyPart requestPart, BodyPart 
filePart) throws AtlasServiceException {
         MultiPart multipartEntity = new FormDataMultiPart()
-                .field("request", AtlasType.toJson(request), 
MediaType.APPLICATION_JSON_TYPE)
+                .bodyPart(requestPart)
                 .bodyPart(filePart);
 
         return callAPI(IMPORT, AtlasImportResult.class, multipartEntity);
     }
 
+
+    /**
+     * Builds the form-data body part that carries the import request.
+     *
+     * @param request import request, serialized with AtlasType.toJson
+     * @return a body part named by IMPORT_REQUEST_PARAMTER with JSON media type
+     */
+    private FormDataBodyPart getImportRequestBodyPart(AtlasImportRequest request) {
+        String requestJson = AtlasType.toJson(request);
+
+        return new FormDataBodyPart(IMPORT_REQUEST_PARAMTER, requestJson, MediaType.APPLICATION_JSON_TYPE);
+    }
+
     boolean isRetryableException(ClientHandlerException che) {
         return che.getCause().getClass().equals(IOException.class)
                 || che.getCause().getClass().equals(ConnectException.class);

http://git-wip-us.apache.org/repos/asf/atlas/blob/41997f64/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java
----------------------------------------------------------------------
diff --git 
a/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java
 
b/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java
new file mode 100644
index 0000000..fc804d2
--- /dev/null
+++ 
b/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.web.resources;
+
+
+import org.apache.atlas.AtlasServiceException;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.model.clusterinfo.AtlasCluster;
+import org.apache.atlas.model.impexp.AtlasExportRequest;
+import org.apache.atlas.model.impexp.AtlasImportRequest;
+import org.apache.atlas.model.impexp.AtlasImportResult;
+import org.apache.atlas.repository.impexp.ZipSource;
+import org.apache.atlas.utils.TestResourceFileUtils;
+import org.apache.atlas.web.integration.BaseResourceIT;
+import org.testng.annotations.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Integration test for the client-side import and export calls.
+ *
+ * The tests are chained with dependsOnMethods: the server must report ACTIVE, then a zip
+ * from the test resources is imported, then an export is requested and inspected.
+ */
+public class AdminExportImportTestIT extends BaseResourceIT {
+    private static final String FILE_TO_IMPORT = "stocks-base.zip";
+    private static final int EXPECTED_PROCESSED_ENTITIES = 37;
+    private static final int EXPECTED_CREATION_ORDER_SIZE = 13;
+
+    @Test
+    public void isActive() throws AtlasServiceException {
+        assertEquals(atlasClientV2.getAdminStatus(), "ACTIVE");
+    }
+
+    @Test(dependsOnMethods = "isActive")
+    public void importData() throws AtlasServiceException, IOException {
+        performImport(FILE_TO_IMPORT);
+    }
+
+    @Test(dependsOnMethods = "importData")
+    public void exportData() throws AtlasServiceException, IOException, AtlasBaseException {
+        AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(".", "export-incremental", AtlasExportRequest.class);
+        byte[] exportedBytes = atlasClientV2.exportData(request);
+        assertNotNull(exportedBytes);
+
+        ZipSource zs = new ZipSource(new ByteArrayInputStream(exportedBytes));
+        assertNotNull(zs.getExportResult());
+        assertEquals(zs.getCreationOrder().size(), EXPECTED_CREATION_ORDER_SIZE);
+    }
+
+    /**
+     * Reads a test resource into memory, imports it through the client and checks the result.
+     *
+     * An IOException while reading the resource is propagated so the test fails with the
+     * full stack trace instead of a message-only assertion failure.
+     *
+     * @param fileToImport name of the test resource to import
+     * @throws AtlasServiceException if the import call fails
+     * @throws IOException if the resource cannot be read
+     */
+    private void performImport(String fileToImport) throws AtlasServiceException, IOException {
+        AtlasImportRequest request = new AtlasImportRequest();
+        byte[] fileBytes = Files.readAllBytes(Paths.get(TestResourceFileUtils.getTestFilePath(fileToImport)));
+
+        AtlasImportResult result = atlasClientV2.importData(request, fileBytes);
+
+        assertNotNull(result);
+        assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS);
+        assertNotNull(result.getMetrics());
+        assertEquals(result.getProcessedEntities().size(), EXPECTED_PROCESSED_ENTITIES);
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/41997f64/webapp/src/test/resources/json/export-incremental.json
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/json/export-incremental.json 
b/webapp/src/test/resources/json/export-incremental.json
new file mode 100644
index 0000000..9710841
--- /dev/null
+++ b/webapp/src/test/resources/json/export-incremental.json
@@ -0,0 +1,11 @@
+{
+  "itemsToExport": [
+    {
+      "typeName": "hive_db", "uniqueAttributes": { "qualifiedName": 
"stocks@cl1" }
+
+    }
+  ],
+  "options": {
+    "fetchType": "full"
+  }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/41997f64/webapp/src/test/resources/stocks-base.zip
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/stocks-base.zip 
b/webapp/src/test/resources/stocks-base.zip
new file mode 100644
index 0000000..40c7f37
Binary files /dev/null and b/webapp/src/test/resources/stocks-base.zip differ

Reply via email to