This is an automated email from the ASF dual-hosted git repository. pinal pushed a commit to branch atlas-2.5 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit f51085748dcef53aadca192051724d7d4925d24a Author: Vinayak Marraiya <72193307+vinayakmarraiya230...@users.noreply.github.com> AuthorDate: Tue Mar 25 07:04:16 2025 +0530 ATLAS-5001: Impala SQL queries that include the “WITH” clause should populate lineage in Atlas (#305) (cherry picked from commit be38132af1ae591638c0fbb35e01b4bd624d49c1) --- .../atlas/impala/hook/ImpalaLineageHook.java | 1 + .../atlas/impala/hook/ImpalaOperationParser.java | 6 ++ .../atlas/impala/hook/events/BaseImpalaEvent.java | 4 +- .../atlas/impala/model/ImpalaOperationType.java | 1 + .../atlas/impala/hook/ImpalaLineageHookTest.java | 65 ++++++++++++++++++++++ 5 files changed, 75 insertions(+), 2 deletions(-) diff --git a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java index 907f24478..023e2bb4a 100644 --- a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java +++ b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java @@ -107,6 +107,7 @@ public class ImpalaLineageHook extends AtlasHook { case CREATETABLE_AS_SELECT: case ALTERVIEW_AS: case QUERY: + case QUERY_WITH_CLAUSE: event = new CreateImpalaProcess(context); break; default: diff --git a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaOperationParser.java b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaOperationParser.java index 98f3eed1b..fbc57b698 100644 --- a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaOperationParser.java +++ b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaOperationParser.java @@ -41,6 +41,9 @@ public class ImpalaOperationParser { private static final Pattern INSERT_SELECT_FROM_PATTERN = Pattern.compile("^[ ]*\\binsert\\b.*\\b(into|overwrite)\\b.*\\bselect\\b.*\\bfrom\\b.*", Pattern.DOTALL | Pattern.CASE_INSENSITIVE); + private static final Pattern WITH_CLAUSE_INSERT_SELECT_FROM_PATTERN = + Pattern.compile("^[ ]*(\\bwith\\b.*)?\\s*\\binsert\\b.*\\b(into|overwrite)\\b.*\\bselect\\b.*\\bfrom\\b.*", Pattern.DOTALL | Pattern.CASE_INSENSITIVE); + public ImpalaOperationParser() { } @@ -55,6 +58,9 @@ public class ImpalaOperationParser { return ImpalaOperationType.ALTERVIEW_AS; } else if (doesMatch(queryTextWithNoComments, INSERT_SELECT_FROM_PATTERN)) { return ImpalaOperationType.QUERY; + } else if (doesMatch(queryTextWithNoComments, WITH_CLAUSE_INSERT_SELECT_FROM_PATTERN)) { + return ImpalaOperationType.QUERY_WITH_CLAUSE; + } return ImpalaOperationType.UNKNOWN; diff --git a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java index 3a13d0c41..82e126abe 100644 --- a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java +++ b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java @@ -208,8 +208,8 @@ public abstract class BaseImpalaEvent { } } - if (operation != ImpalaOperationType.QUERY) { - String errorMessage = String.format("Expect operation to be QUERY, but get unexpected operation type {}", operation.name()); + if (operation != ImpalaOperationType.QUERY && operation != ImpalaOperationType.QUERY_WITH_CLAUSE) { + String errorMessage = String.format("Expect operation to be QUERY or QUERY_WITH_CLAUSE, but get unexpected operation type {}", operation.name()); LOG.error(errorMessage); throw new IllegalArgumentException(errorMessage); } diff --git a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/model/ImpalaOperationType.java b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/model/ImpalaOperationType.java index a893b8845..5200b944e 100644 --- a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/model/ImpalaOperationType.java +++ b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/model/ImpalaOperationType.java @@ -23,6 +23,7 @@ public enum ImpalaOperationType{ CREATETABLE_AS_SELECT ("CREATETABLE_AS_SELECT"), ALTERVIEW_AS ("ALTERVIEW_AS"), QUERY ("QUERY"), + QUERY_WITH_CLAUSE ("QUERY_WITH_CLAUSE"), // sub operation type, which is associated with output INSERT ("INSERT"), diff --git a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookTest.java b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookTest.java new file mode 100644 index 000000000..5a1e1da0b --- /dev/null +++ b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaLineageHookTest.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.impala.hook; + +import org.apache.atlas.impala.model.ImpalaOperationType; +import org.apache.commons.lang.RandomStringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; + +public class ImpalaLineageHookTest { + private static final Logger LOG = LoggerFactory.getLogger(ImpalaLineageHookTest.class); + + @Test(dataProvider = "queryDataProvider") + public void testAllImpalaOperationTypes(String query, ImpalaOperationType expectedOperationType) { + try { + ImpalaOperationType operationType = ImpalaOperationParser.getImpalaOperationType(query); + assertEquals(operationType, expectedOperationType); + } catch (Exception e) { + fail("Query processing failed for query: " + query + " due to exception: " + e.getMessage()); + } + } + + @DataProvider(name = "queryDataProvider") + public Object[][] provideTestData() { + String table1 = "table_" + random(); + String table2 = "table_" + random(); + + return new Object[][] { + { "CREATE VIEW my_view AS SELECT id, name FROM " + table1, ImpalaOperationType.CREATEVIEW }, + { "CREATE TABLE " + table1 + " AS SELECT id, name FROM " + table1, ImpalaOperationType.CREATETABLE_AS_SELECT }, + { "ALTER VIEW my_view AS SELECT id, name FROM " + table1, ImpalaOperationType.ALTERVIEW_AS }, + { "INSERT INTO " + table1 + " SELECT id, name FROM " + table1, ImpalaOperationType.QUERY }, + { "WITH filtered_data AS (SELECT id, name, amount FROM " + table1 + " WHERE amount > 100) " + + "INSERT INTO " + table2 + " SELECT id, name, amount FROM filtered_data", ImpalaOperationType.QUERY_WITH_CLAUSE } + }; + } + + private String random() { + return RandomStringUtils.randomAlphanumeric(10); + } +}