This is an automated email from the ASF dual-hosted git repository.

chaitalithombare pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new 54befb37f ATLAS-4974: impala-bridge, impala-bridge-shim, impala-hook-api modules: update for code readability improvements (#378) 54befb37f is described below commit 54befb37f569c69391fe01f532bb54422fb847f4 Author: chaitalicod <36201417+chaitali...@users.noreply.github.com> AuthorDate: Tue Jun 17 16:09:45 2025 +0530 ATLAS-4974: impala-bridge, impala-bridge-shim, impala-hook-api modules: update for code readability improvements (#378) Co-authored-by: chaitalithombare <chaitalithomb...@apache.org> --- .../org/apache/atlas/impala/ImpalaLineageTool.java | 277 +++++++++++---------- .../atlas/impala/hook/AtlasImpalaHookContext.java | 39 +-- .../atlas/impala/hook/ImpalaIdentifierParser.java | 108 ++++---- .../atlas/impala/hook/ImpalaLineageHook.java | 37 ++- .../atlas/impala/hook/ImpalaOperationParser.java | 11 +- .../atlas/impala/hook/events/BaseImpalaEvent.java | 76 +++--- .../impala/hook/events/CreateImpalaProcess.java | 46 ++-- 7 files changed, 296 insertions(+), 298 deletions(-) diff --git a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/ImpalaLineageTool.java b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/ImpalaLineageTool.java index 6e6d6f1ee..d10a7bf07 100644 --- a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/ImpalaLineageTool.java +++ b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/ImpalaLineageTool.java @@ -19,13 +19,8 @@ package org.apache.atlas.impala; import org.apache.atlas.impala.hook.ImpalaLineageHook; - -import java.io.*; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.io.FileUtils; @@ -35,141 +30,148 @@ import 
org.apache.commons.io.filefilter.PrefixFileFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileFilter; +import java.io.FileWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + /** * Entry point of actual implementation of Impala lineage tool. It reads the lineage records in * lineage log. It then calls instance of ImpalaLineageHook to convert lineage records to * lineage notifications and send them to Atlas. */ public class ImpalaLineageTool { - private static final Logger LOG = LoggerFactory.getLogger(ImpalaLineageTool.class); - private static final String WAL_FILE_EXTENSION = ".wal"; - private static final String WAL_FILE_PREFIX = "WAL"; - private String directoryName; - private String prefix; - - public ImpalaLineageTool(String[] args) { - try { - Options options = new Options(); - options.addOption("d", "directory", true, "the lineage files' folder"); - options.addOption("p", "prefix", true, "the prefix of the lineage files"); - - CommandLine cmd = new DefaultParser().parse(options, args); - directoryName = cmd.getOptionValue("d"); - prefix = cmd.getOptionValue("p"); - } catch(ParseException e) { - LOG.warn("Failed to parse command arguments. 
Error: ", e.getMessage()); - printUsage(); - - throw new RuntimeException(e); + private static final Logger LOG = LoggerFactory.getLogger(ImpalaLineageTool.class); + private static final String WAL_FILE_EXTENSION = ".wal"; + private static final String WAL_FILE_PREFIX = "WAL"; + private String directoryName; + private String prefix; + + public ImpalaLineageTool(String[] args) { + try { + Options options = new Options(); + options.addOption("d", "directory", true, "the lineage files' folder"); + options.addOption("p", "prefix", true, "the prefix of the lineage files"); + + CommandLine cmd = new DefaultParser().parse(options, args); + directoryName = cmd.getOptionValue("d"); + prefix = cmd.getOptionValue("p"); + } catch (ParseException e) { + LOG.warn("Failed to parse command arguments. Error: ", e.getMessage()); + printUsage(); + + throw new RuntimeException(e); + } } - } - - public void run() { - ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook(); - File[] currentFiles = getCurrentFiles(); - int fileNum = currentFiles.length; + public void run() { + ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook(); - for(int i = 0; i < fileNum; i++) { - String filename = currentFiles[i].getAbsolutePath(); - String walFilename = directoryName + WAL_FILE_PREFIX + currentFiles[i].getName() + WAL_FILE_EXTENSION; + File[] currentFiles = getCurrentFiles(); + int fileNum = currentFiles.length; - LOG.info("Importing: {}", filename); - importHImpalaEntities(impalaLineageHook, filename, walFilename); + for (int i = 0; i < fileNum; i++) { + String filename = currentFiles[i].getAbsolutePath(); + String walFilename = directoryName + WAL_FILE_PREFIX + currentFiles[i].getName() + WAL_FILE_EXTENSION; + LOG.info("Importing: {}", filename); + importHImpalaEntities(impalaLineageHook, filename, walFilename); - if(i != fileNum - 1) { - deleteLineageAndWal(currentFiles[i], walFilename); - } - } - LOG.info("Impala bridge processing: Done! 
"); - } - - public static void main(String[] args) { - if (args != null && args.length != 4) { - // The lineage file location and prefix should be input as the parameters - System.out.println("Impala bridge: wrong number of arguments. Please try again"); - printUsage(); - return; + if (i != fileNum - 1) { + deleteLineageAndWal(currentFiles[i], walFilename); + } + } + LOG.info("Impala bridge processing: Done! "); } - ImpalaLineageTool instance = new ImpalaLineageTool(args); - instance.run(); - } + public static void main(String[] args) { + if (args != null && args.length != 4) { + // The lineage file location and prefix should be input as the parameters + System.out.println("Impala bridge: wrong number of arguments. Please try again"); + printUsage(); + return; + } + + ImpalaLineageTool instance = new ImpalaLineageTool(args); + instance.run(); + } /** * Delete the used lineage file and wal file * @param currentFile The current file * @param wal The wal file */ - public static void deleteLineageAndWal(File currentFile, String wal) { - if(currentFile.exists() && currentFile.delete()) { - LOG.info("Lineage file {} is deleted successfully", currentFile.getPath()); - } else { - LOG.info("Failed to delete the lineage file {}", currentFile.getPath()); + public static void deleteLineageAndWal(File currentFile, String wal) { + if (currentFile.exists() && currentFile.delete()) { + LOG.info("Lineage file {} is deleted successfully", currentFile.getPath()); + } else { + LOG.info("Failed to delete the lineage file {}", currentFile.getPath()); + } + + File file = new File(wal); + + if (file.exists() && file.delete()) { + LOG.info("Wal file {} deleted successfully", wal); + } else { + LOG.info("Failed to delete the wal file {}", wal); + } } - File file = new File(wal); - - if(file.exists() && file.delete()) { - LOG.info("Wal file {} deleted successfully", wal); - } else { - LOG.info("Failed to delete the wal file {}", wal); + private static void printUsage() { + 
System.out.println(); + System.out.println(); + System.out.println("Usage: import-impala.sh [-d <directory>] [-p <prefix>]"); + System.out.println("Imports specified lineage files by given directory and file prefix."); + System.out.println(); } - } - - private static void printUsage() { - System.out.println(); - System.out.println(); - System.out.println("Usage: import-impala.sh [-d <directory>] [-p <prefix>]" ); - System.out.println(" Imports specified lineage files by given directory and file prefix."); - System.out.println(); - } /** * This function figures out the right lineage file path+name to process sorted by the last * time they are modified. (old -> new) * @return get the lineage files from given directory with given prefix. */ - public File[] getCurrentFiles() { - try { - LOG.info("Scanning: " + directoryName); - File folder = new File(directoryName); - File[] listOfFiles = folder.listFiles((FileFilter) new PrefixFileFilter(prefix, IOCase.SENSITIVE)); - - if ((listOfFiles == null) || (listOfFiles.length == 0)) { - LOG.info("Found no lineage files."); + public File[] getCurrentFiles() { + try { + LOG.info("Scanning: " + directoryName); + File folder = new File(directoryName); + File[] listOfFiles = folder.listFiles((FileFilter) new PrefixFileFilter(prefix, IOCase.SENSITIVE)); + + if ((listOfFiles == null) || (listOfFiles.length == 0)) { + LOG.info("Found no lineage files."); + return new File[0]; + } + + if (listOfFiles.length > 1) { + Arrays.sort(listOfFiles, LastModifiedFileComparator.LASTMODIFIED_COMPARATOR); + } + + LOG.info("Found {} lineage files" + listOfFiles.length); + return listOfFiles; + } catch (Exception e) { + LOG.error("Import lineage file failed.", e); + } return new File[0]; - } + } - if(listOfFiles.length > 1) { - Arrays.sort(listOfFiles, LastModifiedFileComparator.LASTMODIFIED_COMPARATOR); - } + private boolean processImpalaLineageHook(ImpalaLineageHook impalaLineageHook, List<String> lineageList) { + boolean allSucceed = true; - 
LOG.info("Found {} lineage files" + listOfFiles.length); - return listOfFiles; - } catch(Exception e) { - LOG.error("Import lineage file failed.", e); - } - return new File[0]; - } - - private boolean processImpalaLineageHook(ImpalaLineageHook impalaLineageHook, List<String> lineageList) { - boolean allSucceed = true; - - // returns true if successfully sent to Atlas - for (String lineageRecord : lineageList) { - try { - impalaLineageHook.process(lineageRecord); - } catch (Exception ex) { - String errorMessage = String.format("Exception at query {} \n", lineageRecord); - LOG.error(errorMessage, ex); - - allSucceed = false; - } - } + // returns true if successfully sent to Atlas + for (String lineageRecord : lineageList) { + try { + impalaLineageHook.process(lineageRecord); + } catch (Exception ex) { + String errorMessage = String.format("Exception at query {} \n", lineageRecord); + LOG.error(errorMessage, ex); - return allSucceed; - } + allSucceed = false; + } + } + + return allSucceed; + } /** * Create a list of lineage queries based on the lineage file and the wal file @@ -177,40 +179,39 @@ public class ImpalaLineageTool { * @param walfile * @return */ - public void importHImpalaEntities(ImpalaLineageHook impalaLineageHook, String name, String walfile) { - List<String> lineageList = new ArrayList<>(); + public void importHImpalaEntities(ImpalaLineageHook impalaLineageHook, String name, String walfile) { + List<String> lineageList = new ArrayList<>(); - try { - File lineageFile = new File(name); //use current file length to minus the offset - File walFile = new File(walfile); + try { + File lineageFile = new File(name); //use current file length to minus the offset + File walFile = new File(walfile); // if the wal file does not exist, create one with 0 byte read, else, read the number - if(!walFile.exists()) { - BufferedWriter writer = new BufferedWriter(new FileWriter(walfile)); - writer.write("0, " + name); - writer.close(); - } + if (!walFile.exists()) { + 
BufferedWriter writer = new BufferedWriter(new FileWriter(walfile)); + writer.write("0, " + name); + writer.close(); + } - LOG.debug("Reading: " + name); - String lineageRecord = FileUtils.readFileToString(lineageFile, "UTF-8"); + LOG.debug("Reading: " + name); + String lineageRecord = FileUtils.readFileToString(lineageFile, "UTF-8"); - lineageList.add(lineageRecord); + lineageList.add(lineageRecord); // call instance of ImpalaLineageHook to process the list of Impala lineage record - if(processImpalaLineageHook(impalaLineageHook, lineageList)) { + if (processImpalaLineageHook(impalaLineageHook, lineageList)) { // write how many bytes the current file is to the wal file - FileWriter newWalFile = new FileWriter(walfile, true); - BufferedWriter newWalFileBuf = new BufferedWriter(newWalFile); - newWalFileBuf.newLine(); - newWalFileBuf.write(String.valueOf(lineageFile.length()) + "," + name); - - newWalFileBuf.close(); - newWalFile.close(); - } else { - LOG.error("Error sending some of impala lineage records to ImpalaHook"); - } - } catch (Exception e) { - LOG.error("Error in processing lineage records. Exception: " + e.getMessage()); + FileWriter newWalFile = new FileWriter(walfile, true); + BufferedWriter newWalFileBuf = new BufferedWriter(newWalFile); + newWalFileBuf.newLine(); + newWalFileBuf.write(String.valueOf(lineageFile.length()) + "," + name); + + newWalFileBuf.close(); + newWalFile.close(); + } else { + LOG.error("Error sending some of impala lineage records to ImpalaHook"); + } + } catch (Exception e) { + LOG.error("Error in processing lineage records. 
Exception: " + e.getMessage()); + } } - } - -} \ No newline at end of file +} diff --git a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java index 51b2f832e..e29f814ad 100644 --- a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java +++ b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java @@ -18,9 +18,6 @@ package org.apache.atlas.impala.hook; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; import org.apache.atlas.impala.model.ImpalaOperationType; import org.apache.atlas.impala.model.ImpalaQuery; import org.apache.atlas.impala.model.LineageVertex; @@ -28,10 +25,13 @@ import org.apache.atlas.impala.model.LineageVertexMetadata; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.commons.lang.StringUtils; - +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; /** * Contain the info related to an linear record from Impala */ + public class AtlasImpalaHookContext { public static final char QNAME_SEP_METADATA_NAMESPACE = '@'; public static final char QNAME_SEP_ENTITY_NAME = '.'; @@ -43,17 +43,19 @@ public class AtlasImpalaHookContext { private final Map<String, AtlasEntity> qNameEntityMap = new HashMap<>(); public AtlasImpalaHookContext(ImpalaLineageHook hook, ImpalaOperationType operationType, - ImpalaQuery lineageQuery) throws Exception { + ImpalaQuery lineageQuery) throws Exception { this.hook = hook; this.impalaOperation = operationType; this.lineageQuery = lineageQuery; - } public ImpalaQuery getLineageQuery() { return lineageQuery; } - public String getQueryStr() { return lineageQuery.getQueryText(); } + + public String getQueryStr() { + return lineageQuery.getQueryText(); + } public ImpalaOperationType getImpalaOperationType() { return impalaOperation; @@ -67,7 +69,9 @@ 
public class AtlasImpalaHookContext { return qNameEntityMap.get(qualifiedName); } - public Collection<AtlasEntity> getEntities() { return qNameEntityMap.values(); } + public Collection<AtlasEntity> getEntities() { + return qNameEntityMap.values(); + } public String getMetadataNamespace() { return hook.getMetadataNamespace(); @@ -96,7 +100,7 @@ public class AtlasImpalaHookContext { throw new IllegalArgumentException(fullTableName + " does not contain database name"); } - return getQualifiedNameForTable(fullTableName.substring(0, sepPos), fullTableName.substring(sepPos+1)); + return getQualifiedNameForTable(fullTableName.substring(0, sepPos), fullTableName.substring(sepPos + 1)); } public String getQualifiedNameForTable(String dbName, String tableName) { @@ -131,12 +135,12 @@ public class AtlasImpalaHookContext { int sepPosLast = columnName.lastIndexOf(QNAME_SEP_ENTITY_NAME); if (isSeparatorIndexValid(sepPosLast)) { - columnName = columnName.substring(sepPosLast+1); + columnName = columnName.substring(sepPosLast + 1); } return getQualifiedNameForColumn( fullTableName.substring(0, sepPos), - fullTableName.substring(sepPos+1), + fullTableName.substring(sepPos + 1), columnName); } @@ -149,7 +153,7 @@ public class AtlasImpalaHookContext { int sepPosLast = fullColumnName.lastIndexOf(QNAME_SEP_ENTITY_NAME); if (!isSeparatorIndexValid(sepPosFirst) || !isSeparatorIndexValid(sepPosLast) || - sepPosFirst == sepPosLast) { + sepPosFirst == sepPosLast) { throw new IllegalArgumentException( String.format("fullColumnName {} does not contain database name or table name", fullColumnName)); @@ -157,8 +161,8 @@ public class AtlasImpalaHookContext { return getQualifiedNameForColumn( fullColumnName.substring(0, sepPosFirst), - fullColumnName.substring(sepPosFirst+1, sepPosLast), - fullColumnName.substring(sepPosLast+1)); + fullColumnName.substring(sepPosFirst + 1, sepPosLast), + fullColumnName.substring(sepPosLast + 1)); } public String getColumnNameOnly(String fullColumnName) throws 
IllegalArgumentException { @@ -172,7 +176,7 @@ public class AtlasImpalaHookContext { return fullColumnName; } - return fullColumnName.substring(sepPosLast+1); + return fullColumnName.substring(sepPosLast + 1); } public String getQualifiedNameForColumn(String dbName, String tableName, String columnName) { @@ -181,7 +185,9 @@ public class AtlasImpalaHookContext { columnName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() + getMetadataNamespace(); } - public String getUserName() { return lineageQuery.getUser(); } + public String getUserName() { + return lineageQuery.getUser(); + } public String getDatabaseNameFromTable(String fullTableName) { int sepPos = fullTableName.lastIndexOf(QNAME_SEP_ENTITY_NAME); @@ -209,5 +215,4 @@ public class AtlasImpalaHookContext { public boolean isSeparatorIndexValid(int index) { return index > 0; } - } diff --git a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaIdentifierParser.java b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaIdentifierParser.java index 33e44f729..9be87d62b 100644 --- a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaIdentifierParser.java +++ b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaIdentifierParser.java @@ -18,11 +18,11 @@ package org.apache.atlas.impala.hook; +import org.apache.commons.lang.StringUtils; + import java.util.Arrays; import java.util.HashSet; - import java.util.Set; -import org.apache.commons.lang.StringUtils; /** * Check if a string is a valid Impala table identifier. @@ -35,6 +35,10 @@ public class ImpalaIdentifierParser { // add "." 
to allow <dbName>.<tableName> public static final String VALID_IMPALA_IDENTIFIER_REGEX = "^[a-zA-Z][a-zA-Z0-9_.]{0,127}$"; + private ImpalaIdentifierParser() { + throw new UnsupportedOperationException("ImpalaIdentifierParser"); + } + public static boolean isTableNameValid(String inTableName) { if (StringUtils.isEmpty(inTableName)) { return false; @@ -74,7 +78,6 @@ public class ImpalaIdentifierParser { // keywords. static Set<String> reservedWords; - public static void init() { // initilize keywords keywordMap = new HashSet<>(); @@ -312,7 +315,6 @@ public class ImpalaIdentifierParser { tokenIdMap.add("COMMENTED_PLAN_HINT_END"); tokenIdMap.add("Unexpected character"); - // For impala 3.0, reserved words = keywords + sql16ReservedWords - builtinFunctions // - whitelist // unused reserved words = reserved words - keywords. These words are reserved for @@ -320,50 +322,50 @@ public class ImpalaIdentifierParser { reservedWords = new HashSet<>(keywordMap); // Add SQL:2016 reserved words reservedWords.addAll(Arrays.asList(new String[] { - "abs", "acos", "allocate", "any", "are", "array_agg", "array_max_cardinality", - "asensitive", "asin", "asymmetric", "at", "atan", "atomic", "avg", "begin", - "begin_frame", "begin_partition", "blob", "both", "call", "called", "cardinality", - "cascaded", "ceil", "ceiling", "char_length", "character", "character_length", - "check", "classifier", "clob", "close", "coalesce", "collate", "collect", - "commit", "condition", "connect", "constraint", "contains", "convert", "copy", - "corr", "corresponding", "cos", "cosh", "count", "covar_pop", "covar_samp", - "cube", "cume_dist", "current_catalog", "current_date", - "current_default_transform_group", "current_path", "current_path", "current_role", - "current_role", "current_row", "current_schema", "current_time", - "current_timestamp", "current_transform_group_for_type", "current_user", "cursor", - "cycle", "day", "deallocate", "dec", "decfloat", "declare", "define", - "dense_rank", "deref", 
"deterministic", "disconnect", "dynamic", "each", - "element", "empty", "end-exec", "end_frame", "end_partition", "equals", "escape", - "every", "except", "exec", "execute", "exp", "extract", "fetch", "filter", - "first_value", "floor", "foreign", "frame_row", "free", "fusion", "get", "global", - "grouping", "groups", "hold", "hour", "identity", "indicator", "initial", "inout", - "insensitive", "integer", "intersect", "intersection", "json_array", - "json_arrayagg", "json_exists", "json_object", "json_objectagg", "json_query", - "json_table", "json_table_primitive", "json_value", "lag", "language", "large", - "last_value", "lateral", "lead", "leading", "like_regex", "listagg", "ln", - "local", "localtime", "localtimestamp", "log", "log10 ", "lower", "match", - "match_number", "match_recognize", "matches", "max", "member", "merge", "method", - "min", "minute", "mod", "modifies", "module", "month", "multiset", "national", - "natural", "nchar", "nclob", "new", "no", "none", "normalize", "nth_value", - "ntile", "nullif", "numeric", "occurrences_regex", "octet_length", "of", "old", - "omit", "one", "only", "open", "out", "overlaps", "overlay", "parameter", - "pattern", "per", "percent", "percent_rank", "percentile_cont", "percentile_disc", - "period", "portion", "position", "position_regex", "power", "precedes", - "precision", "prepare", "procedure", "ptf", "rank", "reads", "real", "recursive", - "ref", "references", "referencing", "regr_avgx", "regr_avgy", "regr_count", - "regr_intercept", "regr_r2", "regr_slope", "regr_sxx", "regr_sxy", "regr_syy", - "release", "result", "return", "rollback", "rollup", "row_number", "running", - "savepoint", "scope", "scroll", "search", "second", "seek", "sensitive", - "session_user", "similar", "sin", "sinh", "skip", "some", "specific", - "specifictype", "sql", "sqlexception", "sqlstate", "sqlwarning", "sqrt", "start", - "static", "stddev_pop", "stddev_samp", "submultiset", "subset", "substring", - "substring_regex", "succeeds", 
"sum", "symmetric", "system", "system_time", - "system_user", "tan", "tanh", "time", "timezone_hour", "timezone_minute", - "trailing", "translate", "translate_regex", "translation", "treat", "trigger", - "trim", "trim_array", "uescape", "unique", "unknown", "unnest", "update ", - "upper", "user", "value", "value_of", "var_pop", "var_samp", "varbinary", - "varying", "versioning", "whenever", "width_bucket", "window", "within", - "without", "year"})); + "abs", "acos", "allocate", "any", "are", "array_agg", "array_max_cardinality", + "asensitive", "asin", "asymmetric", "at", "atan", "atomic", "avg", "begin", + "begin_frame", "begin_partition", "blob", "both", "call", "called", "cardinality", + "cascaded", "ceil", "ceiling", "char_length", "character", "character_length", + "check", "classifier", "clob", "close", "coalesce", "collate", "collect", + "commit", "condition", "connect", "constraint", "contains", "convert", "copy", + "corr", "corresponding", "cos", "cosh", "count", "covar_pop", "covar_samp", + "cube", "cume_dist", "current_catalog", "current_date", + "current_default_transform_group", "current_path", "current_path", "current_role", + "current_role", "current_row", "current_schema", "current_time", + "current_timestamp", "current_transform_group_for_type", "current_user", "cursor", + "cycle", "day", "deallocate", "dec", "decfloat", "declare", "define", + "dense_rank", "deref", "deterministic", "disconnect", "dynamic", "each", + "element", "empty", "end-exec", "end_frame", "end_partition", "equals", "escape", + "every", "except", "exec", "execute", "exp", "extract", "fetch", "filter", + "first_value", "floor", "foreign", "frame_row", "free", "fusion", "get", "global", + "grouping", "groups", "hold", "hour", "identity", "indicator", "initial", "inout", + "insensitive", "integer", "intersect", "intersection", "json_array", + "json_arrayagg", "json_exists", "json_object", "json_objectagg", "json_query", + "json_table", "json_table_primitive", "json_value", "lag", 
"language", "large", + "last_value", "lateral", "lead", "leading", "like_regex", "listagg", "ln", + "local", "localtime", "localtimestamp", "log", "log10 ", "lower", "match", + "match_number", "match_recognize", "matches", "max", "member", "merge", "method", + "min", "minute", "mod", "modifies", "module", "month", "multiset", "national", + "natural", "nchar", "nclob", "new", "no", "none", "normalize", "nth_value", + "ntile", "nullif", "numeric", "occurrences_regex", "octet_length", "of", "old", + "omit", "one", "only", "open", "out", "overlaps", "overlay", "parameter", + "pattern", "per", "percent", "percent_rank", "percentile_cont", "percentile_disc", + "period", "portion", "position", "position_regex", "power", "precedes", + "precision", "prepare", "procedure", "ptf", "rank", "reads", "real", "recursive", + "ref", "references", "referencing", "regr_avgx", "regr_avgy", "regr_count", + "regr_intercept", "regr_r2", "regr_slope", "regr_sxx", "regr_sxy", "regr_syy", + "release", "result", "return", "rollback", "rollup", "row_number", "running", + "savepoint", "scope", "scroll", "search", "second", "seek", "sensitive", + "session_user", "similar", "sin", "sinh", "skip", "some", "specific", + "specifictype", "sql", "sqlexception", "sqlstate", "sqlwarning", "sqrt", "start", + "static", "stddev_pop", "stddev_samp", "submultiset", "subset", "substring", + "substring_regex", "succeeds", "sum", "symmetric", "system", "system_time", + "system_user", "tan", "tanh", "time", "timezone_hour", "timezone_minute", + "trailing", "translate", "translate_regex", "translation", "treat", "trigger", + "trim", "trim_array", "uescape", "unique", "unknown", "unnest", "update ", + "upper", "user", "value", "value_of", "var_pop", "var_samp", "varbinary", + "varying", "versioning", "whenever", "width_bucket", "window", "within", + "without", "year"})); // TODO: Remove impala builtin function names. 
Need to find content of // BuiltinsDb.getInstance().getAllFunctions() //reservedWords.removeAll(BuiltinsDb.getInstance().getAllFunctions().keySet()); @@ -371,11 +373,11 @@ public class ImpalaIdentifierParser { // Remove whitelist words. These words might be heavily used in production, and // impala is unlikely to implement SQL features around these words in the near future. reservedWords.removeAll(Arrays.asList(new String[] { - // time units - "year", "month", "day", "hour", "minute", "second", - "begin", "call", "check", "classifier", "close", "identity", "language", - "localtime", "member", "module", "new", "nullif", "old", "open", "parameter", - "period", "result", "return", "sql", "start", "system", "time", "user", "value" + // time units + "year", "month", "day", "hour", "minute", "second", + "begin", "call", "check", "classifier", "close", "identity", "language", + "localtime", "member", "module", "new", "nullif", "old", "open", "parameter", + "period", "result", "return", "sql", "start", "system", "time", "user", "value" })); } diff --git a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java index 023e2bb4a..5798aa5ad 100644 --- a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java +++ b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java @@ -18,10 +18,7 @@ package org.apache.atlas.impala.hook; -import java.net.InetAddress; -import java.net.UnknownHostException; import com.google.common.collect.Sets; -import java.io.IOException; import org.apache.atlas.hook.AtlasHook; import org.apache.atlas.impala.hook.events.BaseImpalaEvent; import org.apache.atlas.impala.hook.events.CreateImpalaProcess; @@ -32,8 +29,13 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + 
import javax.security.auth.Subject; import javax.security.auth.kerberos.KerberosPrincipal; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.util.HashSet; import static org.apache.atlas.repository.Constants.IMPALA_SOURCE; @@ -63,9 +65,7 @@ public class ImpalaLineageHook extends AtlasHook { } } - public ImpalaLineageHook() { - - } + public ImpalaLineageHook() {} public String getMessageSource() { return IMPALA_SOURCE; @@ -99,17 +99,17 @@ public class ImpalaLineageHook extends AtlasHook { try { ImpalaOperationType operationType = ImpalaOperationParser.getImpalaOperationType(lineageQuery.getQueryText()); AtlasImpalaHookContext context = - new AtlasImpalaHookContext(this, operationType, lineageQuery); + new AtlasImpalaHookContext(this, operationType, lineageQuery); BaseImpalaEvent event = null; switch (operationType) { - case CREATEVIEW: - case CREATETABLE_AS_SELECT: - case ALTERVIEW_AS: - case QUERY: - case QUERY_WITH_CLAUSE: - event = new CreateImpalaProcess(context); - break; + case CREATEVIEW: + case CREATETABLE_AS_SELECT: + case ALTERVIEW_AS: + case QUERY: + case QUERY_WITH_CLAUSE: + event = new CreateImpalaProcess(context); + break; default: if (LOG.isDebugEnabled()) { LOG.debug("HiveHook.run({}): operation ignored", lineageQuery.getQueryText()); @@ -125,9 +125,8 @@ public class ImpalaLineageHook extends AtlasHook { super.notifyEntities(event.getNotificationMessages(), ugi); } } catch (Throwable t) { - LOG.error("ImpalaLineageHook.process(): failed to process query {}", - AtlasType.toJson(lineageQuery), t); + AtlasType.toJson(lineageQuery), t); } if (LOG.isDebugEnabled()) { @@ -140,9 +139,9 @@ public class ImpalaLineageHook extends AtlasHook { } private UserGroupInformation getUgiFromUserName(String userName) throws IOException { - String userPrincipal = userName.contains(REALM_SEPARATOR)? userName : userName + "@" + getRealm(); + String userPrincipal = userName.contains(REALM_SEPARATOR) ? 
userName : userName + "@" + getRealm(); Subject userSubject = new Subject(false, Sets.newHashSet( - new KerberosPrincipal(userPrincipal)), new HashSet<Object>(),new HashSet<Object>()); + new KerberosPrincipal(userPrincipal)), new HashSet<Object>(), new HashSet<Object>()); return UserGroupInformation.getUGIFromSubject(userSubject); } @@ -153,4 +152,4 @@ public class ImpalaLineageHook extends AtlasHook { public boolean isConvertHdfsPathToLowerCase() { return convertHdfsPathToLowerCase; } -} \ No newline at end of file +} diff --git a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaOperationParser.java b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaOperationParser.java index fbc57b698..1fece6de9 100644 --- a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaOperationParser.java +++ b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaOperationParser.java @@ -20,13 +20,13 @@ package org.apache.atlas.impala.hook; import org.apache.atlas.impala.model.ImpalaOperationType; import org.apache.commons.lang.StringUtils; + import java.util.regex.Pattern; /** * Parse an Impala query text and output the impala operation type */ public class ImpalaOperationParser { - private static final Pattern COMMENT_PATTERN = Pattern.compile("/\\*.*?\\*/", Pattern.DOTALL); private static final Pattern CREATE_VIEW_PATTERN = @@ -43,9 +43,8 @@ public class ImpalaOperationParser { private static final Pattern WITH_CLAUSE_INSERT_SELECT_FROM_PATTERN = Pattern.compile("^[ ]*(\\bwith\\b.*)?\\s*\\binsert\\b.*\\b(into|overwrite)\\b.*\\bselect\\b.*\\bfrom\\b.*", Pattern.DOTALL | Pattern.CASE_INSENSITIVE); - - public ImpalaOperationParser() { - } + + private ImpalaOperationParser() {} public static ImpalaOperationType getImpalaOperationType(String queryText) { // Impala does no generate lineage record for command "LOAD DATA IN PATH" @@ -60,7 +59,6 @@ public class ImpalaOperationParser { return ImpalaOperationType.QUERY; 
} else if (doesMatch(queryTextWithNoComments, WITH_CLAUSE_INSERT_SELECT_FROM_PATTERN)) { return ImpalaOperationType.QUERY_WITH_CLAUSE; - } return ImpalaOperationType.UNKNOWN; @@ -81,5 +79,4 @@ public class ImpalaOperationParser { private static boolean doesMatch(final String queryText, final Pattern pattern) { return pattern.matcher(queryText).matches(); } - -} \ No newline at end of file +} diff --git a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java index 82e126abe..d6c1c8575 100644 --- a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java +++ b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java @@ -18,18 +18,6 @@ package org.apache.atlas.impala.hook.events; -import static org.apache.atlas.impala.hook.AtlasImpalaHookContext.QNAME_SEP_PROCESS; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - import org.apache.atlas.impala.hook.AtlasImpalaHookContext; import org.apache.atlas.impala.hook.ImpalaOperationParser; import org.apache.atlas.impala.model.ImpalaDataType; @@ -45,12 +33,23 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.type.AtlasTypeUtil; - import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import 
java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.atlas.impala.hook.AtlasImpalaHookContext.QNAME_SEP_PROCESS; + /** * The base class for generating notification event to Atlas server * Most code is copied from BaseHiveEvent to avoid depending on org.apache.atlas.hive.hook @@ -99,7 +98,6 @@ public abstract class BaseImpalaEvent { protected final Map<Long, LineageVertex> verticesMap; public BaseImpalaEvent(AtlasImpalaHookContext context) { - this.context = context; vertexNameMap = new HashMap<>(); verticesMap = new HashMap<>(); @@ -111,7 +109,9 @@ public abstract class BaseImpalaEvent { public abstract List<HookNotification> getNotificationMessages() throws Exception; - public String getUserName() { return context.getUserName(); } + public String getUserName() { + return context.getUserName(); + } public String getTableNameFromVertex(LineageVertex vertex) { if (vertex.getVertexType() == ImpalaVertexType.COLUMN) { @@ -130,7 +130,6 @@ public abstract class BaseImpalaEvent { } public String getQualifiedName(ImpalaNode node) throws IllegalArgumentException { - return getQualifiedName(node.getOwnVertex()); } @@ -172,8 +171,8 @@ public abstract class BaseImpalaEvent { static final class AtlasEntityComparator implements Comparator<AtlasEntity> { @Override public int compare(AtlasEntity entity1, AtlasEntity entity2) { - String name1 = (String)entity1.getAttribute(ATTRIBUTE_QUALIFIED_NAME); - String name2 = (String)entity2.getAttribute(ATTRIBUTE_QUALIFIED_NAME); + String name1 = (String) entity1.getAttribute(ATTRIBUTE_QUALIFIED_NAME); + String name2 = (String) entity2.getAttribute(ATTRIBUTE_QUALIFIED_NAME); if (name1 == null) { return -1; @@ -193,17 +192,17 @@ public abstract class BaseImpalaEvent { ImpalaOperationType operation = context.getImpalaOperationType(); if (operation == ImpalaOperationType.CREATEVIEW || - operation == ImpalaOperationType.CREATETABLE_AS_SELECT || - operation == ImpalaOperationType.ALTERVIEW_AS) { + operation 
== ImpalaOperationType.CREATETABLE_AS_SELECT || + operation == ImpalaOperationType.ALTERVIEW_AS) { List<? extends AtlasEntity> sortedEntities = new ArrayList<>(outputs); Collections.sort(sortedEntities, entityComparator); for (AtlasEntity entity : sortedEntities) { if (entity.getTypeName().equalsIgnoreCase(HIVE_TYPE_TABLE)) { - Long createTime = (Long)entity.getAttribute(ATTRIBUTE_CREATE_TIME); + Long createTime = (Long) entity.getAttribute(ATTRIBUTE_CREATE_TIME); - return (String)entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + QNAME_SEP_PROCESS + createTime; + return (String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + QNAME_SEP_PROCESS + createTime; } } } @@ -228,7 +227,6 @@ public abstract class BaseImpalaEvent { qualifiedName = sb.toString(); } - return qualifiedName; } @@ -249,10 +247,10 @@ public abstract class BaseImpalaEvent { String qualifiedName = null; long createTime = 0; - qualifiedName = (String)entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME); + qualifiedName = (String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME); if (entity.getTypeName().equalsIgnoreCase(HIVE_TYPE_TABLE)) { - Long createTimeObj = (Long)entity.getAttribute(ATTRIBUTE_CREATE_TIME); + Long createTimeObj = (Long) entity.getAttribute(ATTRIBUTE_CREATE_TIME); if (createTimeObj != null) { createTime = createTimeObj; } @@ -266,17 +264,17 @@ public abstract class BaseImpalaEvent { boolean addWriteType = false; ImpalaOperationType subType = ImpalaOperationParser.getImpalaOperationSubType(operation, queryText); - switch (subType) { - // Impala does not generate lineage for UPDATE and DELETE - case INSERT: - case INSERT_OVERWRITE: - addWriteType = true; - break; - } - - if (addWriteType) { - processQualifiedName.append(QNAME_SEP_PROCESS).append(subType.name()); - } + switch (subType) { + // Impala does not generate lineage for UPDATE and DELETE + case INSERT: + case INSERT_OVERWRITE: + addWriteType = true; + break; + } + + if (addWriteType) { + 
processQualifiedName.append(QNAME_SEP_PROCESS).append(subType.name()); + } } processQualifiedName.append(QNAME_SEP_PROCESS).append(qualifiedName.toLowerCase().replaceAll("/", "")); @@ -296,7 +294,7 @@ public abstract class BaseImpalaEvent { case DFS_DIR: { ret = toAtlasEntity(node, entityExtInfo); } - break; + break; } return ret; @@ -436,10 +434,8 @@ public abstract class BaseImpalaEvent { } } } - ret.setAttribute(ATTRIBUTE_COLUMNS, getObjectIds(columns)); - context.putEntity(tblQualifiedName, ret); return ret; @@ -448,7 +444,7 @@ public abstract class BaseImpalaEvent { public static AtlasObjectId getObjectId(AtlasEntity entity) { String qualifiedName = (String) entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME); AtlasObjectId ret = new AtlasObjectId(entity.getGuid(), entity.getTypeName(), Collections - .singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName)); + .singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName)); return ret; } diff --git a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/CreateImpalaProcess.java b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/CreateImpalaProcess.java index 5e6ea5a55..ac2347011 100644 --- a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/CreateImpalaProcess.java +++ b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/CreateImpalaProcess.java @@ -18,20 +18,13 @@ package org.apache.atlas.impala.hook.events; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; import org.apache.atlas.impala.hook.AtlasImpalaHookContext; import org.apache.atlas.impala.model.ImpalaDataType; import org.apache.atlas.impala.model.ImpalaDependencyType; import org.apache.atlas.impala.model.ImpalaNode; +import org.apache.atlas.impala.model.ImpalaQuery; import org.apache.atlas.impala.model.ImpalaVertexType; import 
org.apache.atlas.impala.model.LineageEdge; -import org.apache.atlas.impala.model.ImpalaQuery; import org.apache.atlas.impala.model.LineageVertex; import org.apache.atlas.impala.model.LineageVertexMetadata; import org.apache.atlas.model.instance.AtlasEntity; @@ -42,6 +35,14 @@ import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + public class CreateImpalaProcess extends BaseImpalaEvent { private static final Logger LOG = LoggerFactory.getLogger(CreateImpalaProcess.class); @@ -117,10 +118,10 @@ public class CreateImpalaProcess extends BaseImpalaEvent { if (!inputs.isEmpty() || !outputs.isEmpty()) { AtlasEntity process = getImpalaProcessEntity(inputs, outputs); - if (process!= null) { + if (process != null) { if (LOG.isDebugEnabled()) { LOG.debug("get process entity with qualifiedName: {}", - process.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); + process.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); } ret.addEntity(process); @@ -129,7 +130,7 @@ public class CreateImpalaProcess extends BaseImpalaEvent { if (processExecution != null) { if (LOG.isDebugEnabled()) { LOG.debug("get process executition entity with qualifiedName: {}", - processExecution.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); + processExecution.getAttribute(ATTRIBUTE_QUALIFIED_NAME)); } ret.addEntity(processExecution); @@ -142,8 +143,6 @@ public class CreateImpalaProcess extends BaseImpalaEvent { } else { ret = null; } - - return ret; } @@ -158,7 +157,6 @@ public class CreateImpalaProcess extends BaseImpalaEvent { final Set<String> processedOutputCols = new HashSet<>(); for (LineageEdge edge : edges) { - if (!edge.getEdgeType().equals(ImpalaDependencyType.PROJECTION)) { // Impala dependency type can only be predicate or projection. 
// Impala predicate dependency: This is a dependency between a set of target @@ -176,7 +174,7 @@ public class CreateImpalaProcess extends BaseImpalaEvent { if (LOG.isDebugEnabled()) { LOG.debug("processColumnLineage(): target id = {}, target column name = {}", - targetId, outputColName); + targetId, outputColName); } if (outputColumn == null) { @@ -215,8 +213,8 @@ public class CreateImpalaProcess extends BaseImpalaEvent { AtlasEntity columnLineageProcess = new AtlasEntity(ImpalaDataType.IMPALA_COLUMN_LINEAGE.getName()); - String columnQualifiedName = (String)impalaProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + - AtlasImpalaHookContext.QNAME_SEP_PROCESS + outputColumns.get(0).getAttribute(ATTRIBUTE_NAME); + String columnQualifiedName = (String) impalaProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + + AtlasImpalaHookContext.QNAME_SEP_PROCESS + outputColumns.get(0).getAttribute(ATTRIBUTE_NAME); columnLineageProcess.setAttribute(ATTRIBUTE_NAME, columnQualifiedName); columnLineageProcess.setAttribute(ATTRIBUTE_QUALIFIED_NAME, columnQualifiedName); columnLineageProcess.setAttribute(ATTRIBUTE_INPUTS, getObjectIds(inputColumns)); @@ -240,7 +238,7 @@ public class CreateImpalaProcess extends BaseImpalaEvent { } for (AtlasEntity columnLineage : columnLineages) { - String columnQualifiedName = (String)columnLineage.getAttribute(ATTRIBUTE_QUALIFIED_NAME); + String columnQualifiedName = (String) columnLineage.getAttribute(ATTRIBUTE_QUALIFIED_NAME); if (LOG.isDebugEnabled()) { LOG.debug("get column lineage entity with qualifiedName: {}", columnQualifiedName); } @@ -253,9 +251,9 @@ public class CreateImpalaProcess extends BaseImpalaEvent { // Then organize the vertices into hierarchical structure: put all column vertices of a table // as children of a ImpalaNode representing that table. 
private void getInputOutList(ImpalaQuery lineageQuery, List<ImpalaNode> inputNodes, - List<ImpalaNode> outputNodes) { - // get vertex map with key being its id and - // ImpalaNode map with its own vertex's vertexId as its key + List<ImpalaNode> outputNodes) { + // get vertex map with key being its id and + // ImpalaNode map with its own vertex's vertexId as its key for (LineageVertex vertex : lineageQuery.getVertices()) { updateVertexMap(vertex); } @@ -308,7 +306,7 @@ public class CreateImpalaProcess extends BaseImpalaEvent { * @return the table name to ImpalaNode map, whose table node contains its columns */ private Map<String, ImpalaNode> buildInputOutputList(Set<Long> idSet, Map<Long, LineageVertex> vertexMap, - Map<String, ImpalaNode> vertexNameMap) { + Map<String, ImpalaNode> vertexNameMap) { Map<String, ImpalaNode> returnTableMap = new HashMap<>(); for (Long id : idSet) { @@ -323,7 +321,7 @@ public class CreateImpalaProcess extends BaseImpalaEvent { String tableName = getTableNameFromVertex(vertex); if (tableName == null) { LOG.warn("cannot find tableName for vertex with id: {}, column name : {}", - id, vertex.getVertexId() == null? "null" : vertex.getVertexId()); + id, vertex.getVertexId() == null ? "null" : vertex.getVertexId()); continue; } @@ -335,7 +333,7 @@ public class CreateImpalaProcess extends BaseImpalaEvent { if (tableNode == null) { LOG.warn("cannot find table node for vertex with id: {}, column name : {}", - id, vertex.getVertexId()); + id, vertex.getVertexId()); tableNode = createTableNode(tableName, getCreateTimeInVertex(null)); vertexNameMap.put(tableName, tableNode);