This is an automated email from the ASF dual-hosted git repository. sarath pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 32b25aa ATLAS-4230: Add support for Google Cloud Storage Path Entity creation 32b25aa is described below commit 32b25aa6e3f1751c302b2a741b771e5d102316ef Author: sidmishra <sidmis...@cloudera.com> AuthorDate: Tue Mar 30 16:20:40 2021 -0700 ATLAS-4230: Add support for Google Cloud Storage Path Entity creation Signed-off-by: Sarath Subramanian <sar...@apache.org> (cherry picked from commit c03ee72ca557eab49f2e315fc17aefcce03176ef) --- .../apache/atlas/utils/AtlasPathExtractorUtil.java | 93 +++++++++++++++++++++- .../atlas/utils/AtlasPathExtractorUtilTest.java | 66 ++++++++++++++- 2 files changed, 155 insertions(+), 4 deletions(-) diff --git a/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java b/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java index 81f847e..a9f2e50 100644 --- a/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java +++ b/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java @@ -82,6 +82,13 @@ public class AtlasPathExtractorUtil { public static final String RELATIONSHIP_OZONE_VOLUME_BUCKET = "ozone_volume_buckets"; public static final String RELATIONSHIP_OZONE_PARENT_CHILDREN = "ozone_parent_children"; + //Google Cloud Storage + public static final String GCS_SCHEME = "gs" + SCHEME_SEPARATOR; + public static final String GCS_BUCKET = "gcp_storage_bucket"; + public static final String GCS_VIRTUAL_DIR = "gcp_storage_virtual_directory"; + public static final String ATTRIBUTE_GCS_PARENT = "parent"; + public static final String RELATIONSHIP_GCS_PARENT_CHILDREN = "gcp_storage_parent_children"; + public static AtlasEntityWithExtInfo getPathEntity(Path path, PathExtractorContext context) { AtlasEntityWithExtInfo entityWithExtInfo = new AtlasEntityWithExtInfo(); AtlasEntity ret; @@ -98,9 +105,12 @@ public class AtlasPathExtractorUtil { ret = addAbfsPathEntity(path, entityWithExtInfo, context); } else if (isOzonePath(strPath)) { ret = addOzonePathEntity(path, entityWithExtInfo, context); + } else if (isGCSPath(strPath)) { + ret = addGCSPathEntity(path, entityWithExtInfo, context); } else { ret = addHDFSPathEntity(path, context); } + entityWithExtInfo.setEntity(ret); return entityWithExtInfo; @@ -123,6 +133,10 @@ public class AtlasPathExtractorUtil { return strPath != null && (strPath.startsWith(OZONE_SCHEME) || strPath.startsWith(OZONE_3_SCHEME)); } + private static boolean isGCSPath(String strPath) { + return strPath != null && strPath.startsWith(GCS_SCHEME); + } + private static AtlasEntity addS3PathEntityV1(Path path, AtlasEntityExtInfo extInfo, PathExtractorContext context) { String strPath = path.toString(); @@ -217,7 +231,7 @@ public class AtlasPathExtractorUtil { ret = new AtlasEntity(AWS_S3_V2_PSEUDO_DIR); ret.setRelationshipAttribute(ATTRIBUTE_CONTAINER, parentObjId); - ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, subDirPath); + ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, parentPath); ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, subDirQualifiedName); ret.setAttribute(ATTRIBUTE_NAME, subDirName); @@ -442,6 +456,83 @@ public class AtlasPathExtractorUtil { return ret; } + private static AtlasEntity addGCSPathEntity(Path path, AtlasEntityExtInfo extInfo, PathExtractorContext context) { + String strPath = path.toString(); + + if (LOG.isDebugEnabled()) { + LOG.debug("==> addGCSPathEntity(strPath={})", strPath); + } + + String metadataNamespace = context.getMetadataNamespace(); + String pathQualifiedName = strPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace; + AtlasEntity ret = context.getEntity(pathQualifiedName); + + if (ret == null) { + String bucketName = path.toUri().getAuthority(); + String schemeAndBucketName = (path.toUri().getScheme() + SCHEME_SEPARATOR + bucketName).toLowerCase(); + String bucketQualifiedName = schemeAndBucketName + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace; + AtlasEntity bucketEntity = context.getEntity(bucketQualifiedName); + + if (bucketEntity == null) { + bucketEntity = new AtlasEntity(GCS_BUCKET); + + bucketEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, bucketQualifiedName); + bucketEntity.setAttribute(ATTRIBUTE_NAME, bucketName); + + if (LOG.isDebugEnabled()) { + LOG.debug("adding entity: typeName={}, qualifiedName={}", bucketEntity.getTypeName(), bucketEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); + } + + context.putEntity(bucketQualifiedName, bucketEntity); + } + + extInfo.addReferredEntity(bucketEntity); + + AtlasRelatedObjectId parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(bucketEntity, RELATIONSHIP_GCS_PARENT_CHILDREN); + String parentPath = Path.SEPARATOR; + String dirPath = path.toUri().getPath(); + + if (StringUtils.isEmpty(dirPath)) { + dirPath = Path.SEPARATOR; + } + + for (String subDirName : dirPath.split(Path.SEPARATOR)) { + if (StringUtils.isEmpty(subDirName)) { + continue; + } + + String subDirPath = parentPath + subDirName + Path.SEPARATOR; + String subDirQualifiedName = schemeAndBucketName + subDirPath + QNAME_SEP_METADATA_NAMESPACE + metadataNamespace; + + ret = new AtlasEntity(GCS_VIRTUAL_DIR); + + ret.setRelationshipAttribute(ATTRIBUTE_GCS_PARENT, parentObjId); + ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, parentPath); + ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, subDirQualifiedName); + ret.setAttribute(ATTRIBUTE_NAME, subDirName); + + if (LOG.isDebugEnabled()) { + LOG.debug("adding entity: typeName={}, qualifiedName={}", ret.getTypeName(), ret.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); + } + + context.putEntity(subDirQualifiedName, ret); + + parentObjId = AtlasTypeUtil.getAtlasRelatedObjectId(ret, RELATIONSHIP_GCS_PARENT_CHILDREN); + parentPath = subDirPath; + } + + if (ret == null) { + ret = bucketEntity; + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("<== addGCSPathEntity(strPath={})", strPath); + } + + return ret; + } + private static AtlasEntity addHDFSPathEntity(Path path, PathExtractorContext context) { String strPath = path.toString(); diff --git a/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java b/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java index dbc5000..6bf5d57 100644 --- a/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java +++ b/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java @@ -78,6 +78,12 @@ public class AtlasPathExtractorUtilTest { private static final String S3_PATH = S3_SCHEME + "aws_my_bucket1/1234567890/renders/Irradiance_A.csv"; private static final String S3A_PATH = S3A_SCHEME + "aws_my_bucket1/1234567890/renders/Irradiance_A.csv"; + // Google Cloud Storage + private static final String GCS_VIRTUAL_DIR = "gcp_storage_virtual_directory"; + private static final String GCS_BUCKET = "gcp_storage_bucket"; + private static final String GCS_SCHEME = "gs" + SCHEME_SEPARATOR; + private static final String GCS_PATH = GCS_SCHEME + "gcs_test_bucket1/1234567890/data"; + @DataProvider(name = "ozonePathProvider") private Object[][] ozonePathProvider(){ return new Object[][]{ @@ -264,6 +270,22 @@ public class AtlasPathExtractorUtilTest { verifyS3KnownEntities(S3A_SCHEME, S3A_PATH, extractorContext.getKnownEntities()); } + @Test + public void testGetPathEntityGCSPath() { + PathExtractorContext extractorContext = new PathExtractorContext(METADATA_NAMESPACE); + + Path path = new Path(GCS_PATH); + AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(path, extractorContext); + AtlasEntity entity = entityWithExtInfo.getEntity(); + + assertNotNull(entity); + assertEquals(entity.getTypeName(), GCS_VIRTUAL_DIR); + assertEquals(entityWithExtInfo.getReferredEntities().size(), 1); + + verifyGCSVirtualDir(GCS_SCHEME, GCS_PATH, entity); + verifyGCSKnownEntities(GCS_SCHEME, GCS_PATH, extractorContext.getKnownEntities()); + } + private void verifyOzoneEntities(Map<String, AtlasEntity> knownEntities, OzoneKeyValidator validator) { for (AtlasEntity knownEntity : knownEntities.values()) { switch (knownEntity.getTypeName()){ @@ -350,20 +372,35 @@ public class AtlasPathExtractorUtilTest { if (pathQName.equalsIgnoreCase(entityQName)){ assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "Irradiance_A.csv"); - assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/renders/Irradiance_A.csv/"); + assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/renders/"); } else { pathQName = s3Scheme + "aws_my_bucket1/1234567890/" + QNAME_METADATA_NAMESPACE; if (pathQName.equalsIgnoreCase(entityQName)){ assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "1234567890"); - assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/"); + assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/"); } else { assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), s3Scheme + "aws_my_bucket1/1234567890/renders/" + QNAME_METADATA_NAMESPACE); assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "renders"); - assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/renders/"); + assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/"); } } } + private void verifyGCSVirtualDir(String s3Scheme, String path, AtlasEntity entity) { + String pathQName = path + "/" + QNAME_METADATA_NAMESPACE; + String entityQName = (String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME); + + if (pathQName.equalsIgnoreCase(entityQName)){ + assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "data"); + assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/"); + } else { + pathQName = s3Scheme + "gcs_test_bucket1/1234567890/" + QNAME_METADATA_NAMESPACE; + assertEquals(entityQName, pathQName); + assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "1234567890"); + assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/"); + } + } + private void verifyS3V2KnownEntities(String scheme, String path, Map<String, AtlasEntity> knownEntities) { assertEquals(knownEntities.size(), 4); int dirCount = 0; @@ -411,6 +448,29 @@ public class AtlasPathExtractorUtilTest { assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "aws_my_bucket1"); } + private void verifyGCSKnownEntities(String scheme, String path, Map<String, AtlasEntity> knownEntities) { + assertEquals(knownEntities.size(), 3); + int dirCount = 0; + for (AtlasEntity knownEntity : knownEntities.values()) { + switch (knownEntity.getTypeName()){ + case GCS_VIRTUAL_DIR: + verifyGCSVirtualDir(scheme, path, knownEntity); + dirCount++; + break; + + case GCS_BUCKET: + verifyGCSBucketEntity(scheme, knownEntity); + break; + } + } + assertEquals(dirCount, 2); + } + + private void verifyGCSBucketEntity(String scheme, AtlasEntity entity) { + assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), scheme + "gcs_test_bucket1" + QNAME_METADATA_NAMESPACE); + assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "gcs_test_bucket1"); + } + private class OzoneKeyValidator { private final String scheme; private final String location;