This is an automated email from the ASF dual-hosted git repository. pinal pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 3aebe0cb1 ATLAS-4842 : Export/Import: fetchType as incremental does full export instead of connected 3aebe0cb1 is described below commit 3aebe0cb1e1937a58f144b16b6616fe24e55f7eb Author: priyanshi-shah26 <priyanshi.s...@freestoneinfotech.com> AuthorDate: Fri Apr 5 14:06:55 2024 +0530 ATLAS-4842 : Export/Import: fetchType as incremental does full export instead of connected Signed-off-by: Pinal Shah <pinal.s...@freestoneinfotech.com> --- .../atlas/repository/impexp/EntitiesExtractor.java | 8 ++- .../atlas/repository/impexp/ExportService.java | 13 ++-- .../repository/impexp/ExportIncrementalTest.java | 73 +++++++++++++++++++++- .../typedef-new-classification-T3.json | 23 +++++++ .../typesdef-new-classification-T2.json | 23 +++++++ 5 files changed, 131 insertions(+), 9 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java b/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java index da5cf37c4..4f603cb8b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java @@ -55,8 +55,12 @@ public class EntitiesExtractor { if (context.isHiveDBIncrementalSkipLineage()) { extractors.get(INCREMENTAL_EXTRACT).fullFetch(entity, context); break; - } else if (context.isHiveTableIncrementalSkipLineage()) { - extractors.get(INCREMENTAL_EXTRACT).connectedFetch(entity, context); + } else if (context.isHiveTableIncremental()) { + if (context.skipLineage) { + extractors.get(INCREMENTAL_EXTRACT).connectedFetch(entity, context); + } else { + extractor.connectedFetch(entity, context); + } break; } diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java index 65d7a1872..4615c6c2f 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java @@ -408,7 +408,7 @@ public class ExportService { skipLineage = result.getRequest().getSkipLineageOptionValue(); this.changeMarker = result.getRequest().getChangeTokenFromOptions(); this.isHiveDBIncremental = checkHiveDBIncrementalSkipLineage(result.getRequest()); - this.isHiveTableIncremental = checkHiveTableIncrementalSkipLineage(result.getRequest()); + this.isHiveTableIncremental = checkHiveTableIncremental(result.getRequest()); this.isSkipConnectedFetch = false; } @@ -422,14 +422,13 @@ public class ExportService { request.getSkipLineageOptionValue(); } - private boolean checkHiveTableIncrementalSkipLineage(AtlasExportRequest request) { - if(CollectionUtils.isEmpty(request.getItemsToExport())) { + private boolean checkHiveTableIncremental(AtlasExportRequest request) { + if (CollectionUtils.isEmpty(request.getItemsToExport())) { return false; } return request.getItemsToExport().get(0).getTypeName().equalsIgnoreCase(ATLAS_TYPE_HIVE_TABLE) && - request.getFetchTypeOptionValue().equalsIgnoreCase(AtlasExportRequest.FETCH_TYPE_INCREMENTAL) && - request.getSkipLineageOptionValue(); + request.getFetchTypeOptionValue().equalsIgnoreCase(AtlasExportRequest.FETCH_TYPE_INCREMENTAL); } public List<AtlasEntity> getEntitiesWithModifiedTimestamp(AtlasEntityWithExtInfo entityWithExtInfo) { @@ -501,6 +500,10 @@ public class ExportService { return isHiveTableIncremental; } + public boolean isHiveTableIncremental() { + return isHiveTableIncremental; + } + public void addToEntityCreationOrder(String guid) { entityCreationOrder.add(guid); } diff --git a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java index 0e3955dcd..bbdab3b63 100644 --- a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java @@ -62,6 +62,7 @@ import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImp import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.assertFalse; @Guice(modules = TestModules.TestOnlyModule.class) public class ExportIncrementalTest extends AtlasTestBase { @@ -83,22 +84,27 @@ public class ExportIncrementalTest extends AtlasTestBase { private final String EXPORT_REQUEST_INCREMENTAL = "export-incremental"; private final String EXPORT_REQUEST_CONNECTED = "export-connected"; private AtlasClassificationType classificationTypeT1; + private AtlasClassificationType classificationTypeT2; + private AtlasClassificationType classificationTypeT3; private long nextTimestamp; private static final String EXPORT_INCREMENTAL = "incremental"; private static final String QUALIFIED_NAME_DB = "db_test_1@02052019"; private static final String QUALIFIED_NAME_TABLE_LINEAGE = "db_test_1.test_tbl_ctas_2@02052019"; - - + private static final String QUALIFIED_NAME_TABLE_2 = "db_test_1.test_tbl_2@02052019"; private static final String GUID_DB = "f0b72ab4-7452-4e42-ac74-2aee7728cce4"; private static final String GUID_TABLE_2 = "8d0b834c-61ce-42d8-8f66-6fa51c36bccb"; private static final String GUID_TABLE_CTAS_2 = "eaec545b-3ac7-4e1b-a497-bd4a2b6434a2"; + private static final String GUID_HIVE_PROCESS = "bd3138b2-f29e-4226-b859-de25eaa1c18b"; + private static final String GUID_TABLE_1 = "4d5adf00-2c9b-4877-ad23-c41fd7319150"; @BeforeClass public void setup() throws IOException, AtlasBaseException { basicSetup(typeDefStore, typeRegistry); RequestContext.get().setImportInProgress(true); classificationTypeT1 = createNewClassification(); + classificationTypeT2 = createNewClassificationT2(); + classificationTypeT3 = createNewClassificationT3(); createEntities(entityStore, ENTITIES_SUB_DIR, new String[] { "db", "table-columns"}); final String[] entityGuids = {DB_GUID, TABLE_GUID}; @@ -163,6 +169,15 @@ public class ExportIncrementalTest extends AtlasTestBase { return typeRegistry.getClassificationTypeByName("T1"); } + private AtlasClassificationType createNewClassificationT2() { + createTypes(typeDefStore, ENTITIES_SUB_DIR,"typesdef-new-classification-T2"); + return typeRegistry.getClassificationTypeByName("T2"); + } + private AtlasClassificationType createNewClassificationT3() { + createTypes(typeDefStore, ENTITIES_SUB_DIR,"typedef-new-classification-T3"); + return typeRegistry.getClassificationTypeByName("T3"); + } + @Test(dependsOnMethods = "atT1_NewClassificationAttachedToTable_ReturnsChangedTable") public void atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn() throws AtlasBaseException, IOException { final int expectedEntityCount = 1; @@ -245,6 +260,54 @@ public class ExportIncrementalTest extends AtlasTestBase { verifyExpectedEntities(getFileNames(getZipSourceCopy(source)), GUID_TABLE_CTAS_2); } + @Test(dependsOnMethods = "importHiveDb") + public void exportTableIncrementalForParentEntity() throws AtlasBaseException, IOException { + InputStream source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_2, EXPORT_INCREMENTAL, 0, false)); + + ZipSource sourceCopy = getZipSourceCopy(source); + verifyExpectedEntities(getFileNames(sourceCopy), GUID_DB, GUID_HIVE_PROCESS, GUID_TABLE_2, GUID_TABLE_CTAS_2); + + source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_2, EXPORT_INCREMENTAL, 0, false)); + verifyUnExpectedEntities(getFileNames(getZipSourceCopy(source)), GUID_TABLE_1); + + nextTimestamp = updateTimesampForNextIncrementalExport(sourceCopy); + + try { + source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_2, EXPORT_INCREMENTAL, nextTimestamp, false)); + } catch (SkipException e) { + throw e; + } + + entityStore.addClassifications(GUID_TABLE_1, ImmutableList.of(classificationTypeT2.createDefaultValue())); + entityStore.addClassifications(GUID_TABLE_2, ImmutableList.of(classificationTypeT2.createDefaultValue())); + + source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_2, EXPORT_INCREMENTAL, nextTimestamp, false)); + verifyExpectedEntities(getFileNames(getZipSourceCopy(source)), GUID_TABLE_2); + + source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_2, EXPORT_INCREMENTAL, nextTimestamp, false)); + verifyUnExpectedEntities(getFileNames(getZipSourceCopy(source)), GUID_TABLE_1); + + } + + @Test(dependsOnMethods = "importHiveDb") + public void exportTableIncremental() throws AtlasBaseException, IOException { + InputStream source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, 0, true)); + + ZipSource sourceCopy = getZipSourceCopy(source); + verifyExpectedEntities(getFileNames(sourceCopy), GUID_DB, GUID_TABLE_CTAS_2); + + nextTimestamp = updateTimesampForNextIncrementalExport(sourceCopy); + + entityStore.addClassifications(GUID_TABLE_1, ImmutableList.of(classificationTypeT3.createDefaultValue())); + entityStore.addClassifications(GUID_TABLE_CTAS_2, ImmutableList.of(classificationTypeT3.createDefaultValue())); + + source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, nextTimestamp, false)); + verifyExpectedEntities(getFileNames(getZipSourceCopy(source)), GUID_TABLE_CTAS_2); + + source = runExportWithParameters(exportService, getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL, nextTimestamp, false)); + verifyUnExpectedEntities(getFileNames(getZipSourceCopy(source)), GUID_TABLE_1); + } + private AtlasExportRequest getIncrementalRequest(long timestamp) { try { @@ -292,6 +355,12 @@ public class ExportIncrementalTest extends AtlasTestBase { } } + private void verifyUnExpectedEntities(List<String> fileNames, String... guids){ + for (String guid : guids) { + assertFalse(fileNames.contains(guid.toLowerCase())); + } + } + private List<String> getFileNames(ZipSource zipSource){ List<String> ret = new ArrayList<>(); assertTrue(zipSource.hasNext()); diff --git a/repository/src/test/resources/json/stocksDB-Entities/typedef-new-classification-T3.json b/repository/src/test/resources/json/stocksDB-Entities/typedef-new-classification-T3.json new file mode 100644 index 000000000..534a0b39d --- /dev/null +++ b/repository/src/test/resources/json/stocksDB-Entities/typedef-new-classification-T3.json @@ -0,0 +1,23 @@ +{ + "classificationDefs": [ + { + "category": "CLASSIFICATION", + "guid": "6hee5e9f-e703-447a-b23b-0b831dc8a933", + "createdBy": "admin", + "updatedBy": "admin", + "createTime": 1711991058600, + "updateTime": 1711991058600, + "version": 1, + "name": "T3", + "description": "T3", + "typeVersion": "1.0", + "attributeDefs": [], + "superTypes": [], + "entityTypes": [], + "subTypes": [] + } + ], + "entityDefs": [], + "enumDefs": [], + "structDefs": [] +} diff --git a/repository/src/test/resources/json/stocksDB-Entities/typesdef-new-classification-T2.json b/repository/src/test/resources/json/stocksDB-Entities/typesdef-new-classification-T2.json new file mode 100644 index 000000000..967bec161 --- /dev/null +++ b/repository/src/test/resources/json/stocksDB-Entities/typesdef-new-classification-T2.json @@ -0,0 +1,23 @@ +{ + "classificationDefs": [ + { + "category": "CLASSIFICATION", + "guid": "2e135e9f-e703-447a-b23b-0b831dc8a933", + "createdBy": "admin", + "updatedBy": "admin", + "createTime": 1711991058474, + "updateTime": 1711991058474, + "version": 1, + "name": "T2", + "description": "T2", + "typeVersion": "1.0", + "attributeDefs": [], + "superTypes": [], + "entityTypes": [], + "subTypes": [] + } + ], + "entityDefs": [], + "enumDefs": [], + "structDefs": [] +}