This is an automated email from the ASF dual-hosted git repository. sidmishra pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new aa74830 ATLAS-4400: Fixed Hook and Atlas Preprocessor to handle S3 V2 directory objectPrefix Issue with Atlas Server and Hook versions mismatch aa74830 is described below commit aa74830238d7544d16468393987a17338ca96c6b Author: Sidharth Mishra <sidharthkmis...@gmail.com> AuthorDate: Wed Aug 25 20:36:49 2021 -0700 ATLAS-4400: Fixed Hook and Atlas Preprocessor to handle S3 V2 directory objectPrefix Issue with Atlas Server and Hook versions mismatch Signed-off-by: sidmishra <sidmis...@apache.org> (cherry picked from commit d7547c493f25f3e1acc5572ce7159068d6b2d00e) --- .../apache/atlas/utils/AtlasPathExtractorUtil.java | 2 +- .../atlas/utils/AtlasPathExtractorUtilTest.java | 6 +- .../notification/NotificationHookConsumer.java | 35 +++- .../preprocessor/AWSS3V2Preprocessor.java | 91 +++++++++ .../preprocessor/EntityPreprocessor.java | 17 +- .../preprocessor/PreprocessorContext.java | 8 +- .../preprocessor/AWSS3V2PreprocessorTest.java | 214 +++++++++++++++++++++ 7 files changed, 364 insertions(+), 9 deletions(-) diff --git a/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java b/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java index a9f2e50..01a67b7 100644 --- a/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java +++ b/common/src/main/java/org/apache/atlas/utils/AtlasPathExtractorUtil.java @@ -231,7 +231,7 @@ public class AtlasPathExtractorUtil { ret = new AtlasEntity(AWS_S3_V2_PSEUDO_DIR); ret.setRelationshipAttribute(ATTRIBUTE_CONTAINER, parentObjId); - ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, parentPath); + ret.setAttribute(ATTRIBUTE_OBJECT_PREFIX, subDirPath); ret.setAttribute(ATTRIBUTE_QUALIFIED_NAME, subDirQualifiedName); ret.setAttribute(ATTRIBUTE_NAME, subDirName); diff --git a/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java b/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java index 6bf5d57..f35e9ae 100644 --- a/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java +++ b/common/src/test/java/org/apache/atlas/utils/AtlasPathExtractorUtilTest.java @@ -372,16 +372,16 @@ public class AtlasPathExtractorUtilTest { if (pathQName.equalsIgnoreCase(entityQName)){ assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "Irradiance_A.csv"); - assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/renders/"); + assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/renders/Irradiance_A.csv/"); } else { pathQName = s3Scheme + "aws_my_bucket1/1234567890/" + QNAME_METADATA_NAMESPACE; if (pathQName.equalsIgnoreCase(entityQName)){ assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "1234567890"); - assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/"); + assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/"); } else { assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), s3Scheme + "aws_my_bucket1/1234567890/renders/" + QNAME_METADATA_NAMESPACE); assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "renders"); - assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/"); + assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/renders/"); } } } diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 5643af9..49c504f 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -154,6 +154,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl public static final String CONSUMER_PREPROCESS_HIVE_PROCESS_UPD_NAME_WITH_QUALIFIED_NAME = "atlas.notification.consumer.preprocess.hive_process.update.name.with.qualified_name"; public static final String CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.hive_types.remove.ownedref.attrs"; public static final String CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS = "atlas.notification.consumer.preprocess.rdbms_types.remove.ownedref.attrs"; + public static final String CONSUMER_PREPROCESS_S3_V2_DIRECTORY_PRUNE_OBJECT_PREFIX = "atlas.notification.consumer.preprocess.s3_v2_directory.prune.object_prefix"; public static final String CONSUMER_AUTHORIZE_USING_MESSAGE_USER = "atlas.notification.authorize.using.message.user"; public static final String CONSUMER_AUTHORIZE_AUTHN_CACHE_TTL_SECONDS = "atlas.notification.authorize.authn.cache.ttl.seconds"; @@ -182,6 +183,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private final Map<String, PreprocessAction> hiveTablesCache; private final boolean hiveTypesRemoveOwnedRefAttrs; private final boolean rdbmsTypesRemoveOwnedRefAttrs; + private final boolean s3V2DirectoryPruneObjectPrefix; private final boolean preprocessEnabled; private final boolean createShellEntityForNonExistingReference; private final boolean authorizeUsingMessageUser; @@ -310,12 +312,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl hiveTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, true); rdbmsTypesRemoveOwnedRefAttrs = applicationProperties.getBoolean(CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, true); - preprocessEnabled = skipHiveColumnLineageHive20633 || updateHiveProcessNameWithQualifiedName || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty(); + s3V2DirectoryPruneObjectPrefix = applicationProperties.getBoolean(CONSUMER_PREPROCESS_S3_V2_DIRECTORY_PRUNE_OBJECT_PREFIX, true); + + preprocessEnabled = skipHiveColumnLineageHive20633 || updateHiveProcessNameWithQualifiedName || hiveTypesRemoveOwnedRefAttrs || rdbmsTypesRemoveOwnedRefAttrs || s3V2DirectoryPruneObjectPrefix || !hiveTablesToIgnore.isEmpty() || !hiveTablesToPrune.isEmpty() || !hiveDummyDatabasesToIgnore.isEmpty() || !hiveDummyTablesToIgnore.isEmpty() || !hiveTablePrefixesToIgnore.isEmpty(); entityCorrelationManager = new EntityCorrelationManager(entityCorrelationStore); LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, skipHiveColumnLineageHive20633); LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, skipHiveColumnLineageHive20633InputsThreshold); LOG.info("{}={}", CONSUMER_PREPROCESS_HIVE_TYPES_REMOVE_OWNEDREF_ATTRS, hiveTypesRemoveOwnedRefAttrs); LOG.info("{}={}", CONSUMER_PREPROCESS_RDBMS_TYPES_REMOVE_OWNEDREF_ATTRS, rdbmsTypesRemoveOwnedRefAttrs); + LOG.info("{}={}", CONSUMER_PREPROCESS_S3_V2_DIRECTORY_PRUNE_OBJECT_PREFIX, s3V2DirectoryPruneObjectPrefix); LOG.info("{}={}", CONSUMER_COMMIT_BATCH_SIZE, commitBatchSize); LOG.info("{}={}", CONSUMER_DISABLED, consumerDisabled); } @@ -982,7 +987,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl if (preprocessEnabled) { context = new PreprocessorContext(kafkaMsg, typeRegistry, hiveTablesToIgnore, hiveTablesToPrune, hiveTablesCache, hiveDummyDatabasesToIgnore, hiveDummyTablesToIgnore, hiveTablePrefixesToIgnore, hiveTypesRemoveOwnedRefAttrs, - rdbmsTypesRemoveOwnedRefAttrs, updateHiveProcessNameWithQualifiedName, entityCorrelationManager); + rdbmsTypesRemoveOwnedRefAttrs, s3V2DirectoryPruneObjectPrefix, updateHiveProcessNameWithQualifiedName, entityCorrelationManager); if (context.isHivePreprocessEnabled()) { preprocessHiveTypes(context); @@ -996,6 +1001,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl rdbmsTypeRemoveOwnedRefAttrs(context); } + if (s3V2DirectoryPruneObjectPrefix) { + pruneObjectPrefixForS3V2Directory(context); + } + context.moveRegisteredReferredEntities(); if (context.isHivePreprocessEnabled() && CollectionUtils.isNotEmpty(context.getEntities()) && context.getEntities().size() > 1) { @@ -1040,6 +1049,28 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } } + private void pruneObjectPrefixForS3V2Directory(PreprocessorContext context) { + List<AtlasEntity> entities = new ArrayList<>(); + + if (CollectionUtils.isNotEmpty(context.getEntities())) { + entities.addAll(context.getEntities()); + } + + if (MapUtils.isNotEmpty(context.getReferredEntities())) { + entities.addAll(context.getReferredEntities().values()); + } + + if (CollectionUtils.isNotEmpty(entities)) { + for (AtlasEntity entity : entities) { + EntityPreprocessor preprocessor = EntityPreprocessor.getS3V2Preprocessor(entity.getTypeName()); + + if (preprocessor != null) { + preprocessor.preprocess(entity, context); + } + } + } + } + private void preprocessHiveTypes(PreprocessorContext context) { List<AtlasEntity> entities = context.getEntities(); diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/AWSS3V2Preprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/AWSS3V2Preprocessor.java new file mode 100644 index 0000000..6102572 --- /dev/null +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/AWSS3V2Preprocessor.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification.preprocessor; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.hadoop.fs.Path; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AWSS3V2Preprocessor { + private static final Logger LOG = LoggerFactory.getLogger(AWSS3V2Preprocessor.class); + + private static final String AWS_S3_V2_DIR_TYPE = "aws_s3_v2_directory"; + private static final String ATTRIBUTE_OBJECT_PREFIX = "objectPrefix"; + private static final String SCHEME_SEPARATOR = "://"; + + static class AWSS3V2DirectoryPreprocessor extends EntityPreprocessor { + protected AWSS3V2DirectoryPreprocessor() { + super(AWS_S3_V2_DIR_TYPE); + } + + @Override + public void preprocess(AtlasEntity entity, PreprocessorContext context) { + if (context.getS3V2DirectoryPruneObjectPrefix()) { + String qualifiedName = (String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME); + String objectPrefix = (String) entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX); + + if (isObjectPrefixPruneNeeded(qualifiedName, objectPrefix)) { + if (objectPrefix.lastIndexOf(Path.SEPARATOR) == -1) { + objectPrefix = Path.SEPARATOR; + } else { + if (doesEndsWithPathSeparator(objectPrefix)) { + objectPrefix = removeLastPathSeparator(objectPrefix); + } + + objectPrefix = objectPrefix.substring(0, objectPrefix.lastIndexOf(Path.SEPARATOR) + 1); + } + + LOG.info("Aws S3 V2 Preprocessor: Pruning {} from {} to {}", ATTRIBUTE_OBJECT_PREFIX + QNAME_SEP_CLUSTER_NAME + AWS_S3_V2_DIR_TYPE, + entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), objectPrefix); + + entity.setAttribute(ATTRIBUTE_OBJECT_PREFIX, objectPrefix); + } + } + } + + private boolean isObjectPrefixPruneNeeded(String qualifiedName, String objectPrefix) { + return (StringUtils.isNotBlank(qualifiedName) + && StringUtils.isNotBlank(objectPrefix) + && qualifiedName.contains(getSchemeAndBucket(qualifiedName) + objectPrefix + QNAME_SEP_CLUSTER_NAME)); + } + + private String getSchemeAndBucket(String qualifiedName) { + String ret = ""; + + if (StringUtils.isNotEmpty(qualifiedName) && qualifiedName.contains(SCHEME_SEPARATOR)) { + int schemeSeparatorEndPosition = qualifiedName.indexOf(SCHEME_SEPARATOR) + SCHEME_SEPARATOR.length(); + int bucketEndPosition = qualifiedName.indexOf(Path.SEPARATOR, schemeSeparatorEndPosition); + ret = qualifiedName.substring(0, bucketEndPosition); + } + + return ret; + } + + private boolean doesEndsWithPathSeparator(String path) { + return path.endsWith(Path.SEPARATOR); + } + + private String removeLastPathSeparator(String path) { + return StringUtils.chop(path); + } + } + +} diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java index 7f0cafe..f8eac4c 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/EntityPreprocessor.java @@ -64,8 +64,9 @@ public abstract class EntityPreprocessor { public static final char QNAME_SEP_ENTITY_NAME = '.'; public static final String QNAME_SD_SUFFIX = "_storage"; - private static final Map<String, EntityPreprocessor> HIVE_PREPROCESSOR_MAP = new HashMap<>(); - private static final Map<String, EntityPreprocessor> RDBMS_PREPROCESSOR_MAP = new HashMap<>(); + private static final Map<String, EntityPreprocessor> HIVE_PREPROCESSOR_MAP = new HashMap<>(); + private static final Map<String, EntityPreprocessor> RDBMS_PREPROCESSOR_MAP = new HashMap<>(); + private static final Map<String, EntityPreprocessor> AWS_S3_V2_PREPROCESSOR_MAP = new HashMap<>(); private final String typeName; @@ -88,6 +89,10 @@ public abstract class EntityPreprocessor { new RdbmsPreprocessor.RdbmsTablePreprocessor() }; + EntityPreprocessor[] s3V2Preprocessors = new EntityPreprocessor[] { + new AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor() + }; + for (EntityPreprocessor preprocessor : hivePreprocessors) { HIVE_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor); } @@ -95,6 +100,10 @@ public abstract class EntityPreprocessor { for (EntityPreprocessor preprocessor : rdbmsPreprocessors) { RDBMS_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor); } + + for (EntityPreprocessor preprocessor : s3V2Preprocessors) { + AWS_S3_V2_PREPROCESSOR_MAP.put(preprocessor.getTypeName(), preprocessor); + } } protected EntityPreprocessor(String typeName) { @@ -116,6 +125,10 @@ public abstract class EntityPreprocessor { return typeName != null ? RDBMS_PREPROCESSOR_MAP.get(typeName) : null; } + public static EntityPreprocessor getS3V2Preprocessor(String typeName) { + return typeName != null ? AWS_S3_V2_PREPROCESSOR_MAP.get(typeName) : null; + } + public static String getQualifiedName(AtlasEntity entity) { Object obj = entity != null ? entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME) : null; diff --git a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java index 59f6440..f930d9f 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java +++ b/webapp/src/main/java/org/apache/atlas/notification/preprocessor/PreprocessorContext.java @@ -63,6 +63,7 @@ public class PreprocessorContext { private final boolean updateHiveProcessNameWithQualifiedName; private final boolean hiveTypesRemoveOwnedRefAttrs; private final boolean rdbmsTypesRemoveOwnedRefAttrs; + private final boolean s3V2DirectoryPruneObjectPrefix; private final boolean isHivePreProcessEnabled; private final Set<String> ignoredEntities = new HashSet<>(); private final Set<String> prunedEntities = new HashSet<>(); @@ -73,7 +74,7 @@ public class PreprocessorContext { private final EntityCorrelationManager correlationManager; private List<AtlasEntity> postUpdateEntities = null; - public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore, List<String> hiveTablePrefixesToIgnore, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs, boolean updateHiveProcessNameWithQualifiedName, EntityCorrelationMana [...] + public PreprocessorContext(AtlasKafkaMessage<HookNotification> kafkaMessage, AtlasTypeRegistry typeRegistry, List<Pattern> hiveTablesToIgnore, List<Pattern> hiveTablesToPrune, Map<String, PreprocessAction> hiveTablesCache, List<String> hiveDummyDatabasesToIgnore, List<String> hiveDummyTablesToIgnore, List<String> hiveTablePrefixesToIgnore, boolean hiveTypesRemoveOwnedRefAttrs, boolean rdbmsTypesRemoveOwnedRefAttrs, boolean s3V2DirectoryPruneObjectPrefix, boolean updateHiveProcessName [...] this.kafkaMessage = kafkaMessage; this.typeRegistry = typeRegistry; this.hiveTablesToIgnore = hiveTablesToIgnore; @@ -84,6 +85,7 @@ public class PreprocessorContext { this.hiveTablePrefixesToIgnore = hiveTablePrefixesToIgnore; this.hiveTypesRemoveOwnedRefAttrs = hiveTypesRemoveOwnedRefAttrs; this.rdbmsTypesRemoveOwnedRefAttrs = rdbmsTypesRemoveOwnedRefAttrs; + this.s3V2DirectoryPruneObjectPrefix = s3V2DirectoryPruneObjectPrefix; this.updateHiveProcessNameWithQualifiedName = updateHiveProcessNameWithQualifiedName; final HookNotification message = kafkaMessage.getMessage(); @@ -124,6 +126,10 @@ public class PreprocessorContext { public boolean getRdbmsTypesRemoveOwnedRefAttrs() { return rdbmsTypesRemoveOwnedRefAttrs; } + public boolean getS3V2DirectoryPruneObjectPrefix() { + return s3V2DirectoryPruneObjectPrefix; + } + public boolean isHivePreprocessEnabled() { return isHivePreProcessEnabled; } diff --git a/webapp/src/test/java/org/apache/atlas/notification/preprocessor/AWSS3V2PreprocessorTest.java b/webapp/src/test/java/org/apache/atlas/notification/preprocessor/AWSS3V2PreprocessorTest.java new file mode 100644 index 0000000..9c6c92a --- /dev/null +++ b/webapp/src/test/java/org/apache/atlas/notification/preprocessor/AWSS3V2PreprocessorTest.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.notification.preprocessor; + +import org.apache.atlas.kafka.AtlasKafkaMessage; +import org.apache.atlas.kafka.KafkaNotification; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; +import org.apache.atlas.model.notification.HookNotification; +import org.apache.atlas.utils.AtlasPathExtractorUtil; +import org.apache.atlas.utils.PathExtractorContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.Test; +import org.apache.hadoop.fs.Path; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; + +public class AWSS3V2PreprocessorTest { + + private static final Logger LOG = LoggerFactory.getLogger(AWSS3V2PreprocessorTest.class); + private static final String METADATA_NAMESPACE = "cm"; + private static final String QNAME_METADATA_NAMESPACE = '@' + METADATA_NAMESPACE; + private static final String AWS_S3_MODEL_VERSION_V2 = "V2"; + + private static final String SCHEME_SEPARATOR = "://"; + private static final String S3_SCHEME = "s3" + SCHEME_SEPARATOR; + private static final String S3A_SCHEME = "s3a" + SCHEME_SEPARATOR; + private static final String ABFS_SCHEME = "abfs" + SCHEME_SEPARATOR; + + private static final String ATTRIBUTE_NAME = "name"; + private static final String ATTRIBUTE_OBJECT_PREFIX = "objectPrefix"; + private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; + + private static final List<String> EMPTY_STR_LIST = new ArrayList<>(); + private static final List<Pattern> EMPTY_PATTERN_LIST = new ArrayList<>(); + + @Test + public void testS2V2DirectoryPreprocessorForOtherTypes() { + PathExtractorContext extractorContext = new PathExtractorContext(METADATA_NAMESPACE); + final String ABFS_PATH = ABFS_SCHEME + "d...@razrangersan.dfs.core.windows.net/tmp/cdp-demo/sample.csv"; + Path path = new Path(ABFS_PATH); + AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(path, extractorContext); + AtlasEntity entity = entityWithExtInfo.getEntity(); + + assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), null); + + HookNotification hookNotification = new HookNotification.EntityCreateRequestV2("test", new AtlasEntity.AtlasEntitiesWithExtInfo(entity)); + AtlasKafkaMessage<HookNotification> kafkaMsg = new AtlasKafkaMessage(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1); + PreprocessorContext context = new PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST, null, + EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false, + false, true, false, null); + + EntityPreprocessor preprocessor = new AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor(); + + preprocessor.preprocess(entity, context); + + assertNotEquals(entity.getTypeName(), preprocessor.getTypeName()); + } + + @Test + public void testS2V2DirectoryPreprocessorForFullPath() { + PathExtractorContext extractorContext = new PathExtractorContext(METADATA_NAMESPACE, AWS_S3_MODEL_VERSION_V2); + final String S3_PATH = S3A_SCHEME + "aws_bucket1/1234567890/test/data1"; + Path path = new Path(S3_PATH); + AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(path, extractorContext); + AtlasEntity entity = entityWithExtInfo.getEntity(); + + assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/test/data1/"); + + HookNotification hookNotification = new HookNotification.EntityCreateRequestV2("test", new AtlasEntity.AtlasEntitiesWithExtInfo(entity)); + AtlasKafkaMessage<HookNotification> kafkaMsg = new AtlasKafkaMessage(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1); + PreprocessorContext context = new PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST, null, + EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false, + false, true, false, null); + + EntityPreprocessor preprocessor = new AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor(); + + preprocessor.preprocess(entity, context); + + assertEquals(entity.getTypeName(), preprocessor.getTypeName()); + assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/1234567890/test/"); + assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), S3A_SCHEME + "aws_bucket1/1234567890/test/data1/" + QNAME_METADATA_NAMESPACE); + assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "data1"); + } + + @Test + public void testS2V2DirectoryPreprocessorForRootLevelDirectory() { + PathExtractorContext extractorContext = new PathExtractorContext(METADATA_NAMESPACE, AWS_S3_MODEL_VERSION_V2); + final String S3_PATH = S3A_SCHEME + "aws_bucket1/root1"; + Path path = new Path(S3_PATH); + AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(path, extractorContext); + AtlasEntity entity = entityWithExtInfo.getEntity(); + + assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/root1/"); + + HookNotification hookNotification = new HookNotification.EntityCreateRequestV2("test", new AtlasEntity.AtlasEntitiesWithExtInfo(entity)); + AtlasKafkaMessage<HookNotification> kafkaMsg = new AtlasKafkaMessage(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1); + PreprocessorContext context = new PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST, null, + EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false, + false, true, false, null); + + EntityPreprocessor preprocessor = new AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor(); + + preprocessor.preprocess(entity, context); + + assertEquals(entity.getTypeName(), preprocessor.getTypeName()); + assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/"); + assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), S3A_SCHEME + "aws_bucket1/root1/" + QNAME_METADATA_NAMESPACE); + assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "root1"); + } + + @Test + public void testS2V2DirectoryPreprocessorWithSameDirNames() { + PathExtractorContext extractorContext = new PathExtractorContext(METADATA_NAMESPACE, AWS_S3_MODEL_VERSION_V2); + final String S3_PATH = S3_SCHEME + "temp/temp/temp/temp/"; + Path path = new Path(S3_PATH); + AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(path, extractorContext); + AtlasEntity entity = entityWithExtInfo.getEntity(); + + assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/temp/temp/temp/"); + + HookNotification hookNotification = new HookNotification.EntityCreateRequestV2("test", new AtlasEntity.AtlasEntitiesWithExtInfo(entity)); + AtlasKafkaMessage<HookNotification> kafkaMsg = new AtlasKafkaMessage(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1); + PreprocessorContext context = new PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST, null, + EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false, + false, true, false, null); + + EntityPreprocessor preprocessor = new AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor(); + + preprocessor.preprocess(entity, context); + + assertEquals(entity.getTypeName(), preprocessor.getTypeName()); + assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/temp/temp/"); + assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), S3_SCHEME + "temp/temp/temp/temp/" + QNAME_METADATA_NAMESPACE); + assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "temp"); + } + + @Test + public void testS2V2DirectoryPreprocessorForHookWithCorrectObjectPrefix() { + PathExtractorContext extractorContext = new PathExtractorContext(METADATA_NAMESPACE, AWS_S3_MODEL_VERSION_V2); + final String S3_PATH = S3A_SCHEME + "aws_bucket1/root1"; + Path path = new Path(S3_PATH); + AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(path, extractorContext); + AtlasEntity entity = entityWithExtInfo.getEntity(); + + entity.setAttribute(ATTRIBUTE_OBJECT_PREFIX, "/"); + assertNotEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/root1/"); + + HookNotification hookNotification = new HookNotification.EntityCreateRequestV2("test", new AtlasEntity.AtlasEntitiesWithExtInfo(entity)); + AtlasKafkaMessage<HookNotification> kafkaMsg = new AtlasKafkaMessage(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1); + PreprocessorContext context = new PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST, null, + EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false, + false, true, false, null); + + EntityPreprocessor preprocessor = new AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor(); + + preprocessor.preprocess(entity, context); + + assertEquals(entity.getTypeName(), preprocessor.getTypeName()); + assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/"); + assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), S3A_SCHEME + "aws_bucket1/root1/" + QNAME_METADATA_NAMESPACE); + assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "root1"); + } + + @Test + public void testS2V2DirectoryPreprocessorForHookWithCorrectObjectPrefixHavingSameDirNames() { + PathExtractorContext extractorContext = new PathExtractorContext(METADATA_NAMESPACE, AWS_S3_MODEL_VERSION_V2); + final String S3_PATH = S3A_SCHEME + "temp/temp/temp/temp/"; + Path path = new Path(S3_PATH); + AtlasEntityWithExtInfo entityWithExtInfo = AtlasPathExtractorUtil.getPathEntity(path, extractorContext); + AtlasEntity entity = entityWithExtInfo.getEntity(); + + assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/temp/temp/temp/"); + entity.setAttribute(ATTRIBUTE_OBJECT_PREFIX, "/temp/temp/"); + assertNotEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/temp/temp/temp/"); + + HookNotification hookNotification = new HookNotification.EntityCreateRequestV2("test", new AtlasEntity.AtlasEntitiesWithExtInfo(entity)); + AtlasKafkaMessage<HookNotification> kafkaMsg = new AtlasKafkaMessage(hookNotification, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1); + PreprocessorContext context = new PreprocessorContext(kafkaMsg, null, EMPTY_PATTERN_LIST, EMPTY_PATTERN_LIST, null, + EMPTY_STR_LIST, EMPTY_STR_LIST, EMPTY_STR_LIST, false, + false, true, false, null); + + EntityPreprocessor preprocessor = new AWSS3V2Preprocessor.AWSS3V2DirectoryPreprocessor(); + + preprocessor.preprocess(entity, context); + + assertEquals(entity.getTypeName(), preprocessor.getTypeName()); + assertEquals(entity.getAttribute(ATTRIBUTE_OBJECT_PREFIX), "/temp/temp/"); + assertEquals(entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME), S3A_SCHEME + "temp/temp/temp/temp/" + QNAME_METADATA_NAMESPACE); + assertEquals(entity.getAttribute(ATTRIBUTE_NAME), "temp"); + } +}