Repository: atlas Updated Branches: refs/heads/master 06ff0752f -> 41997f648
ATLAS-2802: Atlas Client Update for Export and Import. Project: http://git-wip-us.apache.org/repos/asf/atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/atlas/commit/41997f64 Tree: http://git-wip-us.apache.org/repos/asf/atlas/tree/41997f64 Diff: http://git-wip-us.apache.org/repos/asf/atlas/diff/41997f64 Branch: refs/heads/master Commit: 41997f648b27aa523bcf6c30b2184041e02898b4 Parents: 06ff075 Author: Ashutosh Mestry <[email protected]> Authored: Thu Aug 2 09:56:39 2018 -0700 Committer: Ashutosh Mestry <[email protected]> Committed: Mon Oct 8 15:47:11 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/atlas/AtlasBaseClient.java | 72 +++++++++++++++- .../web/resources/AdminExportImportTestIT.java | 85 +++++++++++++++++++ .../test/resources/json/export-incremental.json | 11 +++ webapp/src/test/resources/stocks-base.zip | Bin 0 -> 13166 bytes 4 files changed, 164 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/atlas/blob/41997f64/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java ---------------------------------------------------------------------- diff --git a/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java b/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java index 7ca656d..df021ad 100644 --- a/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java +++ b/client/common/src/main/java/org/apache/atlas/AtlasBaseClient.java @@ -30,9 +30,14 @@ import com.sun.jersey.api.client.config.DefaultClientConfig; import com.sun.jersey.api.client.filter.HTTPBasicAuthFilter; import com.sun.jersey.api.json.JSONConfiguration; import com.sun.jersey.client.urlconnection.URLConnectionClientHandler; +import com.sun.jersey.multipart.BodyPart; +import com.sun.jersey.multipart.FormDataBodyPart; import com.sun.jersey.multipart.FormDataMultiPart; import com.sun.jersey.multipart.MultiPart; import com.sun.jersey.multipart.file.FileDataBodyPart; +import com.sun.jersey.multipart.file.StreamDataBodyPart; +import com.sun.jersey.multipart.impl.MultiPartWriter; +import org.apache.atlas.model.impexp.AtlasExportRequest; import org.apache.atlas.model.impexp.AtlasImportRequest; import org.apache.atlas.model.impexp.AtlasImportResult; import org.apache.atlas.model.metrics.AtlasMetrics; @@ -41,6 +46,7 @@ import org.apache.atlas.type.AtlasType; import org.apache.atlas.utils.AtlasJson; import org.apache.atlas.utils.AuthenticationUtil; import org.apache.commons.configuration.Configuration; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; @@ -52,8 +58,11 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; import javax.ws.rs.core.UriBuilder; +import java.io.ByteArrayInputStream; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.net.ConnectException; import java.net.URI; import java.nio.file.Paths; @@ -69,6 +78,7 @@ public abstract class AtlasBaseClient { public static final String ADMIN_STATUS = "admin/status"; public static final String ADMIN_METRICS = "admin/metrics"; public static final String ADMIN_IMPORT = "admin/import"; + public static final String ADMIN_EXPORT = "admin/export"; public static final String HTTP_AUTHENTICATION_ENABLED = "atlas.http.authentication.enabled"; public static final String QUERY = "query"; @@ -91,6 +101,10 @@ public abstract class AtlasBaseClient { static final int DEFAULT_SLEEP_BETWEEN_RETRIES_MS = 5000; private static final Logger LOG = LoggerFactory.getLogger(AtlasBaseClient.class); private static final API IMPORT = new API(BASE_URI + ADMIN_IMPORT, HttpMethod.POST, Response.Status.OK, MediaType.MULTIPART_FORM_DATA, MediaType.APPLICATION_JSON); + private static final API EXPORT = new API(BASE_URI + ADMIN_EXPORT, HttpMethod.POST, Response.Status.OK, MediaType.APPLICATION_JSON, MediaType.APPLICATION_OCTET_STREAM); + private static final String IMPORT_REQUEST_PARAMTER = "request"; + private static final String IMPORT_DATA_PARAMETER = "data"; + protected WebResource service; protected Configuration configuration; private String basicAuthUser; @@ -251,6 +265,8 @@ public abstract class AtlasBaseClient { // Enable POJO mapping feature config.getFeatures().put(JSONConfiguration.FEATURE_POJO_MAPPING, Boolean.TRUE); config.getClasses().add(JacksonJaxbJsonProvider.class); + config.getClasses().add(MultiPartWriter.class); + int readTimeout = configuration.getInt("atlas.client.readTimeoutMSecs", 60000); int connectTimeout = configuration.getInt("atlas.client.connectTimeoutMSecs", 60000); if (configuration.getBoolean(TLS_ENABLED, false)) { @@ -367,7 +383,9 @@ public abstract class AtlasBaseClient { return null; } try { - if (responseType.getRawClass().equals(ObjectNode.class)) { + if(api.getProduces().equals(MediaType.APPLICATION_OCTET_STREAM)) { + return (T) IOUtils.toByteArray(clientResponse.getEntityInputStream()); + } else if (responseType.getRawClass().equals(ObjectNode.class)) { String stringEntity = clientResponse.getEntity(String.class); try { JsonNode jsonObject = AtlasJson.parseToV1JsonNode(stringEntity); @@ -385,6 +403,8 @@ public abstract class AtlasBaseClient { } } catch (ClientHandlerException e) { throw new AtlasServiceException(api, e); + } catch (IOException e) { + throw new AtlasServiceException(api, e); } } else if (clientResponse.getStatus() != ClientResponse.Status.SERVICE_UNAVAILABLE.getStatusCode()) { break; @@ -414,7 +434,7 @@ public abstract class AtlasBaseClient { return getResource(service, api, queryParams); } - protected abstract API formatPathParameters(API api, String ... params); + protected abstract API formatPathParameters(API api, String... params); void initializeState(String[] baseUrls, UserGroupInformation ugi, String doAsUser) { initializeState(getClientProperties(), baseUrls, ugi, doAsUser); @@ -446,15 +466,59 @@ public abstract class AtlasBaseClient { return configuration.getInt(AtlasBaseClient.ATLAS_CLIENT_HA_RETRIES_KEY, AtlasBaseClient.DEFAULT_NUM_RETRIES); } + public byte[] exportData(AtlasExportRequest request) throws AtlasServiceException { + try { + return (byte[]) callAPI(EXPORT, Object.class, request); + } catch (Exception e) { + LOG.error("error writing to file", e); + throw new AtlasServiceException(e); + } + } + + public void exportData(AtlasExportRequest request, String absolutePath) throws AtlasServiceException { + OutputStream fileOutputStream = null; + try { + byte[] fileBytes = exportData(request); + fileOutputStream = new FileOutputStream(new File(absolutePath)); + IOUtils.write(fileBytes, fileOutputStream); + } catch (Exception e) { + LOG.error("error writing to file", e); + throw new AtlasServiceException(e); + } finally { + if(fileOutputStream != null) { + try { + fileOutputStream.close(); + } catch (IOException e) { + LOG.error("error closing file", e); + throw new AtlasServiceException(e); + } + } + } + } + public AtlasImportResult importData(AtlasImportRequest request, String absoluteFilePath) throws AtlasServiceException { - FileDataBodyPart filePart = new FileDataBodyPart("data", new File(absoluteFilePath)); + return performImportData(getImportRequestBodyPart(request), + new FileDataBodyPart(IMPORT_DATA_PARAMETER, new File(absoluteFilePath))); + } + + public AtlasImportResult importData(AtlasImportRequest request, byte[] fileData) throws AtlasServiceException { + return performImportData(getImportRequestBodyPart(request), + new StreamDataBodyPart(IMPORT_DATA_PARAMETER, new ByteArrayInputStream(fileData))); + } + + private AtlasImportResult performImportData(BodyPart requestPart, BodyPart filePart) throws AtlasServiceException { MultiPart multipartEntity = new FormDataMultiPart() - .field("request", AtlasType.toJson(request), MediaType.APPLICATION_JSON_TYPE) + .bodyPart(requestPart) .bodyPart(filePart); return callAPI(IMPORT, AtlasImportResult.class, multipartEntity); } + + private FormDataBodyPart getImportRequestBodyPart(AtlasImportRequest request) { + return new FormDataBodyPart(IMPORT_REQUEST_PARAMTER, AtlasType.toJson(request), MediaType.APPLICATION_JSON_TYPE); + } + boolean isRetryableException(ClientHandlerException che) { return che.getCause().getClass().equals(IOException.class) || che.getCause().getClass().equals(ConnectException.class); http://git-wip-us.apache.org/repos/asf/atlas/blob/41997f64/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java new file mode 100644 index 0000000..fc804d2 --- /dev/null +++ b/webapp/src/test/java/org/apache/atlas/web/resources/AdminExportImportTestIT.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.web.resources; + + +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.clusterinfo.AtlasCluster; +import org.apache.atlas.model.impexp.AtlasExportRequest; +import org.apache.atlas.model.impexp.AtlasImportRequest; +import org.apache.atlas.model.impexp.AtlasImportResult; +import org.apache.atlas.repository.impexp.ZipSource; +import org.apache.atlas.utils.TestResourceFileUtils; +import org.apache.atlas.web.integration.BaseResourceIT; +import org.testng.annotations.Test; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +public class AdminExportImportTestIT extends BaseResourceIT { + private final String FILE_TO_IMPORT = "stocks-base.zip"; + + @Test + public void isActive() throws AtlasServiceException { + assertEquals(atlasClientV2.getAdminStatus(), "ACTIVE"); + } + + @Test(dependsOnMethods = "isActive") + public void importData() throws AtlasServiceException, IOException { + performImport(FILE_TO_IMPORT); + } + + @Test(dependsOnMethods = "importData") + public void exportData() throws AtlasServiceException, IOException, AtlasBaseException { + final int EXPECTED_CREATION_ORDER_SIZE = 13; + + AtlasExportRequest request = TestResourceFileUtils.readObjectFromJson(".", "export-incremental", AtlasExportRequest.class); + byte[] exportedBytes = atlasClientV2.exportData(request); + assertNotNull(exportedBytes); + + ZipSource zs = new ZipSource(new ByteArrayInputStream(exportedBytes)); + assertNotNull(zs.getExportResult()); + assertEquals(zs.getCreationOrder().size(), EXPECTED_CREATION_ORDER_SIZE); + } + + private void performImport(String fileToImport) throws AtlasServiceException { + AtlasImportRequest request = new AtlasImportRequest(); + byte[] fileBytes = new byte[0]; + try { + fileBytes = Files.readAllBytes(Paths.get(TestResourceFileUtils.getTestFilePath(fileToImport))); + } catch (IOException e) { + assertFalse(true, "Exception: " + e.getMessage()); + } + AtlasImportResult result = atlasClientV2.importData(request, fileBytes); + + assertNotNull(result); + assertEquals(result.getOperationStatus(), AtlasImportResult.OperationStatus.SUCCESS); + assertNotNull(result.getMetrics()); + assertEquals(result.getProcessedEntities().size(), 37); + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/41997f64/webapp/src/test/resources/json/export-incremental.json ---------------------------------------------------------------------- diff --git a/webapp/src/test/resources/json/export-incremental.json b/webapp/src/test/resources/json/export-incremental.json new file mode 100644 index 0000000..9710841 --- /dev/null +++ b/webapp/src/test/resources/json/export-incremental.json @@ -0,0 +1,11 @@ +{ + "itemsToExport": [ + { + "typeName": "hive_db", "uniqueAttributes": { "qualifiedName": "stocks@cl1" } + + } + ], + "options": { + "fetchType": "full" + } +} http://git-wip-us.apache.org/repos/asf/atlas/blob/41997f64/webapp/src/test/resources/stocks-base.zip ---------------------------------------------------------------------- diff --git a/webapp/src/test/resources/stocks-base.zip b/webapp/src/test/resources/stocks-base.zip new file mode 100644 index 0000000..40c7f37 Binary files /dev/null and b/webapp/src/test/resources/stocks-base.zip differ
