This is an automated email from the ASF dual-hosted git repository. pinal pushed a commit to branch ATLAS-5021 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 69038f09da7d350164d555977eb7276dac53f684 Author: Pinal Shah <pinal.s...@freestoneinfotech.com> AuthorDate: Tue Apr 22 16:45:11 2025 +0700 ATLAS-5021: Extract Metadata from Trino periodically --- addons/trino-extractor/pom.xml | 93 +++++ .../src/main/bin/run-trino-extractor.sh | 138 +++++++ .../src/main/conf/atlas-application.properties | 32 ++ .../trino-extractor/src/main/conf/atlas-log4j.xml | 42 ++ .../apache/atlas/trino/cli/ExtractorContext.java | 106 +++++ .../apache/atlas/trino/cli/ExtractorService.java | 349 +++++++++++++++++ .../org/apache/atlas/trino/cli/TrinoExtractor.java | 182 +++++++++ .../atlas/trino/client/AtlasClientHelper.java | 426 +++++++++++++++++++++ .../atlas/trino/client/TrinoClientHelper.java | 132 +++++++ .../trino/connector/AtlasEntityConnector.java | 31 ++ .../atlas/trino/connector/ConnectorFactory.java | 37 ++ .../atlas/trino/connector/HiveEntityConnector.java | 144 +++++++ .../trino/connector/RdbmsEntityConnector.java | 43 +++ .../java/org/apache/atlas/trino/model/Catalog.java | 93 +++++ .../apache/atlas/trino/cli/TrinoExtractorIT.java | 42 ++ distro/pom.xml | 1 + .../src/main/assemblies/atlas-trino-extractor.xml | 65 ++++ pom.xml | 1 + 18 files changed, 1957 insertions(+) diff --git a/addons/trino-extractor/pom.xml b/addons/trino-extractor/pom.xml new file mode 100644 index 000000000..4fdf650bf --- /dev/null +++ b/addons/trino-extractor/pom.xml @@ -0,0 +1,93 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. 
You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.atlas</groupId> + <artifactId>apache-atlas</artifactId> + <version>3.0.0-SNAPSHOT</version> + <relativePath>../../</relativePath> + </parent> + + <artifactId>atlas-trino-extractor</artifactId> + <packaging>jar</packaging> + + <name>Apache Atlas Trino Bridge</name> + <description>Apache Atlas Trino Bridge Module</description> + + <dependencies> + <dependency> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-client</artifactId> + <version>1.9</version> + </dependency> + + <dependency> + <groupId>io.trino</groupId> + <artifactId>trino-jdbc</artifactId> + <version>403</version> + <!-- Replace with the latest version --> + </dependency> + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-client-v2</artifactId> + </dependency> + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-intg</artifactId> + </dependency> + <dependency> + <groupId>org.quartz-scheduler</groupId> + <artifactId>quartz</artifactId> + <version>2.3.2</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <version>1.7.30</version> + </dependency> + + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + 
<executions> + <execution> + <id>copy-dependencies</id> + <goals> + <goal>copy-dependencies</goal> + </goals> + <phase>package</phase> + <configuration> + <excludeScope>test</excludeScope> + <includeScope>compile</includeScope> + <outputDirectory>${project.build.directory}/dependency/trino</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> diff --git a/addons/trino-extractor/src/main/bin/run-trino-extractor.sh b/addons/trino-extractor/src/main/bin/run-trino-extractor.sh new file mode 100755 index 000000000..a7d4c8d71 --- /dev/null +++ b/addons/trino-extractor/src/main/bin/run-trino-extractor.sh @@ -0,0 +1,138 @@ +#!/bin/bash +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. See accompanying LICENSE file. +# +# resolve links - $0 may be a softlink +PRG="${0}" + +[[ `uname -s` == *"CYGWIN"* ]] && CYGWIN=true + +while [ -h "${PRG}" ]; do + ls=`ls -ld "${PRG}"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "${PRG}"`/"$link" + fi +done + +BASEDIR=`dirname ${PRG}` +BASEDIR=`cd ${BASEDIR}/..;pwd` + +if test -z "${JAVA_HOME}" +then + JAVA_BIN=`which java` + JAR_BIN=`which jar` +else + JAVA_BIN="${JAVA_HOME}/bin/java" + JAR_BIN="${JAVA_HOME}/bin/jar" +fi +export JAVA_BIN + +if [ ! -e "${JAVA_BIN}" ] || [ ! 
-e "${JAR_BIN}" ]; then + echo "$JAVA_BIN and/or $JAR_BIN not found on the system. Please make sure java and jar commands are available." + exit 1 +fi + +# Construct Atlas classpath using jars from hook/hive/atlas-hive-plugin-impl/ directory. +for i in "${BASEDIR}/lib/"*.jar; do + ATLASCPPATH="${ATLASCPPATH}:$i" +done + +if [ -z "${ATLAS_CONF_DIR}" ] && [ -e "${BASEDIR}/conf/" ];then + ATLAS_CONF_DIR="${BASEDIR}/conf/" +fi +ATLASCPPATH=${ATLASCPPATH}:${ATLAS_CONF_DIR} + +# log dir for applications +ATLAS_LOG_DIR="${BASEDIR}/log" +export ATLAS_LOG_DIR +LOGFILE="$ATLAS_LOG_DIR/atlas-trino-extractor.log" + +TIME=`date +%Y%m%d%H%M%s` + +CP="${ATLASCPPATH}" + +# If running in cygwin, convert pathnames and classpath to Windows format. +if [ "${CYGWIN}" == "true" ] +then + ATLAS_LOG_DIR=`cygpath -w ${ATLAS_LOG_DIR}` + LOGFILE=`cygpath -w ${LOGFILE}` + HIVE_CP=`cygpath -w ${HIVE_CP}` + HADOOP_CP=`cygpath -w ${HADOOP_CP}` + CP=`cygpath -w -p ${CP}` +fi + +JAVA_PROPERTIES="$ATLAS_OPTS -Datlas.log.dir=$ATLAS_LOG_DIR -Datlas.log.file=atlas-trino-extractor.log +-Dlog4j.configuration=atlas-log4j.xml -Djdk.httpclient.HttpClient.log=requests -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006" + +IMPORT_ARGS=() +JVM_ARGS= + +set -f +while true +do + option=${1} + shift + + case "${option}" in + -c) IMPORT_ARGS+=("-c" "$1"); shift;; + -s) IMPORT_ARGS+=("-s" "$1"); shift;; + -t) IMPORT_ARGS+=("-t" "$1"); shift;; + -cx) + CRON_EXPR="$1" + shift + while [[ "$1" != "" && "$1" != -* ]]; do + CRON_EXPR="$CRON_EXPR $1" + shift + done + IMPORT_ARGS+=("-cx" "$CRON_EXPR");; + -h) export HELP_OPTION="true"; IMPORT_ARGS+=("-h");; + --catalog) IMPORT_ARGS+=("--catalog" "$1"); shift;; + --table) IMPORT_ARGS+=("--table" "$1"); shift;; + --schema) IMPORT_ARGS+=("--schema" "$1"); shift;; + --cronExpression) + CRON_EXPR="$1" + shift + while [[ "$1" != "" && "$1" != -* ]]; do + CRON_EXPR="$CRON_EXPR $1" + shift + done + IMPORT_ARGS+=("--cronExpression" "$CRON_EXPR");; + --help) export 
HELP_OPTION="true"; IMPORT_ARGS+=("--help");; + -*) + echo "Invalid argument found" + export HELP_OPTION="true"; IMPORT_ARGS+=("--help") + break;; + "") break;; + esac +done + +JAVA_PROPERTIES="${JAVA_PROPERTIES} ${JVM_ARGS}" + +if [ -z ${HELP_OPTION} ]; then + echo "Log file for import is $LOGFILE" +fi + +"${JAVA_BIN}" ${JAVA_PROPERTIES} -cp "${CP}" org.apache.atlas.trino.cli.TrinoExtractor "${IMPORT_ARGS[@]}" + +set +f + +RETVAL=$? +if [ -z ${HELP_OPTION} ]; then + [ $RETVAL -eq 0 ] && echo Trino Meta Data imported successfully! + [ $RETVAL -eq 1 ] && echo Failed to import Trino Meta Data! Check logs at: $LOGFILE for details. +fi + +exit $RETVAL diff --git a/addons/trino-extractor/src/main/conf/atlas-application.properties b/addons/trino-extractor/src/main/conf/atlas-application.properties new file mode 100755 index 000000000..b4482b984 --- /dev/null +++ b/addons/trino-extractor/src/main/conf/atlas-application.properties @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +######## Atlas connection ############ +atlas.rest.address=http://localhost:21000/ + +######## Trino connection ############ +atlas.trino.jdbc.address=jdbc:trino://<host>:<port>/ +atlas.trino.jdbc.user=<username> + +######## Trino environment name ###### +atlas.trino.namespace=trino_prod +#atlas.trino.catalogs.registered= + +######## Datasource for which ######## +######## Atlas hook is enabled ####### +#atlas.trino.catalog.hook.enabled.hive_catalog=true +#atlas.trino.catalog.hook.enabled.hive_catalog.namespace=cm \ No newline at end of file diff --git a/addons/trino-extractor/src/main/conf/atlas-log4j.xml b/addons/trino-extractor/src/main/conf/atlas-log4j.xml new file mode 100644 index 000000000..371c3b3e3 --- /dev/null +++ b/addons/trino-extractor/src/main/conf/atlas-log4j.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> + +<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> + <appender name="FILE" class="org.apache.log4j.RollingFileAppender"> + <param name="File" value="${atlas.log.dir}/${atlas.log.file}"/> + <param name="Append" value="true"/> + <param name="maxFileSize" value="100MB" /> + <param name="maxBackupIndex" value="20" /> + <layout class="org.apache.log4j.PatternLayout"> + <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%C{1}:%L)%n"/> + </layout> + </appender> + + <logger name="org.apache.atlas.trino" additivity="false"> + <level value="info"/> + <appender-ref ref="FILE"/> + </logger> + + <root> + <priority value="warn"/> + <appender-ref ref="FILE"/> + </root> +</log4j:configuration> diff --git a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorContext.java b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorContext.java new file mode 100644 index 000000000..d20c142b5 --- /dev/null +++ b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorContext.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.trino.cli; + +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; +import org.apache.atlas.trino.client.AtlasClientHelper; +import org.apache.atlas.trino.client.TrinoClientHelper; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.configuration.Configuration; + +import java.io.IOException; + +public class ExtractorContext { + + static final String TRINO_NAMESPACE_CONF = "atlas.trino.namespace"; + static final String DEFAULT_TRINO_NAMESPACE = "cm"; + static final String OPTION_CATALOG_SHORT = "c"; + static final String OPTION_CATALOG_LONG = "catalog"; + static final String OPTION_SCHEMA_SHORT = "s"; + static final String OPTION_SCHEMA_LONG = "schema"; + static final String OPTION_TABLE_SHORT = "t"; + static final String OPTION_TABLE_LONG = "table"; + static final String OPTION_CRON_EXPRESSION_SHORT = "cx"; + static final String OPTION_CRON_EXPRESSION_LONG = "cronExpression"; + static final String OPTION_HELP_SHORT = "h"; + static final String OPTION_HELP_LONG = "help"; + private final Configuration atlasConf; + private String namespace; + private String catalog; + private String schema; + private String table; + private AtlasClientHelper atlasClientHelper; + private TrinoClientHelper trinoClientHelper; + private String cronExpression; + + public ExtractorContext(CommandLine cmd) throws AtlasException, IOException { + this.atlasConf = getAtlasProperties(); + this.atlasClientHelper = createAtlasClientHelper(); + this.trinoClientHelper = createTrinoClientHelper(); + this.namespace = atlasConf.getString(TRINO_NAMESPACE_CONF, DEFAULT_TRINO_NAMESPACE); + this.catalog = cmd.getOptionValue(OPTION_CATALOG_SHORT); + this.schema = cmd.getOptionValue(OPTION_SCHEMA_SHORT); + this.table = cmd.getOptionValue(OPTION_TABLE_SHORT); + this.cronExpression = cmd.getOptionValue(OPTION_CRON_EXPRESSION_SHORT); + } + + public Configuration getAtlasConf() { + return atlasConf; + } + + public 
AtlasClientHelper getAtlasConnector() { + return atlasClientHelper; + } + + public TrinoClientHelper getTrinoConnector() { + return trinoClientHelper; + } + + public String getTable() { + return table; + } + + public String getSchema() { + return schema; + } + + public String getCatalog() { + return catalog; + } + + public String getNamespace() { + return namespace; + } + + public String getCronExpression() { + return cronExpression; + } + + private Configuration getAtlasProperties() throws AtlasException { + return ApplicationProperties.get(); + } + + private TrinoClientHelper createTrinoClientHelper() { + return new TrinoClientHelper(atlasConf); + } + + private AtlasClientHelper createAtlasClientHelper() throws IOException { + return new AtlasClientHelper(atlasConf); + } +} diff --git a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java new file mode 100644 index 000000000..a72b84704 --- /dev/null +++ b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/ExtractorService.java @@ -0,0 +1,349 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.trino.cli; + +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.trino.client.AtlasClientHelper; +import org.apache.atlas.trino.client.TrinoClientHelper; +import org.apache.atlas.trino.model.Catalog; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class ExtractorService { + private static final Logger LOG = LoggerFactory.getLogger(ExtractorService.class); + + public static final int THREAD_POOL_SIZE = 5; + private static final String TRINO_CATALOG_REGISTERED = "atlas.trino.catalogs.registered"; + private static Configuration atlasProperties; + private static TrinoClientHelper trinoClientHelper; + private static AtlasClientHelper atlasClientHelper; + private static String trinoNamespace; + private ExtractorContext context; + + public boolean execute(ExtractorContext context) throws Exception { + this.context = context; + atlasProperties = context.getAtlasConf(); + trinoClientHelper = context.getTrinoConnector(); + atlasClientHelper = context.getAtlasConnector(); + trinoNamespace = context.getNamespace(); + + Map<String, String> catalogs = trinoClientHelper.getAllTrinoCatalogs(); + LOG.info("Found {} catalogs in Trino", 
catalogs.toString()); + + try { + processCatalogs(context, catalogs); + deleteCatalogs(context, catalogs); + } catch (AtlasServiceException e) { + throw new AtlasServiceException(e); + } + return true; + } + + public void processCatalogs(ExtractorContext context, Map<String, String> catalogInTrino) throws AtlasServiceException { + if (MapUtils.isEmpty(catalogInTrino)) { + LOG.debug("No catalogs found under Trino"); + return; + } + + List<Catalog> catalogsToProcess = new ArrayList<>(); + + if (StringUtils.isEmpty(context.getCatalog())) { + String[] registeredCatalogs = atlasProperties.getStringArray(TRINO_CATALOG_REGISTERED); + + if (registeredCatalogs != null) { + for (String registeredCatalog : registeredCatalogs) { + if (catalogInTrino.containsKey(registeredCatalog)) { + catalogsToProcess.add(getCatalogInstance(registeredCatalog, catalogInTrino.get(registeredCatalog))); + } + } + } + } else { + if (catalogInTrino.containsKey(context.getCatalog())) { + Catalog catalog = getCatalogInstance(context.getCatalog(), catalogInTrino.get(context.getCatalog())); + catalog.setSchemaToImport(context.getSchema()); + catalog.setTableToImport(context.getTable()); + catalogsToProcess.add(catalog); + } + } + + if (CollectionUtils.isEmpty(catalogsToProcess)) { + LOG.warn("No catalogs found to process"); + return; + } else { + LOG.info("{} catalogs to be extracted", catalogsToProcess.stream().map(Catalog::getName).collect(Collectors.toList())); + } + + AtlasEntity.AtlasEntityWithExtInfo trinoInstanceEntity = AtlasClientHelper.createOrUpdateInstanceEntity(trinoNamespace); + + ExecutorService catalogExecutor = Executors.newFixedThreadPool(Math.min(catalogsToProcess.size(), THREAD_POOL_SIZE)); + List<Future<?>> futures = new ArrayList<>(); + + for (Catalog currentCatalog : catalogsToProcess) { + futures.add(catalogExecutor.submit(() -> { + try { + currentCatalog.setTrinoInstanceEntity(trinoInstanceEntity); + processCatalog(currentCatalog); + } catch (Exception e) { + LOG.error("Error 
processing catalog: {}", currentCatalog, e); + } + })); + } + + catalogExecutor.shutdown(); + try { + if (!catalogExecutor.awaitTermination(30, TimeUnit.MINUTES)) { + LOG.warn("Catalog processing did not complete within the timeout."); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Catalog processing was interrupted", e); + } + + LOG.info("Catalogs scanned for creation/updation completed"); + } + + public void processCatalog(Catalog catalog) throws AtlasServiceException, SQLException { + if (catalog != null) { + LOG.info("Started extracting {} catalog:", catalog.getName()); + String catalogName = catalog.getName(); + // create trino_catalog + AtlasEntity.AtlasEntityWithExtInfo trinoCatalogEntity = AtlasClientHelper.createOrUpdateCatalogEntity(catalog); + + List<String> schemas = trinoClientHelper.getTrinoSchemas(catalogName, catalog.getSchemaToImport()); + LOG.info("Found {} schema under {} catalog", schemas.size(), catalogName); + + processSchemas(catalog, trinoCatalogEntity.getEntity(), schemas); + + if (StringUtils.isEmpty(context.getSchema())) { + deleteSchemas(schemas, trinoCatalogEntity.getEntity().getGuid()); + } + } + } + + public void processSchemas(Catalog catalog, AtlasEntity trinoCatalogEntity, List<String> schemaToImport) { + for (String schemaName : schemaToImport) { + LOG.info("Started extracting {} schema:", schemaName); + try { + AtlasEntity.AtlasEntityWithExtInfo schemaEntity = AtlasClientHelper.createOrUpdateSchemaEntity(catalog, trinoCatalogEntity, schemaName); + + List<String> tables = trinoClientHelper.getTrinoTables(catalog.getName(), schemaName, catalog.getTableToImport()); + LOG.info("Found {} tables under {}.{} catalog.schema", tables.size(), catalog.getName(), schemaName); + + processTables(catalog, schemaName, schemaEntity.getEntity(), tables); + + if (StringUtils.isEmpty(context.getTable())) { + deleteTables(tables, schemaEntity.getEntity().getGuid()); + } + } catch (Exception e) { + 
LOG.error("Error processing schema: {}", schemaName); + } + } + } + + public void processTables(Catalog catalog, String schemaName, AtlasEntity schemaEntity, List<String> trinoTables) { + for (String trinoTableName : trinoTables) { + LOG.info("Started extracting {} table:", trinoTableName); + + try { + Map<String, Map<String, Object>> trinoColumns = trinoClientHelper.getTrinoColumns(catalog.getName(), schemaName, trinoTableName); + LOG.info("Found {} columns under {}.{}.{} catalog.schema.table", trinoColumns.size(), catalog.getName(), schemaName, trinoTableName); + + AtlasClientHelper.createOrUpdateTableEntity(catalog, schemaName, trinoTableName, trinoColumns, schemaEntity); + } catch (Exception e) { + LOG.error("Error processing table: {}", trinoTableName, e); + } + } + } + + public void deleteCatalogs(ExtractorContext context, Map<String, String> catalogInTrino) throws AtlasServiceException { + if (StringUtils.isNotEmpty(context.getCatalog())) { + return; + } + + AtlasEntityHeader trinoInstance = AtlasClientHelper.getTrinoInstance(trinoNamespace); + if (trinoInstance != null) { + Set<String> catalogsToDelete = getCatalogsToDelete(catalogInTrino, trinoInstance.getGuid()); + + if (CollectionUtils.isNotEmpty(catalogsToDelete)) { + LOG.info("{} non existing catalogs to be deleted", catalogsToDelete, trinoInstance.getGuid()); + + for (String catalogGuid : catalogsToDelete) { + try { + deleteSchemas(null, catalogGuid); + AtlasClientHelper.deleteByGuid(Collections.singleton(catalogGuid)); + } catch (AtlasServiceException e) { + LOG.error("Error deleting catalog: {}", catalogGuid, e); + } + } + } else { + LOG.info("No catalogs found to delete"); + } + } + + LOG.info("Catalogs scanned for deletion completed"); + } + + public void deleteSchemas(List<String> schemasInTrino, String catalogGuid) { + try { + Set<String> schemasToDelete = getSchemasToDelete(schemasInTrino, catalogGuid); + + if (CollectionUtils.isNotEmpty(schemasToDelete)) { + LOG.info("{} non existing schemas 
under {} catalog found, starting to delete", schemasToDelete, catalogGuid); + + for (String schemaGuid : schemasToDelete) { + try { + deleteTables(null, schemaGuid); + AtlasClientHelper.deleteByGuid(Collections.singleton(schemaGuid)); + } catch (AtlasServiceException e) { + LOG.error("Error in deleting schema: {}", schemaGuid, e); + } + } + } else { + LOG.info("No schemas found under {} catalog to delete", catalogGuid); + } + } catch (AtlasServiceException e) { + LOG.error("Error in deleting schemas ", catalogGuid, e); + } + } + + private void deleteTables(List<String> tablesInTrino, String schemaGuid) { + try { + Set<String> tablesToDelete = getTablesToDelete(tablesInTrino, schemaGuid); + + if (CollectionUtils.isNotEmpty(tablesToDelete)) { + LOG.info("{} non existing tables under {} schema found, starting to delete", tablesToDelete, schemaGuid); + + for (String tableGuid : tablesToDelete) { + try { + AtlasClientHelper.deleteByGuid(Collections.singleton(tableGuid)); + } catch (AtlasServiceException e) { + LOG.error("Error deleting table: {}", tableGuid, e); + } + } + } else { + LOG.info("No non existing tables found under {} schema", schemaGuid); + } + } catch (AtlasServiceException e) { + LOG.error("Error deleting tables under schema: {}", schemaGuid, e); + } + } + + private static Catalog getCatalogInstance(String catalogName, String connectorType) { + if (catalogName == null) { + return null; + } + + boolean isHookEnabled = atlasProperties.getBoolean("atlas.trino.catalog.hook.enabled." + catalogName, false); + String hookNamespace = null; + if (isHookEnabled) { + hookNamespace = atlasProperties.getString("atlas.trino.catalog.hook.enabled." 
+ catalogName + ".namespace"); + } + Catalog catalog = new Catalog(catalogName, connectorType, isHookEnabled, hookNamespace, trinoNamespace); + return catalog; + } + + private static Set<String> getCatalogsToDelete(Map<String, String> catalogInTrino, String instanceGuid) throws AtlasServiceException { + + if (instanceGuid != null) { + + List<AtlasEntityHeader> catalogsInAtlas = AtlasClientHelper.getAllCatalogsInInstance(instanceGuid); + if (catalogsInAtlas != null) { + + if (catalogInTrino == null) { + catalogInTrino = new HashMap<>(); + } + + Map<String, String> finalCatalogInTrino = catalogInTrino; + return catalogsInAtlas.stream() + .filter(entity -> entity.getAttribute("name") != null) // Ensure "name" attribute exists + .filter(entity -> !finalCatalogInTrino.containsKey(entity.getAttribute("name"))) // Only missing schemas + .map(AtlasEntityHeader::getGuid) // Extract GUIDs + .collect(Collectors.toSet()); + } + } + + return new HashSet<>(); + } + + private static Set<String> getSchemasToDelete(List<String> schemasInTrino, String catalogGuid) throws AtlasServiceException { + + if (catalogGuid != null) { + + List<AtlasEntityHeader> schemasInAtlas = AtlasClientHelper.getAllSchemasInCatalog(catalogGuid); + if (schemasInAtlas != null) { + + if (schemasInTrino == null) { + schemasInTrino = new ArrayList<>(); + } + + List<String> finalSchemasInTrino = schemasInTrino; + return schemasInAtlas.stream() + .filter(entity -> entity.getAttribute("name") != null) // Ensure "name" attribute exists + .filter(entity -> !finalSchemasInTrino.contains(entity.getAttribute("name"))) // Only missing schemas + .map(AtlasEntityHeader::getGuid) // Extract GUIDs + .collect(Collectors.toSet()); + } + } + + return new HashSet<>(); + } + + private static Set<String> getTablesToDelete(List<String> tablesInTrino, String schemaGuid) throws AtlasServiceException { + + if (schemaGuid != null) { + + List<AtlasEntityHeader> tablesInAtlas = AtlasClientHelper.getAllTablesInSchema(schemaGuid); + if 
(tablesInAtlas != null) { + + if (tablesInTrino == null) { + tablesInTrino = new ArrayList<>(); + } + + List<String> finalTablesInTrino = tablesInTrino; + return tablesInAtlas.stream() + .filter(entity -> entity.getAttribute("name") != null) // Ensure "name" attribute exists + .filter(entity -> !finalTablesInTrino.contains(entity.getAttribute("name"))) // Only missing schemas + .map(AtlasEntityHeader::getGuid) // Extract GUIDs + .collect(Collectors.toSet()); + } + } + + return new HashSet<>(); + } +} diff --git a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/TrinoExtractor.java b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/TrinoExtractor.java new file mode 100644 index 000000000..4a0ebe085 --- /dev/null +++ b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/cli/TrinoExtractor.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.atlas.trino.cli; + +import org.apache.atlas.AtlasException; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.commons.cli.BasicParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.lang.StringUtils; +import org.quartz.CronExpression; +import org.quartz.CronScheduleBuilder; +import org.quartz.DisallowConcurrentExecution; +import org.quartz.Job; +import org.quartz.JobBuilder; +import org.quartz.JobDetail; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.quartz.Scheduler; +import org.quartz.SchedulerFactory; +import org.quartz.Trigger; +import org.quartz.TriggerBuilder; +import org.quartz.impl.StdSchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_CATALOG_LONG; +import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_CATALOG_SHORT; +import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_CRON_EXPRESSION_LONG; +import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_CRON_EXPRESSION_SHORT; +import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_HELP_LONG; +import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_HELP_SHORT; +import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_SCHEMA_LONG; +import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_SCHEMA_SHORT; +import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_TABLE_LONG; +import static org.apache.atlas.trino.cli.ExtractorContext.OPTION_TABLE_SHORT; + +public class TrinoExtractor { + private static final Logger LOG = LoggerFactory.getLogger(TrinoExtractor.class); + + private static final int EXIT_CODE_SUCCESS = 0; + private static final int 
EXIT_CODE_FAILED = 1; + private static final int EXIT_CODE_HELP = 2; + private static int exitCode = EXIT_CODE_FAILED; + + private static ExtractorContext extractorContext; + + public static void main(String[] args) { + try { + extractorContext = createExtractorContext(args); + if (extractorContext != null) { + String cronExpression = extractorContext.getCronExpression(); + + if (StringUtils.isNotEmpty(cronExpression)) { + + if (!CronExpression.isValidExpression(cronExpression)) { + LOG.error("Invalid cron expression provided: {}", cronExpression); + } else { + LOG.info("Cron Expression found, scheduling the job for {}", cronExpression); + SchedulerFactory sf = new StdSchedulerFactory(); + Scheduler scheduler = sf.getScheduler(); + + JobDetail job = JobBuilder.newJob(MetadataJob.class).withIdentity("metadataJob", "group1").build(); + Trigger trigger = TriggerBuilder.newTrigger() + .withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)).startNow() + .build(); + + scheduler.scheduleJob(job, trigger); + scheduler.start(); + Thread.currentThread().join(); + } + } else { + LOG.warn("Cron Expression missing, hence will run the job once"); + ExtractorService extractorService = new ExtractorService(); + if (extractorService.execute(extractorContext)) { + exitCode = EXIT_CODE_SUCCESS; + } + } + } + } catch (Exception e) { + LOG.error("Error encountered.", e); + System.out.println("exitCode : " + exitCode); + System.out.println("Error encountered." 
+ e); + } finally { + if (extractorContext != null && extractorContext.getAtlasConnector() != null) { + extractorContext.getAtlasConnector().close(); + } + } + + System.exit(exitCode); + } + + static ExtractorContext createExtractorContext(String[] args) throws AtlasBaseException, IOException { + Options acceptedCliOptions = prepareCommandLineOptions(); + + try { + CommandLine cmd = new BasicParser().parse(acceptedCliOptions, args, true); + List<String> argsNotProcessed = cmd.getArgList(); + + if (argsNotProcessed != null && argsNotProcessed.size() > 0) { + throw new AtlasBaseException("Unrecognized arguments."); + } + + ExtractorContext ret = null; + if (cmd.hasOption(ExtractorContext.OPTION_HELP_SHORT)) { + printUsage(acceptedCliOptions); + exitCode = EXIT_CODE_HELP; + } else { + ret = new ExtractorContext(cmd); + LOG.debug("Successfully initialized the extractor context."); + } + + return ret; + } catch (ParseException | AtlasBaseException e) { + printUsage(acceptedCliOptions); + + throw new AtlasBaseException("Invalid arguments. Reason: " + e.getMessage(), e); + } catch (AtlasException e) { + throw new AtlasBaseException("Error in getting Application Properties. 
Reason: " + e.getMessage(), e); + } catch (IOException e) { + throw new IOException(e); + } + } + + private static Options prepareCommandLineOptions() { + Options acceptedCliOptions = new Options(); + + return acceptedCliOptions.addOption(OPTION_CATALOG_SHORT, OPTION_CATALOG_LONG, true, "Catalog name") + .addOption(OPTION_SCHEMA_SHORT, OPTION_SCHEMA_LONG, true, "Schema name") + .addOption(OPTION_TABLE_SHORT, OPTION_TABLE_LONG, true, "Table name") + .addOption(OPTION_CRON_EXPRESSION_SHORT, OPTION_CRON_EXPRESSION_LONG, true, "Cron expression to run extraction") + .addOption(OPTION_HELP_SHORT, OPTION_HELP_LONG, false, "Print help message"); + } + + private static void printUsage(Options options) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp(TrinoExtractor.class.getName(), options); + } + + @DisallowConcurrentExecution + public static class MetadataJob implements Job { + private final Logger LOG = LoggerFactory.getLogger(MetadataJob.class); + + public void execute(JobExecutionContext context) throws JobExecutionException { + if (extractorContext != null) { + LOG.info("Executing metadata extraction at: {}", java.time.LocalTime.now()); + ExtractorService extractorService = new ExtractorService(); + + try { + if (extractorService.execute(extractorContext)) { + exitCode = EXIT_CODE_SUCCESS; + } + } catch (Exception e) { + LOG.error("Error encountered: ", e); + throw new JobExecutionException(e); + } + LOG.info("Completed executing metadata extraction at: {}", java.time.LocalTime.now()); + } + } + } +} diff --git a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/AtlasClientHelper.java b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/AtlasClientHelper.java new file mode 100644 index 000000000..32b8c738f --- /dev/null +++ b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/AtlasClientHelper.java @@ -0,0 +1,426 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.trino.client; + +import com.sun.jersey.api.client.ClientResponse; +import org.apache.atlas.AtlasClientV2; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.discovery.AtlasSearchResult; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntityHeader; +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.model.instance.EntityMutations; +import org.apache.atlas.trino.model.Catalog; +import org.apache.atlas.type.AtlasTypeUtil; +import org.apache.atlas.utils.AuthenticationUtil; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.atlas.type.AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME; + +public class AtlasClientHelper { + private static 
final Logger LOG = LoggerFactory.getLogger(AtlasClientHelper.class); + + public static final String TRINO_INSTANCE = "trino_instance"; + public static final String TRINO_CATALOG = "trino_catalog"; + public static final String TRINO_SCHEMA = "trino_schema"; + public static final String TRINO_TABLE = "trino_table"; + public static final String TRINO_COLUMN = "trino_column"; + public static final String TRINO_INSTANCE_CATALOG_ATTRIBUTE = "catalogs"; + public static final String TRINO_CATALOG_SCHEMA_ATTRIBUTE = "schemas"; + public static final String TRINO_SCHEMA_TABLE_ATTRIBUTE = "tables"; + public static final String QUALIFIED_NAME_ATTRIBUTE = "qualifiedName"; + public static final String NAME_ATTRIBUTE = "name"; + public static final int pageLimit = 10000; + private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/"; + private static final String APPLICATION_PROPERTY_ATLAS_ENDPOINT = "atlas.rest.address"; + private static final String TRINO_CATALOG_CONNECTOR_TYPE_ATTRIBUTE = "connectorType"; + private static final String TRINO_CATALOG_INSTANCE_ATTRIBUTE = "instance"; + private static final String TRINO_CATALOG_INSTANCE_RELATIONSHIP = "trino_instance_catalog"; + private static final String TRINO_SCHEMA_CATALOG_ATTRIBUTE = "catalog"; + private static final String TRINO_SCHEMA_CATALOG_RELATIONSHIP = "trino_schema_catalog"; + private static final String TRINO_COLUMN_DATA_TYPE_ATTRIBUTE = "data_type"; + private static final String TRINO_COLUMN_ORIDINAL_POSITION_ATTRIBUTE = "ordinal_position"; + private static final String TRINO_COLUMN_COLUMN_DEFAULT_ATTRIBUTE = "column_default"; + private static final String TRINO_COLUMN_IS_NULLABLE_ATTRIBUTE = "is_nullable"; + private static final String TRINO_COLUMN_TABLE_ATTRIBUTE = "table"; + private static final String TRINO_TABLE_COLUMN_RELATIONSHIP = "trino_table_columns"; + private static final String TRINO_TABLE_SCHEMA_RELATIONSHIP = "trino_table_schema"; + private static final String TRINO_TABLE_SCHEMA_ATTRIBUTE 
= "trinoschema"; + private static final String TRINO_TABLE_COLUMN_ATTRIBUTE = "columns"; + + private static AtlasClientV2 atlasClientV2; + + public AtlasClientHelper(Configuration atlasConf) throws IOException { + atlasClientV2 = getAtlasClientV2Instance(atlasConf); + } + + public static synchronized AtlasClientV2 getAtlasClientV2Instance(Configuration atlasConf) throws IOException { + if (atlasClientV2 == null) { + String[] atlasEndpoint = new String[] {DEFAULT_ATLAS_URL}; + + if (atlasConf != null && ArrayUtils.isNotEmpty(atlasConf.getStringArray(APPLICATION_PROPERTY_ATLAS_ENDPOINT))) { + atlasEndpoint = atlasConf.getStringArray(APPLICATION_PROPERTY_ATLAS_ENDPOINT); + } + + if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) { + String[] basicAuthUsernamePassword = AuthenticationUtil.getBasicAuthenticationInput(); + atlasClientV2 = new AtlasClientV2(atlasEndpoint, basicAuthUsernamePassword); + } else { + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + atlasClientV2 = new AtlasClientV2(ugi, ugi.getShortUserName(), atlasEndpoint); + } + } + return atlasClientV2; + } + + public static List<AtlasEntityHeader> getAllCatalogsInInstance(String instanceGuid) throws AtlasServiceException { + + List<AtlasEntityHeader> entities = getAllRelationshipEntities(instanceGuid, TRINO_INSTANCE_CATALOG_ATTRIBUTE); + if (CollectionUtils.isNotEmpty(entities)) { + LOG.debug("Retrieved {} catalogs of {} trino instance", entities.size(), instanceGuid); + return entities; + } else { + LOG.debug("No catalog found under {} trino instance", instanceGuid); + return null; + } + } + + public static List<AtlasEntityHeader> getAllSchemasInCatalog(String catalogGuid) throws AtlasServiceException { + + List<AtlasEntityHeader> entities = getAllRelationshipEntities(catalogGuid, TRINO_CATALOG_SCHEMA_ATTRIBUTE); + if (CollectionUtils.isNotEmpty(entities)) { + LOG.debug("Retrieved {} schemas of {} trino catalog", entities.size(), catalogGuid); + return entities; + } else { + 
LOG.debug("No schema found under {} trino catalog", catalogGuid); + return null; + } + } + + public static List<AtlasEntityHeader> getAllTablesInSchema(String schemaGuid) throws AtlasServiceException { + + List<AtlasEntityHeader> entities = getAllRelationshipEntities(schemaGuid, TRINO_SCHEMA_TABLE_ATTRIBUTE); + if (CollectionUtils.isNotEmpty(entities)) { + LOG.debug("Retrieved {} tables of {} trino schema", entities.size(), schemaGuid); + return entities; + } else { + LOG.debug("No table found under {} trino schema", schemaGuid); + return null; + } + } + + public static List<AtlasEntityHeader> getAllRelationshipEntities(String entityGuid, String relationshipAttributeName) throws AtlasServiceException { + + if (entityGuid == null) { + return null; + } + List<AtlasEntityHeader> entities = new ArrayList<>(); + final int pageSize = pageLimit; + + for (int i = 0; ; i++) { + int offset = pageSize * i; + LOG.debug("Retrieving: offset={}, pageSize={}", offset, pageSize); + + AtlasSearchResult searchResult = atlasClientV2.relationshipSearch(entityGuid, relationshipAttributeName, null, null, true, pageSize, offset); + + List<AtlasEntityHeader> entityHeaders = searchResult == null ? null : searchResult.getEntities(); + int count = entityHeaders == null ? 
0 : entityHeaders.size(); + + if (count > 0) { + entities.addAll(entityHeaders); + } + + if (count < pageSize) { // last page + break; + } + } + + return entities; + } + + public static AtlasEntityHeader getTrinoInstance(String namespace) throws AtlasServiceException { + return atlasClientV2.getEntityHeaderByAttribute(TRINO_INSTANCE, Collections.singletonMap(QUALIFIED_NAME_ATTRIBUTE, namespace)); + } + + public static AtlasEntity.AtlasEntityWithExtInfo createOrUpdateInstanceEntity(String trinoNamespace) throws AtlasServiceException { + String qualifiedName = trinoNamespace; + AtlasEntity.AtlasEntityWithExtInfo ret = findEntity(TRINO_INSTANCE, qualifiedName, true, true); + + if (ret == null) { + ret = new AtlasEntity.AtlasEntityWithExtInfo(); + AtlasEntity entity = new AtlasEntity(TRINO_INSTANCE); + + entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, qualifiedName); + entity.setAttribute(NAME_ATTRIBUTE, trinoNamespace); + + ret.setEntity(entity); + ret = createEntity(ret); + } + return ret; + } + + public static AtlasEntity.AtlasEntityWithExtInfo createOrUpdateCatalogEntity(Catalog catalog) throws AtlasServiceException { + String catalogName = catalog.getName(); + String trinoNamespace = catalog.getInstanceName(); + + AtlasEntity.AtlasEntityWithExtInfo ret = findEntity(TRINO_CATALOG, catalogName + "@" + trinoNamespace, true, true); + if (ret == null) { + ret = new AtlasEntity.AtlasEntityWithExtInfo(); + AtlasEntity entity = new AtlasEntity(TRINO_CATALOG); + + entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, catalogName + "@" + trinoNamespace); + entity.setAttribute(NAME_ATTRIBUTE, catalogName); + entity.setAttribute(TRINO_CATALOG_CONNECTOR_TYPE_ATTRIBUTE, catalog.getType()); + entity.setRelationshipAttribute(TRINO_CATALOG_INSTANCE_ATTRIBUTE, AtlasTypeUtil.getAtlasRelatedObjectId(catalog.getTrinoInstanceEntity().getEntity(), TRINO_CATALOG_INSTANCE_RELATIONSHIP)); + + if (catalog.getConnector() != null) { + 
catalog.getConnector().connectTrinoCatalog(catalog.getHookInstanceName(), catalogName, entity, ret); + } + ret.setEntity(entity); + ret = createEntity(ret); + } else { + AtlasEntity entity = ret.getEntity(); + entity.setRelationshipAttribute(TRINO_CATALOG_INSTANCE_ATTRIBUTE, AtlasTypeUtil.getAtlasRelatedObjectId(catalog.getTrinoInstanceEntity().getEntity(), TRINO_CATALOG_INSTANCE_RELATIONSHIP)); + + if (catalog.getConnector() != null) { + catalog.getConnector().connectTrinoCatalog(catalog.getHookInstanceName(), catalogName, entity, ret); + } + ret.setEntity(entity); + updateEntity(ret); + } + + return ret; + } + + public static AtlasEntity.AtlasEntityWithExtInfo createOrUpdateSchemaEntity(Catalog catalog, AtlasEntity catalogEntity, String schema) throws AtlasServiceException { + String qualifiedName = catalog.getName() + "." + schema + "@" + catalog.getInstanceName(); + + AtlasEntity.AtlasEntityWithExtInfo ret = findEntity(TRINO_SCHEMA, qualifiedName, true, true); + + if (ret == null) { + ret = new AtlasEntity.AtlasEntityWithExtInfo(); + AtlasEntity entity = new AtlasEntity(TRINO_SCHEMA); + + entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, qualifiedName); + entity.setAttribute(NAME_ATTRIBUTE, schema); + entity.setRelationshipAttribute(TRINO_SCHEMA_CATALOG_ATTRIBUTE, AtlasTypeUtil.getAtlasRelatedObjectId(catalogEntity, TRINO_SCHEMA_CATALOG_RELATIONSHIP)); + + if (catalog.getConnector() != null) { + catalog.getConnector().connectTrinoSchema(catalog.getHookInstanceName(), catalog.getName(), schema, entity, ret); + } + + ret.setEntity(entity); + ret = createEntity(ret); + } else { + AtlasEntity entity = ret.getEntity(); + entity.setRelationshipAttribute(TRINO_SCHEMA_CATALOG_ATTRIBUTE, AtlasTypeUtil.getAtlasRelatedObjectId(catalogEntity, TRINO_SCHEMA_CATALOG_RELATIONSHIP)); + + if (catalog.getConnector() != null) { + catalog.getConnector().connectTrinoSchema(catalog.getHookInstanceName(), catalog.getName(), schema, entity, ret); + } + ret.setEntity(entity); + 
updateEntity(ret); + } + + return ret; + } + + public static AtlasEntity.AtlasEntityWithExtInfo createOrUpdateTableEntity(Catalog catalog, String schema, String table, Map<String, Map<String, Object>> trinoColumns, AtlasEntity schemaEntity) throws AtlasServiceException { + String qualifiedName = catalog.getName() + "." + schema + "." + table + "@" + catalog.getInstanceName(); + + AtlasEntity.AtlasEntityWithExtInfo ret; + AtlasEntity.AtlasEntityWithExtInfo tableEntityExt = findEntity(TRINO_TABLE, qualifiedName, true, true); + + if (tableEntityExt == null) { + tableEntityExt = toTableEntity(catalog, schema, table, trinoColumns, schemaEntity, tableEntityExt); + ret = createEntity(tableEntityExt); + } else { + ret = toTableEntity(catalog, schema, table, trinoColumns, schemaEntity, tableEntityExt); + updateEntity(ret); + } + + return ret; + } + + public static AtlasEntity.AtlasEntityWithExtInfo toTableEntity(Catalog catalog, String schema, String table, Map<String, Map<String, Object>> trinoColumns, AtlasEntity schemaEntity, AtlasEntity.AtlasEntityWithExtInfo tableEntityExt) { + if (tableEntityExt == null) { + tableEntityExt = new AtlasEntity.AtlasEntityWithExtInfo(new AtlasEntity(TRINO_TABLE)); + } + + String qualifiedName = catalog.getName() + "." + schema + "." + table + "@" + catalog.getInstanceName(); + + AtlasEntity tableEntity = tableEntityExt.getEntity(); + tableEntity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, qualifiedName); + tableEntity.setAttribute(NAME_ATTRIBUTE, table); + + List<AtlasEntity> columnEntities = new ArrayList<>(); + for (Map.Entry<String, Map<String, Object>> columnEntry : trinoColumns.entrySet()) { + AtlasEntity entity = new AtlasEntity(TRINO_COLUMN); + + String columnName = columnEntry.getKey(); + String colQualifiedName = catalog.getName() + "." + schema + "." + table + "." 
+ columnName + "@" + catalog.getInstanceName(); + + entity.setAttribute(QUALIFIED_NAME_ATTRIBUTE, colQualifiedName); + entity.setAttribute(NAME_ATTRIBUTE, columnName); + if (MapUtils.isNotEmpty(columnEntry.getValue())) { + Map<String, Object> columnAttr = columnEntry.getValue(); + entity.setAttribute(TRINO_COLUMN_DATA_TYPE_ATTRIBUTE, columnAttr.get(TRINO_COLUMN_DATA_TYPE_ATTRIBUTE)); + entity.setAttribute(TRINO_COLUMN_ORIDINAL_POSITION_ATTRIBUTE, columnAttr.get(TRINO_COLUMN_ORIDINAL_POSITION_ATTRIBUTE)); + entity.setAttribute(TRINO_COLUMN_COLUMN_DEFAULT_ATTRIBUTE, columnAttr.get(TRINO_COLUMN_COLUMN_DEFAULT_ATTRIBUTE)); + entity.setAttribute(TRINO_COLUMN_IS_NULLABLE_ATTRIBUTE, columnAttr.get(TRINO_COLUMN_IS_NULLABLE_ATTRIBUTE)); + } + + entity.setRelationshipAttribute(TRINO_COLUMN_TABLE_ATTRIBUTE, AtlasTypeUtil.getAtlasRelatedObjectId(tableEntity, TRINO_TABLE_COLUMN_RELATIONSHIP)); + columnEntities.add(entity); + } + + tableEntity.setRelationshipAttribute(TRINO_TABLE_SCHEMA_ATTRIBUTE, AtlasTypeUtil.getAtlasRelatedObjectId(schemaEntity, TRINO_TABLE_SCHEMA_RELATIONSHIP)); + tableEntity.setRelationshipAttribute(TRINO_TABLE_COLUMN_ATTRIBUTE, AtlasTypeUtil.getAtlasRelatedObjectIds(columnEntities, TRINO_TABLE_COLUMN_RELATIONSHIP)); + + if (catalog.getConnector() != null) { + catalog.getConnector().connectTrinoTable(catalog.getHookInstanceName(), catalog.getName(), schema, table, tableEntity, columnEntities, tableEntityExt); + } + + tableEntityExt.addReferredEntity(schemaEntity); + if (columnEntities != null) { + for (AtlasEntity column : columnEntities) { + tableEntityExt.addReferredEntity(column); + } + } + + tableEntityExt.setEntity(tableEntity); + return tableEntityExt; + } + + public static AtlasEntity.AtlasEntityWithExtInfo findEntity(final String typeName, final String qualifiedName, boolean minExtInfo, boolean ignoreRelationship) throws AtlasServiceException { + AtlasEntity.AtlasEntityWithExtInfo ret = null; + + try { + ret = 
atlasClientV2.getEntityByAttribute(typeName, Collections.singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName), minExtInfo, ignoreRelationship); + } catch (AtlasServiceException e) { + if (e.getStatus() == ClientResponse.Status.NOT_FOUND) { + return null; + } + + throw e; + } + return ret; + } + + public static AtlasEntity.AtlasEntityWithExtInfo createEntity(AtlasEntity.AtlasEntityWithExtInfo entity) throws AtlasServiceException { + LOG.debug("creating {} entity: {}", entity.getEntity().getTypeName(), entity); + + AtlasEntity.AtlasEntityWithExtInfo ret = null; + EntityMutationResponse response = atlasClientV2.createEntity(entity); + List<AtlasEntityHeader> createdEntities = response.getEntitiesByOperation(EntityMutations.EntityOperation.CREATE); + + if (CollectionUtils.isNotEmpty(createdEntities)) { + for (AtlasEntityHeader createdEntity : createdEntities) { + if (ret == null) { + ret = atlasClientV2.getEntityByGuid(createdEntity.getGuid()); + + LOG.debug("Created {} entity: name={}, guid={}", ret.getEntity().getTypeName(), ret.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), ret.getEntity().getGuid()); + } else if (ret.getEntity(createdEntity.getGuid()) == null) { + AtlasEntity.AtlasEntityWithExtInfo newEntity = atlasClientV2.getEntityByGuid(createdEntity.getGuid()); + + ret.addReferredEntity(newEntity.getEntity()); + + if (MapUtils.isNotEmpty(newEntity.getReferredEntities())) { + for (Map.Entry<String, AtlasEntity> entry : newEntity.getReferredEntities().entrySet()) { + ret.addReferredEntity(entry.getKey(), entry.getValue()); + } + } + + LOG.debug("Created {} entity: name={}, guid={}", newEntity.getEntity().getTypeName(), newEntity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), newEntity.getEntity().getGuid()); + } + } + } + + clearRelationshipAttributes(ret); + + return ret; + } + + public static void deleteByGuid(Set<String> guidTodelete) throws AtlasServiceException { + + if (CollectionUtils.isNotEmpty(guidTodelete)) { + + for (String guid : 
guidTodelete) { + EntityMutationResponse response = atlasClientV2.deleteEntityByGuid(guid); + + if (response == null || response.getDeletedEntities().size() < 1) { + LOG.debug("Entity with guid : {} is not deleted", guid); + } else { + LOG.debug("Entity with guid : {} is deleted", guid); + } + } + } + } + + public static void close() { + atlasClientV2.close(); + } + + + + private static void updateEntity(AtlasEntity.AtlasEntityWithExtInfo entity) throws AtlasServiceException { + LOG.debug("updating {} entity: {}", entity.getEntity().getTypeName(), entity); + + atlasClientV2.updateEntity(entity); + + LOG.debug("Updated {} entity: name={}, guid={}", entity.getEntity().getTypeName(), entity.getEntity().getAttribute(ATTRIBUTE_QUALIFIED_NAME), entity.getEntity().getGuid()); + } + + private static void clearRelationshipAttributes(AtlasEntity.AtlasEntityWithExtInfo entity) { + if (entity != null) { + clearRelationshipAttributes(entity.getEntity()); + + if (entity.getReferredEntities() != null) { + clearRelationshipAttributes(entity.getReferredEntities().values()); + } + } + } + + private static void clearRelationshipAttributes(Collection<AtlasEntity> entities) { + if (entities != null) { + for (AtlasEntity entity : entities) { + clearRelationshipAttributes(entity); + } + } + } + + private static void clearRelationshipAttributes(AtlasEntity entity) { + if (entity != null && entity.getRelationshipAttributes() != null) { + entity.getRelationshipAttributes().clear(); + } + } +} diff --git a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/TrinoClientHelper.java b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/TrinoClientHelper.java new file mode 100644 index 000000000..ef24cb6e4 --- /dev/null +++ b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/client/TrinoClientHelper.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.trino.client; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TrinoClientHelper { + + private static String jdbcUrl; + private static String username; + private static String password; + + public TrinoClientHelper(Configuration atlasConf) { + this.jdbcUrl = atlasConf.getString("atlas.trino.jdbc.address"); + this.username = atlasConf.getString("atlas.trino.jdbc.user"); + this.password = atlasConf.getString("atlas.trino.jdbc.password", ""); + } + + public static Connection getTrinoConnection() throws SQLException { + return DriverManager.getConnection(jdbcUrl, username, password); + } + + public Map<String, String> getAllTrinoCatalogs() { + Map<String, String> catalogs = new HashMap<>(); + try { + Connection connection = getTrinoConnection(); + Statement stmt = connection.createStatement(); + StringBuilder query = new StringBuilder(); + query.append("SELECT catalog_name, connector_name FROM system.metadata.catalogs"); + + ResultSet 
rs = stmt.executeQuery(query.toString()); + while (rs.next()) { + catalogs.put(rs.getString("catalog_name"), rs.getString("connector_name")); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + + return catalogs; + } + + public List<String> getTrinoSchemas(String catalog, String schemaToImport) throws SQLException { + List<String> schemas = new ArrayList<>(); + Connection connection = getTrinoConnection(); + Statement stmt = connection.createStatement(); + StringBuilder query = new StringBuilder(); + query.append("SELECT schema_name FROM " + catalog + ".information_schema.schemata"); + + if (StringUtils.isNotEmpty(schemaToImport)) { + query.append(" where schema_name = '" + schemaToImport + "'"); + } + + ResultSet rs = stmt.executeQuery(query.toString()); + while (rs.next()) { + schemas.add(rs.getString("schema_name")); + } + + return schemas; + } + + public List<String> getTrinoTables(String catalog, String schema, String tableToImport) throws SQLException { + List<String> tables = new ArrayList<>(); + Connection connection = getTrinoConnection(); + Statement stmt = connection.createStatement(); + StringBuilder query = new StringBuilder(); + query.append("SELECT table_name FROM " + catalog + ".information_schema.tables WHERE table_schema = '" + schema + "'"); + if (StringUtils.isNotEmpty(tableToImport)) { + query.append(" and table_name = '" + tableToImport + "'"); + } + + ResultSet rs = stmt.executeQuery(query.toString()); + while (rs.next()) { + tables.add(rs.getString("table_name")); + } + + return tables; + } + + public Map<String, Map<String, Object>> getTrinoColumns(String catalog, String schema, String table) throws SQLException { + Map<String, Map<String, Object>> columns = new HashMap<>(); + Connection connection = getTrinoConnection(); + Statement stmt = connection.createStatement(); + StringBuilder query = new StringBuilder(); + query.append("SELECT column_name, ordinal_position, column_default, is_nullable, data_type FROM " + 
catalog + ".information_schema.columns WHERE table_schema = '" + schema + "' AND table_name = '" + table + "'"); + + ResultSet rs = stmt.executeQuery(query.toString()); + while (rs.next()) { + Map<String, Object> columnMetadata = new HashMap<>(); + columnMetadata.put("ordinal_position", rs.getInt("ordinal_position")); + columnMetadata.put("column_default", rs.getString("column_default")); + columnMetadata.put("column_name", rs.getString("column_name")); + if (StringUtils.isNotEmpty(rs.getString("is_nullable"))) { + if (StringUtils.equalsIgnoreCase(rs.getString("is_nullable"), "YES")) { + columnMetadata.put("is_nullable", true); + } else { + columnMetadata.put("is_nullable", false); + } + } + columnMetadata.put("data_type", rs.getString("data_type")); + + columns.put(rs.getString("column_name"), columnMetadata); + } + + return columns; + } +} diff --git a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/AtlasEntityConnector.java b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/AtlasEntityConnector.java new file mode 100644 index 000000000..33e716ee3 --- /dev/null +++ b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/AtlasEntityConnector.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.trino.connector; + +import org.apache.atlas.model.instance.AtlasEntity; + +import java.util.List; + +public abstract class AtlasEntityConnector { + + public abstract void connectTrinoCatalog(String instanceName, String catalogName, AtlasEntity entity, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo); + + public abstract void connectTrinoSchema(String instanceName, String catalogName, String schemaName, AtlasEntity entity, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo); + + public abstract void connectTrinoTable(String instanceName, String catalogName, String schemaName, String tableName, AtlasEntity entity, List<AtlasEntity> columnEntities, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo); +} diff --git a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/ConnectorFactory.java b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/ConnectorFactory.java new file mode 100644 index 000000000..4fa4d2b90 --- /dev/null +++ b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/ConnectorFactory.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.trino.connector; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConnectorFactory { + private static final Logger LOG = LoggerFactory.getLogger(ConnectorFactory.class); + + public static AtlasEntityConnector getConnector(String connectorType) { + switch (connectorType.toLowerCase()) { + case "mysql": + return new RdbmsEntityConnector(); + case "hive": + return new HiveEntityConnector(); + default: + LOG.warn("{} type does not have hook implemented on Atlas"); + return null; + } + } +} diff --git a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/HiveEntityConnector.java b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/HiveEntityConnector.java new file mode 100644 index 000000000..5a83b9749 --- /dev/null +++ b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/HiveEntityConnector.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.trino.connector; + +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.trino.client.AtlasClientHelper; +import org.apache.atlas.type.AtlasTypeUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class HiveEntityConnector extends AtlasEntityConnector { + private static final Logger LOG = LoggerFactory.getLogger(HiveEntityConnector.class); + + public static final String HIVE_INSTANCE = "hms_instance"; + public static final String HIVE_DB = "hive_db"; + public static final String HIVE_TABLE = "hive_table"; + public static final String HIVE_COLUMN = "hive_column"; + + @Override + public void connectTrinoCatalog(String instanceName, String catalogName, AtlasEntity entity, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) { + + } + + @Override + public void connectTrinoSchema(String instanceName, String catalogName, String schemaName, AtlasEntity entity, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) { + AtlasEntity hiveDb = null; + try { + hiveDb = toDbEntity(instanceName, schemaName, entity, entityWithExtInfo); + } catch (AtlasServiceException e) { + LOG.error("Error encountered: ", e); + } + + if (hiveDb != null) { + entity.setRelationshipAttribute("hive_db", AtlasTypeUtil.getAtlasRelatedObjectId(hiveDb, "trino_schema_hive_db")); + } + } + + @Override + public void connectTrinoTable(String instanceName, String catalogName, String schemaName, String tableName, AtlasEntity trinoTable, List<AtlasEntity> columnEntities, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) { + AtlasEntity hiveTable; + try { + hiveTable = toTableEntity(instanceName, schemaName, tableName, trinoTable, entityWithExtInfo); + + if (hiveTable != null) { + trinoTable.setRelationshipAttribute("hive_table", AtlasTypeUtil.getAtlasRelatedObjectId(hiveTable, "trino_schema_hive_table")); + + for (AtlasEntity columnEntity : columnEntities) { 
+ connectTrinoColumn(instanceName, catalogName, schemaName, tableName, hiveTable, columnEntity, entityWithExtInfo); + } + } + } catch (AtlasServiceException e) { + LOG.error("Error encountered: ", e); + } + } + + public void connectTrinoColumn(String instanceName, String catalogName, String schemaName, String tableName, AtlasEntity hiveTable, AtlasEntity trinoColumn, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasServiceException { + AtlasEntity hiveColumn; + try { + hiveColumn = toColumnEntity(instanceName, schemaName, tableName, trinoColumn.getAttribute("name").toString(), hiveTable, trinoColumn, entityWithExtInfo); + } catch (AtlasServiceException e) { + throw new AtlasServiceException(e); + } + if (hiveColumn != null) { + trinoColumn.setRelationshipAttribute("hive_column", AtlasTypeUtil.getAtlasRelatedObjectId(hiveColumn, "trino_schema_hive_column")); + } + } + + private AtlasEntity toDbEntity(String instanceName, String schemaName, AtlasEntity trinodschema, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasServiceException { + String dbName = schemaName; + String dbQualifiedName = schemaName + "@" + instanceName; + AtlasEntity.AtlasEntityWithExtInfo ret = AtlasClientHelper.findEntity(HIVE_DB, dbQualifiedName, true, true); + /*if (ret == null || ret.getEntity() == null) { + AtlasEntity hiveDb = new AtlasEntity(HIVE_DB); + + hiveDb.setAttribute(ATTRIBUTE_QUALIFIED_NAME, dbQualifiedName); + hiveDb.setAttribute("name", dbName); + hiveDb.setAttribute("clusterName", instanceName); + List<AtlasEntity> trinoSchemas = new ArrayList<>(); + trinoSchemas.add(trinodschema); + hiveDb.setRelationshipAttribute("trino_schema", AtlasTypeUtil.getAtlasRelatedObjectIds(trinoSchemas, "trino_schema_hive_db")); + entityWithExtInfo.addReferredEntity(hiveDb); + return hiveDb; + }*/ + + return ret != null ? 
ret.getEntity() : null; + } + + private AtlasEntity toTableEntity(String instanceName, String schemaName, String tableName, AtlasEntity trinoTable, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasServiceException { + String tableQualifiedName = schemaName + "." + tableName + "@" + instanceName; + AtlasEntity.AtlasEntityWithExtInfo ret = AtlasClientHelper.findEntity(HIVE_TABLE, tableQualifiedName, true, true); + /*if (ret == null || ret.getEntity() == null) { + AtlasEntity hiveTable = new AtlasEntity(HIVE_TABLE); + + hiveTable.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tableQualifiedName); + hiveTable.setAttribute("name", tableName); + List<AtlasEntity> trinotabless = new ArrayList<>(); + trinotabless.add(trinoTable); + hiveTable.setRelationshipAttribute("trino_table", AtlasTypeUtil.getAtlasRelatedObjectIds(trinotabless, "trino_schema_hive_table")); + entityWithExtInfo.addReferredEntity(hiveTable); + + return hiveTable; + }*/ + return ret != null ? ret.getEntity() : null; + } + + private AtlasEntity toColumnEntity(String instanceName, String schemaName, String tableName, String columnName, AtlasEntity hiveTable, AtlasEntity trinoColumn, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) throws AtlasServiceException { + String columnQualifiedName = schemaName + "." + tableName + "." 
+ columnName + "@" + instanceName; + AtlasEntity.AtlasEntityWithExtInfo ret = AtlasClientHelper.findEntity(HIVE_COLUMN, columnQualifiedName, true, true); + /*if (ret == null || ret.getEntity() == null) { + AtlasEntity hiveColumn = new AtlasEntity(HIVE_COLUMN); + + hiveColumn.setAttribute(ATTRIBUTE_QUALIFIED_NAME, columnQualifiedName); + hiveColumn.setAttribute("name", columnName); + hiveColumn.setAttribute("type", "temp"); + hiveColumn.setRelationshipAttribute("table", AtlasTypeUtil.getAtlasRelatedObjectId(hiveTable, "hive_table_columns")); + List<AtlasEntity> trinoColumns = new ArrayList<>(); + trinoColumns.add(trinoColumn); + hiveColumn.setRelationshipAttribute("trino_column", AtlasTypeUtil.getAtlasRelatedObjectIds(trinoColumns, "trino_schema_hive_column")); + entityWithExtInfo.addReferredEntity(hiveColumn); + + return hiveColumn; + }*/ + + return ret != null ? ret.getEntity() : null; + } +} diff --git a/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/RdbmsEntityConnector.java b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/RdbmsEntityConnector.java new file mode 100644 index 000000000..1f1bb4d77 --- /dev/null +++ b/addons/trino-extractor/src/main/java/org/apache/atlas/trino/connector/RdbmsEntityConnector.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
/**
 * Connector intended to link Trino entities to their RDBMS counterparts
 * (selected by {@code ConnectorFactory} for the "mysql" connector type).
 *
 * All methods are currently empty stubs: no RDBMS linking is performed yet.
 * The class exists so the factory wiring is in place once the RDBMS hook
 * integration is implemented.
 */
public class RdbmsEntityConnector extends AtlasEntityConnector {
    // Logger reserved for the future implementation; unused while the methods are stubs.
    private static final Logger LOG = LoggerFactory.getLogger(RdbmsEntityConnector.class);

    /** Stub — RDBMS catalog linking not yet implemented. */
    @Override
    public void connectTrinoCatalog(String instanceName, String catalogName, AtlasEntity entity, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {

    }

    /** Stub — RDBMS schema linking not yet implemented. */
    @Override
    public void connectTrinoSchema(String instanceName, String catalogName, String schemaName, AtlasEntity entity, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {

    }

    /** Stub — RDBMS table/column linking not yet implemented. */
    @Override
    public void connectTrinoTable(String instanceName, String catalogName, String schemaName, String tableName, AtlasEntity entity, List<AtlasEntity> columnEntities, AtlasEntity.AtlasEntityWithExtInfo entityWithExtInfo) {

    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.trino.model; + +import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.trino.connector.AtlasEntityConnector; +import org.apache.atlas.trino.connector.ConnectorFactory; + +public class Catalog { + private String instanceName; + private String name; + private String type; + private String schemaToImport; + private String tableToImport; + private String hookInstanceName; + private boolean hookEnabled; + private AtlasEntityConnector connector; + private AtlasEntity.AtlasEntityWithExtInfo trinoInstanceEntity; + + public Catalog(String name, String type, boolean hookEnabled, String hookInstanceName, String instanceName) { + this.name = name; + this.type = type; + this.hookEnabled = hookEnabled; + this.hookInstanceName = hookInstanceName; + this.instanceName = instanceName; + setConnector(); + } + + public String getName() { + return name; + } + + public String getType() { + return type; + } + + public void setConnector() { + if (hookEnabled) { + connector = ConnectorFactory.getConnector(type); + } + } + + public AtlasEntityConnector getConnector() { + return connector; + } + + public String getHookInstanceName() { + return hookInstanceName; + } + + public String getInstanceName() { + return instanceName; + } + + public AtlasEntity.AtlasEntityWithExtInfo getTrinoInstanceEntity() { + return trinoInstanceEntity; + } + + public void setTrinoInstanceEntity(AtlasEntity.AtlasEntityWithExtInfo trinoInstanceEntity) { + this.trinoInstanceEntity = trinoInstanceEntity; + } + + public String getTableToImport() { + 
return tableToImport; + } + + public void setTableToImport(String tableToImport) { + this.tableToImport = tableToImport; + } + + public String getSchemaToImport() { + return schemaToImport; + } + + public void setSchemaToImport(String schemaToImport) { + this.schemaToImport = schemaToImport; + } +} diff --git a/addons/trino-extractor/src/test/java/org/apache/atlas/trino/cli/TrinoExtractorIT.java b/addons/trino-extractor/src/test/java/org/apache/atlas/trino/cli/TrinoExtractorIT.java new file mode 100644 index 000000000..4a6ef475f --- /dev/null +++ b/addons/trino-extractor/src/test/java/org/apache/atlas/trino/cli/TrinoExtractorIT.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Placeholder for the Trino extractor integration tests; the planned test
 * cases are listed below and are not yet implemented.
 */
public class TrinoExtractorIT {

    /* List of testcases:
       - Invalid arguments
       - Invalid cron expression
       - Test valid catalog to be run
       - Test instance creation
       - Test catalog creation
       - Test schema creation
       - Test table creation
       - Test that when the hook is enabled, the hook entity (if created) is connected to the Trino entity
       - Test cron doesn't trigger a new job before the earlier thread completes
       - Test without cron expression
       - Test that even if a catalog is not registered, it should run if passed from the command line
       - Deleted table
       - Deleted catalog
       - Deleted column
       - Deleted schema
       - Rename catalog
       - Rename schema
       - Tag propagated */

}
You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<assembly xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <formats> + <format>tar.gz</format> + </formats> + <id>trino-extractor</id> + <baseDirectory>apache-atlas-trino-extractor-${project.version}</baseDirectory> + <fileSets> + <fileSet> + <includes> + <include>README*</include> + </includes> + </fileSet> + <fileSet> + <directory>../addons/trino-extractor/src/main/conf</directory> + <outputDirectory>/conf</outputDirectory> + <includes> + <include>atlas-log4j.xml</include> + <include>atlas-application.properties</include> + </includes> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + <fileSet> + <directory>../addons/trino-extractor/src/main/bin</directory> + <outputDirectory>/bin</outputDirectory> + <includes> + <include>run-trino-extractor.sh</include> + </includes> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + <fileSet> + <directory>../addons/trino-extractor/target/dependency/trino</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>../addons/trino-extractor/target</directory> + <outputDirectory>/lib</outputDirectory> + <includes> + <include>atlas-trino-extractor-*.jar</include> + </includes> + </fileSet> + </fileSets> + +</assembly> diff --git a/pom.xml b/pom.xml index 5794929e6..073567ee8 100644 --- a/pom.xml +++ 
b/pom.xml @@ -52,6 +52,7 @@ <module>addons/sqoop-bridge-shim</module> <module>addons/storm-bridge</module> <module>addons/storm-bridge-shim</module> + <module>addons/trino-extractor</module> <module>atlas-examples</module> <module>authorization</module> <module>build-tools</module>