This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch ATLAS-5059 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 200953d11d873b1238ac45e558c6d0b27f25f180 Author: Madhan Neethiraj <mad...@apache.org> AuthorDate: Wed Jul 9 14:50:52 2025 -0700 ATLAS-5059: add support for rdbms backend and audit repository --- .github/workflows/ci.yml | 12 +- addons/kafka-bridge/pom.xml | 5 - dev-support/atlas-docker/.env | 2 + dev-support/atlas-docker/Dockerfile.atlas | 1 + dev-support/atlas-docker/Dockerfile.atlas-db | 2 +- dev-support/atlas-docker/README.md | 15 +- dev-support/atlas-docker/config/init_postgres.sh | 10 + .../atlas-docker/docker-compose.atlas-backend.yml | 10 + .../atlas-docker/docker-compose.atlas-build.yml | 2 - .../atlas-docker/docker-compose.atlas-common.yml | 89 ++++++ .../atlas-docker/docker-compose.atlas-hbase.yml | 4 - .../atlas-docker/docker-compose.atlas-hive.yml | 28 +- .../atlas-docker/docker-compose.atlas-kafka.yml | 27 -- dev-support/atlas-docker/docker-compose.atlas.yml | 42 ++- dev-support/atlas-docker/scripts/atlas.sh | 23 ++ graphdb/janus/pom.xml | 11 + .../graphdb/janus/AtlasJanusGraphDatabase.java | 27 ++ graphdb/janusgraph-rdbms/pom.xml | 82 +++++ .../diskstorage/rdbms/JanusColumnValue.java | 51 ++++ .../janusgraph/diskstorage/rdbms/RdbmsStore.java | 329 +++++++++++++++++++++ .../diskstorage/rdbms/RdbmsStoreManager.java | 176 +++++++++++ .../diskstorage/rdbms/RdbmsTransaction.java | 109 +++++++ .../janusgraph/diskstorage/rdbms/dao/BaseDao.java | 53 ++++ .../diskstorage/rdbms/dao/DaoManager.java | 103 +++++++ .../diskstorage/rdbms/dao/JanusColumnDao.java | 308 +++++++++++++++++++ .../diskstorage/rdbms/dao/JanusKeyDao.java | 49 +++ .../diskstorage/rdbms/dao/JanusStoreDao.java | 48 +++ .../diskstorage/rdbms/entity/JanusColumn.java | 131 ++++++++ .../diskstorage/rdbms/entity/JanusKey.java | 117 ++++++++ .../diskstorage/rdbms/entity/JanusStore.java | 99 +++++++ .../resources/META-INF/janus-jpa_named_queries.xml | 71 +++++ .../main/resources/META-INF/janus-persistence.xml | 32 ++ .../resources/META-INF/postgres/create_schema.sql | 44 +++ 
.../META-INF/postgres/create_sequences.sql | 18 ++ graphdb/pom.xml | 2 +- pom.xml | 3 + repository/pom.xml | 18 ++ .../audit/AbstractStorageBasedAuditRepository.java | 36 ++- .../audit/rdbms/RdbmsBasedAuditRepository.java | 171 +++++++++++ .../repository/audit/rdbms/RdbmsTransaction.java | 53 ++++ .../atlas/repository/audit/rdbms/dao/BaseDao.java | 35 +++ .../repository/audit/rdbms/dao/DaoManager.java | 88 ++++++ .../audit/rdbms/dao/DbEntityAuditDao.java | 72 +++++ .../audit/rdbms/entity/DbEntityAudit.java | 180 +++++++++++ .../resources/META-INF/atlas-jpa_named_queries.xml | 37 +++ .../main/resources/META-INF/atlas-persistence.xml | 30 ++ .../resources/META-INF/postgres/create_schema.sql | 22 ++ .../META-INF/postgres/create_sequences.sql | 16 + 48 files changed, 2803 insertions(+), 90 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 136d1a54d..1e9bed392 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -69,7 +69,8 @@ jobs: cd dev-support/atlas-docker export DOCKER_BUILDKIT=1 export COMPOSE_DOCKER_CLI_BUILD=1 - SKIPTESTS=false docker compose -f docker-compose.atlas-base.yml -f docker-compose.atlas-build.yml up + docker compose -f docker-compose.atlas-base.yml build + SKIPTESTS=false docker compose -f docker-compose.atlas-build.yml up ATLAS_BUILD_CONTAINER=$(docker ps -a -q --filter "name=atlas-build") EXIT_CODE=$(docker inspect --format '{{.State.ExitCode}}' "$ATLAS_BUILD_CONTAINER") @@ -105,16 +106,11 @@ jobs: export DOCKER_BUILDKIT=1 export COMPOSE_DOCKER_CLI_BUILD=1 docker compose -f docker-compose.atlas-base.yml build - docker compose \ - -f docker-compose.atlas.yml \ - -f docker-compose.atlas-hadoop.yml \ - -f docker-compose.atlas-hbase.yml \ - -f docker-compose.atlas-kafka.yml \ - -f docker-compose.atlas-hive.yml up -d --wait + docker compose -f docker-compose.atlas.yml -f docker-compose.atlas-hadoop.yml -f docker-compose.atlas-hive.yml up -d --wait - name: Check status of containers and remove them 
run: | - containers=(atlas atlas-hadoop atlas-hbase atlas-kafka atlas-hive); + containers=(atlas-zk atlas-solr atlas-kafka atlas-db atlas-hadoop atlas-hbase atlas-hive atlas); flag=true; for container in "${containers[@]}"; do if [[ $(docker inspect -f '{{.State.Running}}' $container 2>/dev/null) == "true" ]]; then diff --git a/addons/kafka-bridge/pom.xml b/addons/kafka-bridge/pom.xml index a1dd3a666..496b5eb86 100644 --- a/addons/kafka-bridge/pom.xml +++ b/addons/kafka-bridge/pom.xml @@ -337,12 +337,7 @@ </resources> </configuration> </execution> - </executions> - </plugin> - <plugin> - <artifactId>maven-resources-plugin</artifactId> - <executions> <execution> <id>copy-solr-resources</id> <goals> diff --git a/dev-support/atlas-docker/.env b/dev-support/atlas-docker/.env index 8e608f795..a4f208f62 100644 --- a/dev-support/atlas-docker/.env +++ b/dev-support/atlas-docker/.env @@ -24,3 +24,5 @@ HBASE_VERSION=2.5.0 KAFKA_VERSION=2.8.2 HIVE_VERSION=3.1.3 HIVE_HADOOP_VERSION=3.1.1 + +ATLAS_BACKEND=hbase diff --git a/dev-support/atlas-docker/Dockerfile.atlas b/dev-support/atlas-docker/Dockerfile.atlas index 95a34168b..b46b8eb2d 100644 --- a/dev-support/atlas-docker/Dockerfile.atlas +++ b/dev-support/atlas-docker/Dockerfile.atlas @@ -16,6 +16,7 @@ FROM atlas-base:latest +ARG ATLAS_BACKEND ARG ATLAS_SERVER_JAVA_VERSION ARG ATLAS_VERSION ARG TARGETARCH diff --git a/dev-support/atlas-docker/Dockerfile.atlas-db b/dev-support/atlas-docker/Dockerfile.atlas-db index 95e67675b..8fa5cdd48 100644 --- a/dev-support/atlas-docker/Dockerfile.atlas-db +++ b/dev-support/atlas-docker/Dockerfile.atlas-db @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-FROM postgres:12 +FROM postgres:13.21 # Copy DB init script USER 0 diff --git a/dev-support/atlas-docker/README.md b/dev-support/atlas-docker/README.md index e557c23bc..fff299975 100644 --- a/dev-support/atlas-docker/README.md +++ b/dev-support/atlas-docker/README.md @@ -44,15 +44,22 @@ Docker files in this folder create docker images and run them to build Apache At 6. Build and deploy Apache Atlas in containers using docker-compose - 6.1. Ensure that the `${HOME}/.m2` directory exists and Execute following command to build Apache Atlas: + 6.1. Build atlas-base image with the following command: + docker-compose -f docker-compose.atlas-base.yml build + + 6.2. Ensure that the `${HOME}/.m2` directory exists and Execute following command to build Apache Atlas: mkdir -p ${HOME}/.m2 - docker-compose -f docker-compose.atlas-base.yml -f docker-compose.atlas-build.yml up + docker-compose -f docker-compose.atlas-build.yml up Time taken to complete the build might vary (upto an hour), depending on status of ${HOME}/.m2 directory cache. - 6.2. Execute following command to install and start Atlas and dependent services (Solr, HBase, Kafka) in containers: + 6.3. To install and start Atlas using Postgres as backend store, execute following commands: + export ATLAS_BACKEND=postgres + docker-compose -f docker-compose.atlas.yml up -d --wait - docker-compose -f docker-compose.atlas-base.yml -f docker-compose.atlas.yml -f docker-compose.atlas-hadoop.yml -f docker-compose.atlas-hbase.yml -f docker-compose.atlas-kafka.yml -f docker-compose.atlas-hive.yml up -d + 6.4. To install and start Atlas using HBase as backend store, execute following commands: + export ATLAS_BACKEND=hbase + docker-compose -f docker-compose.atlas.yml -f docker-compose.atlas-hadoop.yml up -d --wait Apache Atlas will be installed at /opt/atlas/, and logs are at /var/logs/atlas directory. 
diff --git a/dev-support/atlas-docker/config/init_postgres.sh b/dev-support/atlas-docker/config/init_postgres.sh index 06028789f..bb7b9c665 100644 --- a/dev-support/atlas-docker/config/init_postgres.sh +++ b/dev-support/atlas-docker/config/init_postgres.sh @@ -23,4 +23,14 @@ psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-E CREATE USER hive WITH PASSWORD 'atlasR0cks!'; CREATE DATABASE hive; GRANT ALL PRIVILEGES ON DATABASE hive TO hive; + + CREATE USER atlas WITH PASSWORD 'atlasR0cks!'; + CREATE DATABASE atlas; + GRANT ALL PRIVILEGES ON DATABASE atlas TO atlas; + + \c hive + GRANT ALL ON SCHEMA public TO public; + + \c atlas + GRANT ALL ON SCHEMA public TO public; EOSQL diff --git a/dev-support/atlas-docker/docker-compose.atlas-backend.yml b/dev-support/atlas-docker/docker-compose.atlas-backend.yml new file mode 100644 index 000000000..04072d771 --- /dev/null +++ b/dev-support/atlas-docker/docker-compose.atlas-backend.yml @@ -0,0 +1,10 @@ +services: + hbase: + extends: + file: docker-compose.atlas-hbase.yml + service: atlas-hbase + + postgres: + extends: + file: docker-compose.atlas-common.yml + service: atlas-db diff --git a/dev-support/atlas-docker/docker-compose.atlas-build.yml b/dev-support/atlas-docker/docker-compose.atlas-build.yml index 468c1b629..8d41c5212 100644 --- a/dev-support/atlas-docker/docker-compose.atlas-build.yml +++ b/dev-support/atlas-docker/docker-compose.atlas-build.yml @@ -18,8 +18,6 @@ services: - ./patches:/home/atlas/patches - ./dist:/home/atlas/dist - ./../..:/home/atlas/src:delegated - depends_on: - - atlas-base environment: - ATLAS_VERSION - BRANCH diff --git a/dev-support/atlas-docker/docker-compose.atlas-common.yml b/dev-support/atlas-docker/docker-compose.atlas-common.yml new file mode 100644 index 000000000..5b98fcba4 --- /dev/null +++ b/dev-support/atlas-docker/docker-compose.atlas-common.yml @@ -0,0 +1,89 @@ +services: + atlas-hadoop: + build: + context: . 
+ dockerfile: Dockerfile.atlas-hadoop + args: + - HADOOP_VERSION=${HADOOP_VERSION} + image: atlas-hadoop + container_name: atlas-hadoop + hostname: atlas-hadoop.example.com + stdin_open: true + tty: true + networks: + - atlas + ports: + - "9000:9000" + - "8088:8088" + healthcheck: + test: [ "CMD-SHELL", "su hdfs -c \"/opt/hadoop/bin/hdfs dfsadmin -report | grep -q 'Live datanodes'\"" ] + interval: 30s + timeout: 10s + retries: 30 + start_period: 40s + environment: + - HADOOP_VERSION + + atlas-solr: + build: + context: . + dockerfile: Dockerfile.atlas-solr + image: atlas-solr + container_name: atlas-solr + hostname: atlas-solr.example.com + networks: + - atlas + ports: + - "8983:8983" + + atlas-kafka: + build: + context: . + dockerfile: Dockerfile.atlas-kafka + args: + - KAFKA_VERSION=${KAFKA_VERSION} + - ATLAS_VERSION=${ATLAS_VERSION} + image: atlas-kafka + container_name: atlas-kafka + hostname: atlas-kafka.example.com + stdin_open: true + tty: true + networks: + - atlas + ports: + - "9092:9092" + depends_on: + atlas-zk: + condition: service_started + environment: + - KAFKA_VERSION + - ATLAS_VERSION + + atlas-zk: + build: + context: . + dockerfile: Dockerfile.atlas-zk + image: atlas-zk + container_name: atlas-zk + hostname: atlas-zk.example.com + networks: + - atlas + ports: + - "2181:2181" + + atlas-db: + build: + context: . 
+ dockerfile: Dockerfile.atlas-db + image: atlas-db + container_name: atlas-db + hostname: atlas-db.example.com + networks: + - atlas + healthcheck: + test: 'su -c "pg_isready -q" postgres' + interval: 30s + timeout: 10s + retries: 30 + start_period: 40s + diff --git a/dev-support/atlas-docker/docker-compose.atlas-hbase.yml b/dev-support/atlas-docker/docker-compose.atlas-hbase.yml index 0e2d5f9da..94747c845 100644 --- a/dev-support/atlas-docker/docker-compose.atlas-hbase.yml +++ b/dev-support/atlas-docker/docker-compose.atlas-hbase.yml @@ -34,7 +34,3 @@ services: environment: - HBASE_VERSION - ATLAS_VERSION - -networks: - atlas: - name: atlasnw diff --git a/dev-support/atlas-docker/docker-compose.atlas-hive.yml b/dev-support/atlas-docker/docker-compose.atlas-hive.yml index 9934a9d12..7e9bcc7df 100644 --- a/dev-support/atlas-docker/docker-compose.atlas-hive.yml +++ b/dev-support/atlas-docker/docker-compose.atlas-hive.yml @@ -24,7 +24,7 @@ services: - "10000:10000" depends_on: atlas-db: - condition: service_started + condition: service_healthy atlas-hadoop: condition: service_healthy atlas-zk: @@ -37,19 +37,19 @@ services: - ATLAS_VERSION atlas-db: - build: - context: . 
- dockerfile: Dockerfile.atlas-db - image: atlas-db - container_name: atlas-db - hostname: atlas-db.example.com - networks: - - atlas - healthcheck: - test: 'su -c "pg_isready -q" postgres' - interval: 10s - timeout: 2s - retries: 30 + extends: + service: atlas-db + file: docker-compose.atlas-common.yml + + atlas-zk: + extends: + service: atlas-zk + file: docker-compose.atlas-common.yml + + atlas-kafka: + extends: + service: atlas-kafka + file: docker-compose.atlas-common.yml networks: atlas: diff --git a/dev-support/atlas-docker/docker-compose.atlas-kafka.yml b/dev-support/atlas-docker/docker-compose.atlas-kafka.yml deleted file mode 100644 index 55e09a0c3..000000000 --- a/dev-support/atlas-docker/docker-compose.atlas-kafka.yml +++ /dev/null @@ -1,27 +0,0 @@ -services: - atlas-kafka: - build: - context: . - dockerfile: Dockerfile.atlas-kafka - args: - - KAFKA_VERSION=${KAFKA_VERSION} - - ATLAS_VERSION=${ATLAS_VERSION} - image: atlas-kafka - container_name: atlas-kafka - hostname: atlas-kafka.example.com - stdin_open: true - tty: true - networks: - - atlas - ports: - - "9092:9092" - depends_on: - atlas-zk: - condition: service_started - environment: - - KAFKA_VERSION - - ATLAS_VERSION - -networks: - atlas: - name: atlasnw diff --git a/dev-support/atlas-docker/docker-compose.atlas.yml b/dev-support/atlas-docker/docker-compose.atlas.yml index 3b4d7d58e..54bb7c164 100644 --- a/dev-support/atlas-docker/docker-compose.atlas.yml +++ b/dev-support/atlas-docker/docker-compose.atlas.yml @@ -4,6 +4,7 @@ services: context: . 
dockerfile: Dockerfile.atlas args: + - ATLAS_BACKEND=${ATLAS_BACKEND} - ATLAS_SERVER_JAVA_VERSION=${ATLAS_SERVER_JAVA_VERSION} - ATLAS_VERSION=${ATLAS_VERSION} image: atlas:latest @@ -18,7 +19,7 @@ services: ports: - "21000:21000" depends_on: - atlas-hbase: + atlas-backend: condition: service_healthy atlas-kafka: condition: service_started @@ -27,34 +28,31 @@ services: atlas-zk: condition: service_started environment: + - ATLAS_BACKEND - ATLAS_SERVER_JAVA_VERSION - ATLAS_VERSION command: - /home/atlas/scripts/atlas.sh - atlas-zk: - build: - context: . - dockerfile: Dockerfile.atlas-zk - image: atlas-zk - container_name: atlas-zk - hostname: atlas-zk.example.com - networks: - - atlas - ports: - - "2181:2181" + atlas-backend: + extends: + service: ${ATLAS_BACKEND} + file: docker-compose.atlas-backend.yml + + atlas-kafka: + extends: + service: atlas-kafka + file: docker-compose.atlas-common.yml atlas-solr: - build: - context: . - dockerfile: Dockerfile.atlas-solr - image: atlas-solr - container_name: atlas-solr - hostname: atlas-solr.example.com - networks: - - atlas - ports: - - "8983:8983" + extends: + service: atlas-solr + file: docker-compose.atlas-common.yml + + atlas-zk: + extends: + service: atlas-zk + file: docker-compose.atlas-common.yml networks: atlas: diff --git a/dev-support/atlas-docker/scripts/atlas.sh b/dev-support/atlas-docker/scripts/atlas.sh index 965bd7137..714ca1f0f 100755 --- a/dev-support/atlas-docker/scripts/atlas.sh +++ b/dev-support/atlas-docker/scripts/atlas.sh @@ -44,6 +44,29 @@ then echo "" >> /opt/atlas/conf/atlas-application.properties echo "atlas.graph.storage.hbase.compression-algorithm=NONE" >> /opt/atlas/conf/atlas-application.properties + echo "atlas.graph.graph.replace-instance-if-exists=true" >> /opt/atlas/conf/atlas-application.properties + + if [ "${ATLAS_BACKEND}" == "postgres" ] + then + # set RDBMS as backend and entity-audit store + sed -i "s/^atlas.graph.storage.backend=hbase2/# atlas.graph.storage.backend=hbase2/" 
/opt/atlas/conf/atlas-application.properties + sed -i "s/atlas.EntityAuditRepository.impl=.*$/# atlas.EntityAuditRepository.impl=org.apache.atlas.repository.audit.HBaseBasedAuditRepository/" /opt/atlas/conf/atlas-application.properties + + cat <<EOF >> /opt/atlas/conf/atlas-application.properties + +atlas.graph.storage.backend=rdbms +atlas.graph.storage.rdbms.jpa.javax.persistence.jdbc.dialect=org.eclipse.persistence.platform.database.PostgreSQLPlatform +atlas.graph.storage.rdbms.jpa.javax.persistence.jdbc.driver=org.postgresql.Driver +atlas.graph.storage.rdbms.jpa.javax.persistence.jdbc.url=jdbc:postgresql://atlas-db/atlas +atlas.graph.storage.rdbms.jpa.javax.persistence.jdbc.user=atlas +atlas.graph.storage.rdbms.jpa.javax.persistence.jdbc.password=atlasR0cks! +atlas.graph.storage.rdbms.jpa.javax.persistence.schema-generation.database.action=create +atlas.graph.storage.rdbms.jpa.javax.persistence.schema-generation.create-database-schemas=true +atlas.graph.storage.rdbms.jpa.javax.persistence.schema-generation.create-source=script-then-metadata +atlas.graph.storage.rdbms.jpa.javax.persistence.schema-generation.create-script-source=META-INF/postgres/create_sequences.sql +atlas.EntityAuditRepository.impl=org.apache.atlas.repository.audit.rdbms.RdbmsBasedAuditRepository +EOF + fi chown -R atlas:atlas ${ATLAS_HOME}/ diff --git a/graphdb/janus/pom.xml b/graphdb/janus/pom.xml index 4cd9b2b24..536036381 100644 --- a/graphdb/janus/pom.xml +++ b/graphdb/janus/pom.xml @@ -51,6 +51,17 @@ <artifactId>atlas-graphdb-common</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.atlas</groupId> + <artifactId>janusgraph-rdbms</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> <dependency> <groupId>org.apache.commons</groupId> diff --git 
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java index 77428ba99..26ba75a30 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java @@ -40,6 +40,7 @@ import org.janusgraph.diskstorage.StandardIndexProvider; import org.janusgraph.diskstorage.StandardStoreManager; import org.janusgraph.diskstorage.es.ElasticSearch7Index; import org.janusgraph.diskstorage.hbase.HBaseStoreManager; +import org.janusgraph.diskstorage.rdbms.RdbmsStoreManager; import org.janusgraph.diskstorage.solr.Solr6Index; import org.janusgraph.graphdb.database.serialize.attribute.SerializableSerializer; import org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry; @@ -329,6 +330,31 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex, } } + private static void addRdbmsSupport() { + try { + Field field = StandardStoreManager.class.getDeclaredField("ALL_MANAGER_CLASSES"); + + field.setAccessible(true); + + Field modifiersField = Field.class.getDeclaredField("modifiers"); + + modifiersField.setAccessible(true); + modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); + + Map<String, String> customMap = new HashMap<>(StandardStoreManager.getAllManagerClasses()); + + customMap.put("rdbms", RdbmsStoreManager.class.getName()); + + ImmutableMap<String, String> immap = ImmutableMap.copyOf(customMap); + + field.set(null, immap); + + LOG.debug("Injected RDBMS support - {}", RdbmsStoreManager.class.getName()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + private static void addSolr6Index() { try { Field field = StandardIndexProvider.class.getDeclaredField("ALL_MANAGER_CLASSES"); @@ -447,6 +473,7 @@ public class AtlasJanusGraphDatabase 
implements GraphDatabase<AtlasJanusVertex, static { addHBase2Support(); + addRdbmsSupport(); addSolr6Index(); diff --git a/graphdb/janusgraph-rdbms/pom.xml b/graphdb/janusgraph-rdbms/pom.xml new file mode 100644 index 000000000..c4dc9ef89 --- /dev/null +++ b/graphdb/janusgraph-rdbms/pom.xml @@ -0,0 +1,82 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +~ Licensed to the Apache Software Foundation (ASF) under one +~ or more contributor license agreements. See the NOTICE file +~ distributed with this work for additional information +~ regarding copyright ownership. The ASF licenses this file +~ to you under the Apache License, Version 2.0 (the +~ "License"); you may not use this file except in compliance +~ with the License. You may obtain a copy of the License at +~ +~ http://www.apache.org/licenses/LICENSE-2.0 +~ +~ Unless required by applicable law or agreed to in writing, software +~ distributed under the License is distributed on an "AS IS" BASIS, +~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +~ See the License for the specific language governing permissions and +~ limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.atlas</groupId> + <artifactId>atlas-graphdb</artifactId> + <version>3.0.0-SNAPSHOT</version> + </parent> + + <artifactId>janusgraph-rdbms</artifactId> + <name>JanusGraph RDBMS backend store</name> + + <dependencies> + <dependency> + <groupId>org.eclipse.persistence</groupId> + <artifactId>eclipselink</artifactId> + <version>${eclipse.jpa.version}</version> + </dependency> + + <dependency> + <groupId>org.eclipse.persistence</groupId> + <artifactId>javax.persistence</artifactId> + <version>${javax.persistence.version}</version> + </dependency> + + <dependency> + <groupId>org.janusgraph</groupId> + <artifactId>janusgraph-core</artifactId> + <version>${janusgraph.version}</version> + <exclusions> + <exclusion> + <groupId>com.rabbitmq</groupId> + <artifactId>amqp-client</artifactId> + </exclusion> + <exclusion> + <groupId>org.noggit</groupId> + <artifactId>noggit</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <build> + <resources> + <resource> + <filtering>true</filtering> + <directory>${basedir}/src/main/resources</directory> + </resource> + </resources> + <testResources> + <testResource> + <directory>${basedir}/src/test/resources</directory> + </testResource> + </testResources> + </build> + + <repositories> + <repository> + <id>postgres</id> + <name>Postgres Driver</name> + <url>https://mvnrepository.com/artifact/org.postgresql/postgresql</url> + </repository> + </repositories> +</project> diff --git a/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/JanusColumnValue.java b/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/JanusColumnValue.java new file mode 100644 index 000000000..bd9b0f823 --- /dev/null +++ 
b/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/JanusColumnValue.java @@ -0,0 +1,51 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.janusgraph.diskstorage.rdbms;

import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.util.StaticArrayBuffer;

/**
 * A (column, value) pair as stored in the RDBMS backend.
 *
 * <p>Both parts are held as raw byte arrays exactly as passed to the constructor
 * (no copy is made); the {@code ...AsStaticBuffer()} accessors expose the same
 * bytes through JanusGraph's {@link StaticBuffer} abstraction.
 */
public class JanusColumnValue {
    private final byte[] column;
    private final byte[] value;

    /**
     * @param column raw bytes of the column name
     * @param value  raw bytes of the column value
     */
    public JanusColumnValue(byte[] column, byte[] value) {
        this.column = column;
        this.value  = value;
    }

    /** @return the column bytes given at construction */
    public byte[] getColumn() {
        return this.column;
    }

    /** @return the value bytes given at construction */
    public byte[] getValue() {
        return this.value;
    }

    /** @return the column bytes wrapped via {@link StaticArrayBuffer#of(byte[])} */
    public StaticBuffer getColumnAsStaticBuffer() {
        return wrap(this.column);
    }

    /** @return the value bytes wrapped via {@link StaticArrayBuffer#of(byte[])} */
    public StaticBuffer getValueAsStaticBuffer() {
        return wrap(this.value);
    }

    // single place where raw bytes are turned into a StaticBuffer
    private static StaticBuffer wrap(byte[] bytes) {
        return StaticArrayBuffer.of(bytes);
    }
}
diff --git a/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/RdbmsStore.java b/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/RdbmsStore.java new file mode 100644 index 000000000..7117fb1b1 --- /dev/null +++ b/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/RdbmsStore.java @@ -0,0 +1,329
@@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.janusgraph.diskstorage.rdbms;

import org.janusgraph.diskstorage.BackendException;
import org.janusgraph.diskstorage.Entry;
import org.janusgraph.diskstorage.EntryList;
import org.janusgraph.diskstorage.EntryMetaData;
import org.janusgraph.diskstorage.StaticBuffer;
import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore;
import org.janusgraph.diskstorage.keycolumnvalue.KeyIterator;
import org.janusgraph.diskstorage.keycolumnvalue.KeyRangeQuery;
import org.janusgraph.diskstorage.keycolumnvalue.KeySliceQuery;
import org.janusgraph.diskstorage.keycolumnvalue.KeySlicesIterator;
import org.janusgraph.diskstorage.keycolumnvalue.MultiSlicesQuery;
import org.janusgraph.diskstorage.keycolumnvalue.SliceQuery;
import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction;
import org.janusgraph.diskstorage.rdbms.dao.DaoManager;
import org.janusgraph.diskstorage.rdbms.dao.JanusColumnDao;
import org.janusgraph.diskstorage.rdbms.dao.JanusKeyDao;
import org.janusgraph.diskstorage.rdbms.dao.JanusStoreDao;
import org.janusgraph.diskstorage.rdbms.entity.JanusKey;
import org.janusgraph.diskstorage.rdbms.entity.JanusStore;
import org.janusgraph.diskstorage.util.StaticArrayEntry;
import org.janusgraph.diskstorage.util.StaticArrayEntryList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * KeyColumnValue store backed by RDBMS.
 *
 * <p>Each instance represents one named store. The store row and key rows are
 * looked up through {@link JanusStoreDao} / {@link JanusKeyDao} and, when missing,
 * created in a separate {@link RdbmsTransaction} that is committed immediately
 * (see {@link #getStoreIdOrCreate} and {@link #getKeyIdOrCreate}). Column data is
 * read and written through {@link JanusColumnDao}.
 *
 * <p>Locking ({@link #acquireLock}) and multi-slice key iteration
 * ({@link #getKeys(MultiSlicesQuery, StoreTransaction)}) are not supported.
 */
public class RdbmsStore implements KeyColumnValueStore {
    private static final Logger LOG = LoggerFactory.getLogger(RdbmsStore.class);

    private static final int STORE_CREATE_MAX_ATTEMPTS   = 10;
    private static final int STORE_CREATE_RETRY_DELAY_MS = 100;
    private static final int KEY_CREATE_MAX_ATTEMPTS     = 10;
    private static final int KEY_CREATE_RETRY_DELAY_MS   = 100;

    private final String          name;
    private final DaoManager      daoManager;
    private final EntryMetaData[] entryMetaData;
    private       Long            storeId; // cached id of this store's row; null until first resolved

    /**
     * @param name         name of the store
     * @param storeManager manager supplying the {@link DaoManager} and the metadata schema for {@code name}
     */
    public RdbmsStore(String name, RdbmsStoreManager storeManager) {
        LOG.info("RdbmsStore(name={})", name);

        this.name          = name;
        this.daoManager    = storeManager.getDaoManager();
        this.entryMetaData = storeManager.getMetaDataSchema(name);
        this.storeId       = null;
    }

    /**
     * Returns the columns of a single key that fall in the query's slice range.
     *
     * @return the matching entries, or {@link EntryList#EMPTY_LIST} when the store does not
     *         exist yet or no column matches
     */
    @Override
    public EntryList getSlice(KeySliceQuery query, StoreTransaction trx) {
        LOG.debug("==> RdbmsStore.getSlice(name={}, query={}, trx={})", name, query, trx);

        final EntryList ret;

        if (isStorePresent(trx)) {
            JanusColumnDao dao = new JanusColumnDao((RdbmsTransaction) trx, this);
            // NOTE(review): this read path creates the key row when it is missing; a plain lookup
            // may be sufficient here - confirm against JanusColumnDao.getColumns() expectations
            Long                   keyId      = getKeyIdOrCreate(toBytes(query.getKey()), trx);
            byte[]                 sliceStart = toBytes(query.getSliceStart());
            byte[]                 sliceEnd   = toBytes(query.getSliceEnd());
            List<JanusColumnValue> entries    = dao.getColumns(keyId, sliceStart, sliceEnd, query.getLimit());

            if (entries != null && !entries.isEmpty()) {
                ret = StaticArrayEntryList.ofStaticBuffer(entries, toEntry);
            } else {
                ret = EntryList.EMPTY_LIST;
            }
        } else {
            ret = EntryList.EMPTY_LIST;
        }

        LOG.debug("<== RdbmsStore.getSlice(name={}, query={}, trx={}): ret={}", name, query, trx, ret.size());

        return ret;
    }

    /**
     * Runs the same slice query against each of the given keys.
     *
     * @return a map from key to its entries (one single-key {@code getSlice} per key), or an
     *         empty map when the store does not exist yet
     */
    @Override
    public Map<StaticBuffer, EntryList> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction trx) {
        LOG.debug("==> RdbmsStore.getSlice(name={}, len(keys)={}, query={}, trx={})", name, keys.size(), query, trx);

        final Map<StaticBuffer, EntryList> ret;

        if (isStorePresent(trx)) {
            ret = new TreeMap<>();

            for (StaticBuffer key : keys) {
                ret.put(key, getSlice(new KeySliceQuery(key, query), trx));
            }
        } else {
            ret = Collections.emptyMap();
        }

        LOG.debug("<== RdbmsStore.getSlice(name={}, len(keys)={}, query={}, trx={}): ret={}", name, keys.size(), query, trx, ret);

        return ret;
    }

    /**
     * Applies deletions, then additions, to the columns of the given key; the store and key
     * rows are created first if they do not exist.
     *
     * @throws IllegalStateException if the key row could not be found or created
     */
    @Override
    public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction trx) {
        LOG.debug("==> RdbmsStore.mutate(name={}, key={}, additions={}, deletions={}, trx={})", name, key, additions, deletions, trx);

        byte[] keyName     = toBytes(key);
        Long   resolvedKey = getKeyIdOrCreate(keyName, trx);

        // getKeyIdOrCreate() returns null once its retries are exhausted; fail with a clear
        // message instead of a NullPointerException from unboxing
        if (resolvedKey == null) {
            throw new IllegalStateException("RdbmsStore.mutate(name=" + name + "): failed to find or create key");
        }

        long           keyId     = resolvedKey;
        JanusColumnDao columnDao = new JanusColumnDao((RdbmsTransaction) trx, this);

        for (StaticBuffer column : deletions) {
            byte[] columnName = toBytes(column);

            columnDao.remove(keyId, columnName);
        }

        for (Entry entry : additions) {
            columnDao.addOrUpdate(keyId, toBytes(entry.getColumn()), toBytes(entry.getValue()));
        }

        LOG.debug("<== RdbmsStore.mutate(name={}, key={}, additions={}, deletions={}, trx={})", name, key, additions, deletions, trx);
    }

    /**
     * Not supported by this store.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction trx) {
        LOG.debug("RdbmsStore.acquireLock(key={}, column={}, expectedValue={}, trx={}): UnsupportedOperation", key, column, expectedValue, trx);

        throw new UnsupportedOperationException();
    }

    /**
     * Iterates keys restricted by both a key range and a column range.
     *
     * @return the iterator produced by {@link JanusColumnDao}, or
     *         {@link JanusColumnDao#EMPTY_KEY_ITERATOR} when the store does not exist yet
     */
    @Override
    public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction trx) {
        LOG.debug("==> RdbmsStore.getKeys(name={}, query={}, trx={})", name, query, trx);

        final KeyIterator ret;

        if (isStorePresent(trx)) {
            JanusColumnDao dao = new JanusColumnDao((RdbmsTransaction) trx, this);

            ret = dao.getKeysByKeyAndColumnRange(this.storeId, toBytes(query.getKeyStart()), toBytes(query.getKeyEnd()), toBytes(query.getSliceStart()), toBytes(query.getSliceEnd()), query.getLimit());
        } else {
            ret = JanusColumnDao.EMPTY_KEY_ITERATOR;
        }

        LOG.debug("<== RdbmsStore.getKeys(name={}, query={}, trx={}): ret={}", name, query, trx, ret);

        return ret;
    }

    /**
     * Iterates keys restricted by a column range only.
     *
     * @return the iterator produced by {@link JanusColumnDao}, or
     *         {@link JanusColumnDao#EMPTY_KEY_ITERATOR} when the store does not exist yet
     */
    @Override
    public KeyIterator getKeys(SliceQuery query, StoreTransaction trx) {
        LOG.debug("==> RdbmsStore.getKeys(name={}, query={}, trx={})", name, query, trx);

        final KeyIterator ret;

        if (isStorePresent(trx)) {
            JanusColumnDao dao = new JanusColumnDao((RdbmsTransaction) trx, this);

            ret = dao.getKeysByColumnRange(this.storeId, toBytes(query.getSliceStart()), toBytes(query.getSliceEnd()), query.getLimit());
        } else {
            ret = JanusColumnDao.EMPTY_KEY_ITERATOR;
        }

        LOG.debug("<== RdbmsStore.getKeys(name={}, query={}, trx={}): ret={}", name, query, trx, ret);

        return ret;
    }

    /**
     * Not supported by this store.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public KeySlicesIterator getKeys(MultiSlicesQuery query, StoreTransaction trx) {
        LOG.debug("RdbmsStore.getKeys(query={}, trx={}): UnsupportedOperation", query, trx);

        throw new UnsupportedOperationException();
    }

    @Override
    public String getName() {
        return name;
    }

    /** Only logs; this class holds nothing that needs releasing here. */
    @Override
    public void close() throws BackendException {
        LOG.debug("RdbmsStore.close(name={})", name);
    }

    /**
     * Checks whether this store's row exists, caching its id in {@link #storeId} once found.
     * Never creates the store.
     */
    private boolean isStorePresent(StoreTransaction trx) {
        Long storeId = this.storeId;

        if (storeId == null) {
            JanusStoreDao storeDao = new JanusStoreDao((RdbmsTransaction) trx);

            storeId = storeDao.getIdByName(name);

            if (storeId != null) {
                this.storeId = storeId;
            }
        }

        return storeId != null;
    }

    /** Null-safe conversion of a {@link StaticBuffer} to a byte array. */
    private static byte[] toBytes(StaticBuffer val) {
        return val == null ? null : val.as(StaticBuffer.ARRAY_FACTORY);
    }

    /**
     * Returns the id of this store's row, creating the row when it does not exist.
     *
     * <p>Creation runs in its own {@link RdbmsTransaction}, committed immediately, and is
     * attempted up to {@link #STORE_CREATE_MAX_ATTEMPTS} times with a
     * {@link #STORE_CREATE_RETRY_DELAY_MS} ms pause between attempts.
     *
     * @return the store id, or {@code null} if every attempt failed
     */
    private Long getStoreIdOrCreate(StoreTransaction trx) {
        Long ret = this.storeId;

        if (ret == null) {
            JanusStoreDao dao = new JanusStoreDao((RdbmsTransaction) trx);

            ret = dao.getIdByName(name);

            // NOTE(review): a failed attempt is retried as another create, without re-reading the id;
            // if the failure is because a concurrent writer created the row, a re-read may be what
            // is needed - confirm against JanusStoreDao.create() behavior
            for (int attempt = 1; ret == null; attempt++) {
                try (RdbmsTransaction trx2 = new RdbmsTransaction(trx.getConfiguration(), daoManager)) {
                    JanusStoreDao dao2  = new JanusStoreDao(trx2);
                    JanusStore    store = dao2.create(new JanusStore(name));

                    trx2.commit();

                    ret = store != null ? store.getId() : null;

                    LOG.debug("attempt #{}: created store(name={}): id={}", attempt, name, ret);
                } catch (IOException excp) {
                    LOG.error("attempt #{}: failed to create store(name={})", attempt, name, excp);
                }

                if (ret != null || attempt >= STORE_CREATE_MAX_ATTEMPTS) {
                    break;
                }

                try {
                    Thread.sleep(STORE_CREATE_RETRY_DELAY_MS);
                } catch (InterruptedException excp) {
                    LOG.error("Thread interrupted while waiting to retry store creation(name={})", name, excp);
                    Thread.currentThread().interrupt(); // preserve the interrupt status for callers
                }
            }

            if (ret != null) {
                this.storeId = ret;
            } else {
                LOG.error("Failed to create store(name={}) after {} attempts", name, STORE_CREATE_MAX_ATTEMPTS);
            }
        }

        return ret;
    }

    /**
     * Returns the id of the row for {@code key} in this store, creating the store and/or key
     * row when missing.
     *
     * <p>Key creation follows the same pattern as {@link #getStoreIdOrCreate}: a separate,
     * immediately committed {@link RdbmsTransaction}, up to {@link #KEY_CREATE_MAX_ATTEMPTS}
     * attempts, {@link #KEY_CREATE_RETRY_DELAY_MS} ms apart.
     *
     * @return the key id, or {@code null} if every attempt failed
     */
    private Long getKeyIdOrCreate(byte[] key, StoreTransaction trx) {
        Long        storeId = getStoreIdOrCreate(trx);
        JanusKeyDao dao     = new JanusKeyDao((RdbmsTransaction) trx);
        Long        ret     = dao.getIdByStoreIdAndName(storeId, key);

        for (int attempt = 1; ret == null; attempt++) {
            try (RdbmsTransaction trx2 = new RdbmsTransaction(trx.getConfiguration(), daoManager)) {
                JanusKeyDao dao2       = new JanusKeyDao(trx2);
                JanusKey    createdKey = dao2.create(new JanusKey(storeId, key));

                trx2.commit();

                ret = createdKey != null ? createdKey.getId() : null;

                LOG.debug("attempt #{}: created key(storeId={}, key={}): id={}", attempt, storeId, key, ret);
            } catch (IOException excp) {
                LOG.error("attempt #{}: failed to create key(storeId={}, key={})", attempt, storeId, key, excp);
            }

            if (ret != null || attempt >= KEY_CREATE_MAX_ATTEMPTS) {
                break;
            }

            try {
                Thread.sleep(KEY_CREATE_RETRY_DELAY_MS);
            } catch (InterruptedException excp) {
                LOG.error("Thread interrupted while waiting to retry key creation(storeId={}, key={})", storeId, key, excp);
                Thread.currentThread().interrupt(); // preserve the interrupt status for callers
            }
        }

        return ret;
    }

    /**
     * Adapter used by {@link StaticArrayEntryList#ofStaticBuffer} to read column, value and
     * metadata schema out of a {@link JanusColumnValue}. Per-entry metadata is not supported.
     */
    public final StaticArrayEntry.GetColVal<JanusColumnValue, StaticBuffer> toEntry =
            new StaticArrayEntry.GetColVal<JanusColumnValue, StaticBuffer>() {
                @Override
                public StaticBuffer getColumn(JanusColumnValue columnValue) {
                    return columnValue.getColumnAsStaticBuffer();
                }

                @Override
                public StaticBuffer getValue(JanusColumnValue columnValue) {
                    return columnValue.getValueAsStaticBuffer();
                }

                @Override
                public EntryMetaData[] getMetaSchema(JanusColumnValue janusColumnValue) {
                    return entryMetaData;
                }

                @Override
                public Object getMetaData(JanusColumnValue janusColumnValue, EntryMetaData entryMetaData) {
                    LOG.debug("RdbmsStore.getMetaData(janusColumnValue={}, entryMetaData={}): UnsupportedOperation", janusColumnValue, entryMetaData);

                    // throw (not return) the exception, so it is never handed back as metadata
                    throw new UnsupportedOperationException();
                }
            };
}
diff --git a/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/RdbmsStoreManager.java b/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/RdbmsStoreManager.java new file mode 100644 index 000000000..d04f0d8a8 --- /dev/null +++ b/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/RdbmsStoreManager.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.janusgraph.diskstorage.rdbms; + +import org.janusgraph.diskstorage.BackendException; +import org.janusgraph.diskstorage.BaseTransactionConfig; +import org.janusgraph.diskstorage.StaticBuffer; +import org.janusgraph.diskstorage.StoreMetaData; +import org.janusgraph.diskstorage.common.AbstractStoreManager; +import org.janusgraph.diskstorage.configuration.ConfigNamespace; +import org.janusgraph.diskstorage.configuration.Configuration; +import org.janusgraph.diskstorage.keycolumnvalue.KCVMutation; +import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStore; +import org.janusgraph.diskstorage.keycolumnvalue.KeyColumnValueStoreManager; +import org.janusgraph.diskstorage.keycolumnvalue.KeyRange; +import org.janusgraph.diskstorage.keycolumnvalue.StandardStoreFeatures; +import org.janusgraph.diskstorage.keycolumnvalue.StoreFeatures; +import org.janusgraph.diskstorage.keycolumnvalue.StoreTransaction; +import org.janusgraph.diskstorage.rdbms.dao.DaoManager; +import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration; +import org.janusgraph.graphdb.configuration.PreInitializeConfigOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Storage 
Manager for RDBMS
+ *
+ * Keeps one RdbmsStore per store name and creates RdbmsTransaction instances that
+ * share a single DaoManager.
+ */
+@PreInitializeConfigOptions
+public class RdbmsStoreManager extends AbstractStoreManager implements KeyColumnValueStoreManager {
+    private static final Logger LOG = LoggerFactory.getLogger(RdbmsStoreManager.class);
+
+    private static final String NAME = "rdbms";
+
+    public static final ConfigNamespace RDBMS_NS = new ConfigNamespace(GraphDatabaseConfiguration.STORAGE_NS, NAME, "RDBMS configuration options");
+    public static final ConfigNamespace JPA_CONFIG_NS = new ConfigNamespace(RDBMS_NS, "jpa", "JPA configurations", true);
+
+    private final StandardStoreFeatures features;
+    private final Map<String, RdbmsStore> stores; // guarded by synchronized (this)
+    private final DaoManager daoManager;
+
+    /**
+     * Builds the store features and initializes the JPA layer from the JPA_CONFIG_NS subset of the given configuration.
+     *
+     * @param config storage configuration
+     */
+    public RdbmsStoreManager(Configuration config) {
+        super(config);
+
+        features = new StandardStoreFeatures.Builder()
+                .orderedScan(true)
+                .unorderedScan(true)
+                .multiQuery(true)
+                .transactional(true)
+                .keyConsistent(GraphDatabaseConfiguration.buildGraphConfiguration())
+                .keyOrdered(true)
+                .batchMutation(true)
+                .build();
+        stores = new HashMap<>();
+        daoManager = new DaoManager(config.getSubset(JPA_CONFIG_NS));
+
+        LOG.info("RdbmsStoreManager()");
+    }
+
+    public DaoManager getDaoManager() {
+        return daoManager;
+    }
+
+    /**
+     * Returns the store registered under the given name, creating and registering it on first use.
+     *
+     * @param name      store name
+     * @param container store metadata; not used by this implementation
+     * @return the store for the given name
+     */
+    @Override
+    public KeyColumnValueStore openDatabase(String name, StoreMetaData.Container container) throws BackendException {
+        LOG.debug("==> RdbmsStoreManager.openDatabase(name={})", name);
+
+        final RdbmsStore ret;
+
+        // stores is a plain HashMap: the lookup must happen under the same lock as the insert (and as close()),
+        // otherwise a reader can observe the map while another thread is modifying it
+        synchronized (this) {
+            ret = stores.computeIfAbsent(name, k -> new RdbmsStore(name, this));
+        }
+
+        LOG.debug("<== RdbmsStoreManager.openDatabase(name={})", name);
+
+        return ret;
+    }
+
+    /**
+     * Applies every mutation to its store, one key at a time, within the given transaction.
+     *
+     * @param storeMutations mutations grouped by store name and then by key
+     * @param trx            transaction in which the mutations are applied
+     */
+    @Override
+    public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> storeMutations, StoreTransaction trx) throws BackendException {
+        LOG.debug("==> RdbmsStoreManager.mutateMany(numStores={}, trx={})", storeMutations.size(), trx);
+
+        // counters are only used for the debug message at the end
+        int numMutations = 0;
+        int numAdditions = 0;
+        int numDeletions = 0;
+
+        for (Map.Entry<String, Map<StaticBuffer, KCVMutation>> storeEntry : storeMutations.entrySet()) {
+            String storeName = storeEntry.getKey();
+            KeyColumnValueStore store = this.openDatabase(storeName);
+
+            for (Map.Entry<StaticBuffer, KCVMutation> mutationEntry : storeEntry.getValue().entrySet()) {
+                StaticBuffer key = mutationEntry.getKey();
+                KCVMutation mutation = mutationEntry.getValue();
+
+                numMutations++;
+                numAdditions += mutation.getAdditions().size();
+                numDeletions += mutation.getDeletions().size();
+
+                store.mutate(key, mutation.getAdditions(), mutation.getDeletions(), trx);
+            }
+        }
+
+        LOG.debug("<== RdbmsStoreManager.mutateMany(numStores={}, numMutations={}, numAdditions={}, numDeletions={})", storeMutations.size(), numMutations, numAdditions, numDeletions);
+    }
+
+    @Override
+    public StoreTransaction beginTransaction(BaseTransactionConfig baseTransactionConfig) throws BackendException {
+        LOG.debug("RdbmsStoreManager.beginTransaction()");
+
+        return new RdbmsTransaction(baseTransactionConfig, this.daoManager);
+    }
+
+    /**
+     * Closes every opened store, forgets them, and then closes the DaoManager.
+     */
+    @Override
+    public void close() throws BackendException {
+        LOG.debug("RdbmsStoreManager.close()");
+
+        synchronized (this) {
+            for (RdbmsStore store : stores.values()) {
+                store.close();
+            }
+
+            stores.clear();
+            daoManager.close();
+        }
+    }
+
+    /**
+     * Not supported by this store manager.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void clearStorage() throws BackendException {
+        LOG.debug("RdbmsStoreManager.clearStorage(): UnsupportedOperation");
+
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean exists() throws BackendException {
+        return true;
+    }
+
+    @Override
+    public StoreFeatures getFeatures() {
+        return features;
+    }
+
+    @Override
+    public String getName() {
+        return NAME;
+    }
+
+    /**
+     * Not supported by this store manager.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public List<KeyRange> getLocalKeyPartition() throws BackendException {
+        LOG.debug("RdbmsStoreManager.getLocalKeyPartition(): UnsupportedOperation");
+
+        throw new UnsupportedOperationException();
+    }
+}
diff --git
a/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/RdbmsTransaction.java b/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/RdbmsTransaction.java new file mode 100644 index 000000000..0bea68bc3 --- /dev/null +++ b/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/RdbmsTransaction.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.janusgraph.diskstorage.rdbms;
+
+import org.janusgraph.diskstorage.BaseTransactionConfig;
+import org.janusgraph.diskstorage.common.AbstractStoreTransaction;
+import org.janusgraph.diskstorage.rdbms.dao.DaoManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.persistence.EntityManager;
+import javax.persistence.EntityTransaction;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Store transaction implementation for RDBMS
+ *
+ * Each instance owns one EntityManager and the EntityTransaction obtained from it; the
+ * transaction is begun in the constructor.
+ */
+public class RdbmsTransaction extends AbstractStoreTransaction implements Closeable {
+    private static final Logger LOG = LoggerFactory.getLogger(RdbmsTransaction.class);
+
+    private final EntityManager em;
+    private final EntityTransaction trx;
+
+    /**
+     * Creates an EntityManager from the given DaoManager and begins a transaction on it.
+     *
+     * @param trxConfig  transaction configuration passed to the base class
+     * @param daoManager source of the EntityManager
+     */
+    public RdbmsTransaction(BaseTransactionConfig trxConfig, DaoManager daoManager) {
+        super(trxConfig);
+
+        em = daoManager.createEntityManager();
+        trx = em.getTransaction();
+
+        trx.begin();
+    }
+
+    public EntityManager getEntityManager() {
+        return em;
+    }
+
+    /**
+     * Commits the transaction if it is still active; the EntityManager is closed in either case.
+     */
+    @Override
+    public void commit() {
+        LOG.debug("RdbmsTransaction.commit()");
+
+        try {
+            if (trx.isActive()) {
+                trx.commit();
+            }
+        } finally {
+            em.close();
+        }
+    }
+
+    /**
+     * Rolls back the transaction if it is still active; the EntityManager is closed in either case.
+     */
+    @Override
+    public void rollback() {
+        LOG.debug("RdbmsTransaction.rollback()");
+
+        try {
+            if (trx.isActive()) {
+                trx.rollback();
+            }
+        } finally {
+            em.close();
+        }
+    }
+
+    /**
+     * Rolls back a still-active transaction and closes a still-open EntityManager.
+     *
+     * @throws IOException wrapping the first failure from rollback or close; when both fail, the
+     *                     close failure is attached to it as a suppressed exception
+     */
+    @Override
+    public void close() throws IOException {
+        LOG.debug("RdbmsTransaction.close()");
+
+        IOException ret = null;
+
+        if (trx.isActive()) {
+            try {
+                trx.rollback();
+            } catch (Exception excp) {
+                ret = new IOException(excp);
+            }
+        }
+
+        if (em.isOpen()) {
+            try {
+                em.close();
+            } catch (Exception excp) {
+                // report the close failure on its own, or alongside an earlier rollback failure;
+                // never drop it and never let it replace the rollback failure
+                if (ret == null) {
+                    ret = new IOException(excp);
+                } else {
+                    ret.addSuppressed(excp);
+                }
+            }
+        }
+
+        if (ret != null) {
+            throw ret;
+        }
+    }
+}
diff --git a/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/dao/BaseDao.java b/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/dao/BaseDao.java
new file mode
100644
index 000000000..ce4530c01
--- /dev/null
+++ b/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/dao/BaseDao.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.janusgraph.diskstorage.rdbms.dao;
+
+import org.janusgraph.diskstorage.rdbms.RdbmsTransaction;
+
+import javax.persistence.EntityManager;
+
+/**
+ * Base DAO to access entities stored in RDBMS
+ *
+ * @param <T> entity type handled by the concrete DAO
+ */
+public abstract class BaseDao<T> {
+    protected final EntityManager em;
+
+    /** Uses the EntityManager owned by the given transaction. */
+    protected BaseDao(RdbmsTransaction trx) {
+        this(trx.getEntityManager());
+    }
+
+    /** Uses the given EntityManager directly. */
+    protected BaseDao(EntityManager em) {
+        this.em = em;
+    }
+
+    /**
+     * Persists the given entity through the EntityManager.
+     *
+     * @param obj entity to persist
+     * @return the same instance that was passed in
+     */
+    public T create(T obj) {
+        em.persist(obj);
+
+        return obj;
+    }
+
+    /** Returns the argument as a Long, or null when it is not a Long. */
+    protected Long toLong(Object obj) {
+        if (obj instanceof Long) {
+            return (Long) obj;
+        }
+
+        return null;
+    }
+
+    /** Returns the argument as a byte[], or null when it is not a byte[]. */
+    protected byte[] toByteArray(Object obj) {
+        if (obj instanceof byte[]) {
+            return (byte[]) obj;
+        }
+
+        return null;
+    }
+}
diff --git a/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/dao/DaoManager.java b/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/dao/DaoManager.java
new file mode 100644
index 000000000..39972e3d9
--- /dev/null
+++ b/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/dao/DaoManager.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.janusgraph.diskstorage.rdbms.dao;
+
+import org.eclipse.persistence.config.PersistenceUnitProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.spi.PersistenceProvider;
+import javax.persistence.spi.PersistenceProviderResolver;
+import javax.persistence.spi.PersistenceProviderResolverHolder;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * DAO manager that initializes JPA layer
+ *
+ * Sample properties to initialize JPA
+ * storage.backend=rdbms
+ * storage.rdbms.jpa.javax.persistence.jdbc.dialect=org.eclipse.persistence.platform.database.PostgreSQLPlatform
+ * storage.rdbms.jpa.javax.persistence.jdbc.driver=org.postgresql.Driver
+ * storage.rdbms.jpa.javax.persistence.jdbc.url=jdbc:postgresql://dbhost/dbname
+ * storage.rdbms.jpa.javax.persistence.jdbc.user=janus
+ * storage.rdbms.jpa.javax.persistence.jdbc.password=janusR0cks!
+ * storage.rdbms.jpa.javax.persistence.schema-generation.database.action=create
+ * storage.rdbms.jpa.javax.persistence.schema-generation.create-database-schemas=true
+ * storage.rdbms.jpa.javax.persistence.schema-generation.create-source=metadata
+ *
+ */
+public class DaoManager {
+    private static final Logger LOG = LoggerFactory.getLogger(DaoManager.class);
+
+    private static final String PERSISTENCE_UNIT_NAME = "janusPU";
+
+    private final EntityManagerFactory emFactory;
+
+    /**
+     * Creates the EntityManagerFactory for persistence unit "janusPU", using the first registered
+     * persistence provider that returns one.
+     *
+     * @param jpaConfig JPA properties; may be null, entries with null values are skipped and all
+     *                  other values are converted with toString()
+     * @throws IllegalStateException if no persistence provider returns an EntityManagerFactory
+     */
+    public DaoManager(Map<String, Object> jpaConfig) {
+        Map<String, String> config = new HashMap<>();
+
+        if (jpaConfig != null) {
+            for (Map.Entry<String, Object> entry : jpaConfig.entrySet()) {
+                String key = entry.getKey();
+                Object value = entry.getValue();
+
+                if (value != null) {
+                    config.put(key, value.toString());
+                }
+            }
+        }
+
+        config.put(PersistenceUnitProperties.ECLIPSELINK_PERSISTENCE_XML, "META-INF/janus-persistence.xml");
+
+        LOG.debug("DaoManager: config={}", config);
+
+        PersistenceProviderResolver resolver = PersistenceProviderResolverHolder.getPersistenceProviderResolver();
+        EntityManagerFactory emf = null;
+
+        for (PersistenceProvider provider : resolver.getPersistenceProviders()) {
+            LOG.debug("PersistenceProvider: {}", provider);
+
+            emf = provider.createEntityManagerFactory(PERSISTENCE_UNIT_NAME, config);
+
+            if (emf != null) {
+                break;
+            }
+        }
+
+        // fail here with a clear message rather than with a NullPointerException on first use
+        if (emf == null) {
+            throw new IllegalStateException("DaoManager: no persistence provider returned an EntityManagerFactory for persistence unit '" + PERSISTENCE_UNIT_NAME + "'");
+        }
+
+        emFactory = emf;
+    }
+
+    /**
+     * @return a new EntityManager from the underlying factory
+     */
+    public EntityManager createEntityManager() {
+        return emFactory.createEntityManager();
+    }
+
+    /**
+     * Closes the underlying EntityManagerFactory if it is still open.
+     */
+    public void close() {
+        LOG.info("DaoManager.close()");
+
+        if (this.emFactory.isOpen()) {
+            this.emFactory.close();
+        }
+    }
+}
diff --git a/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/dao/JanusColumnDao.java b/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/dao/JanusColumnDao.java
new file mode 100644
index 000000000..d598587ab
--- /dev/null
+++ b/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/dao/JanusColumnDao.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache
Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.janusgraph.diskstorage.rdbms.dao;
+
+import org.eclipse.persistence.queries.ScrollableCursor;
+import org.janusgraph.diskstorage.Entry;
+import org.janusgraph.diskstorage.StaticBuffer;
+import org.janusgraph.diskstorage.keycolumnvalue.KeyIterator;
+import org.janusgraph.diskstorage.rdbms.JanusColumnValue;
+import org.janusgraph.diskstorage.rdbms.RdbmsStore;
+import org.janusgraph.diskstorage.rdbms.RdbmsTransaction;
+import org.janusgraph.diskstorage.rdbms.entity.JanusColumn;
+import org.janusgraph.diskstorage.util.RecordIterator;
+import org.janusgraph.diskstorage.util.StaticArrayBuffer;
+import org.janusgraph.diskstorage.util.StaticArrayEntry;
+
+import javax.persistence.NoResultException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * DAO to access Column entities stored in RDBMS
+ *
+ */
+public class JanusColumnDao extends BaseDao<JanusColumn> {
+    private final RdbmsStore store;
+
+    public JanusColumnDao(RdbmsTransaction trx, RdbmsStore store) {
+        super(trx);
+
+        this.store = store;
+    }
+
+    /**
+     * Inserts a column for the given key, or replaces its value when a row with the same
+     * (key_id, name) already exists. Runs as a single native INSERT ... ON CONFLICT statement.
+     *
+     * @param keyId id of the key that owns the column
+     * @param name  column name
+     * @param val   column value
+     */
+    public void addOrUpdate(long keyId, byte[] name, byte[] val) {
+        try {
+            em.createNativeQuery("INSERT INTO janus_column (id, key_id, name, val) VALUES (NEXTVAL('janus_column_seq'), ?, ?, ?) ON CONFLICT (key_id, name) DO UPDATE SET val = EXCLUDED.val")
+                    .setParameter(1, keyId)
+                    .setParameter(2, name)
+                    .setParameter(3, val)
+                    .executeUpdate();
+        } catch (NoResultException excp) {
+            // ignore
+        }
+    }
+
+    /**
+     * Deletes the column with the given name from the given key.
+     *
+     * @return the update count reported by the delete query
+     */
+    public int remove(long keyId, byte[] name) {
+        return em.createNamedQuery("JanusColumn.deleteByKeyIdAndName")
+                .setParameter("keyId", keyId)
+                .setParameter("name", name)
+                .executeUpdate();
+    }
+
+    /**
+     * Reads at most {@code limit} columns of a key using the named query
+     * JanusColumn.getColumnsByKeyIdStartNameEndName.
+     *
+     * @return the columns found, or null when the query returns no rows
+     */
+    public List<JanusColumnValue> getColumns(long keyId, byte[] startColumn, byte[] endColumn, int limit) {
+        List<Object[]> result = em.createNamedQuery("JanusColumn.getColumnsByKeyIdStartNameEndName", Object[].class)
+                .setParameter("keyId", keyId)
+                .setParameter("startName", startColumn)
+                .setParameter("endName", endColumn)
+                .setMaxResults(limit)
+                .getResultList();
+
+        return toColumnList(result);
+    }
+
+    /**
+     * Iterates over the keys of a store using the named query JanusColumn.getKeysByStoreIdColumnRange.
+     *
+     * @param limit maximum number of columns returned per key by the iterator's getEntries()
+     * @return a key iterator over the cursor, or EMPTY_KEY_ITERATOR when the cursor has no rows
+     */
+    public KeyIterator getKeysByColumnRange(long storeId, byte[] startColumn, byte[] endColumn, int limit) {
+        // with the scrollable-cursor hint the cursor is obtained through getSingleResult();
+        // getResultList() returns a List, which cannot be cast to ScrollableCursor
+        ScrollableCursor result = (ScrollableCursor) em.createNamedQuery("JanusColumn.getKeysByStoreIdColumnRange")
+                .setParameter("storeId", storeId)
+                .setParameter("startName", startColumn)
+                .setParameter("endName", endColumn)
+                .setHint("eclipselink.cursor.scrollable", true)
+                .getSingleResult();
+
+        return toKeyColumns(result, limit);
+    }
+
+    /**
+     * Iterates over the keys of a store using the named query
+     * JanusColumn.getKeysByStoreIdKeyRangeColumnRange.
+     *
+     * @param limit maximum number of columns returned per key by the iterator's getEntries()
+     * @return a key iterator over the cursor, or EMPTY_KEY_ITERATOR when the cursor has no rows
+     */
+    public KeyIterator getKeysByKeyAndColumnRange(long storeId, byte[] startKey, byte[] endKey, byte[] startColumn, byte[] endColumn, int limit) {
+        ScrollableCursor result = (ScrollableCursor) em.createNamedQuery("JanusColumn.getKeysByStoreIdKeyRangeColumnRange")
+                .setParameter("storeId", storeId)
+                .setParameter("startKey", startKey)
+                .setParameter("endKey", endKey)
+                .setParameter("startName", startColumn)
+                .setParameter("endName", endColumn)
+                .setHint("eclipselink.cursor.scrollable", true)
+                .getSingleResult();
+
+        return toKeyColumns(result, limit);
+    }
+
+    // converts (name, val) rows into JanusColumnValue objects; returns null for a null or empty result
+    private List<JanusColumnValue> toColumnList(List<Object[]> result) {
+        List<JanusColumnValue> ret = null;
+
+        if (result != null && !result.isEmpty()) {
+            ret = new ArrayList<>(result.size());
+
+            for (Object[] row : result) {
+                byte[] name = toByteArray(row[0]);
+                byte[] val = toByteArray(row[1]);
+
+                ret.add(new JanusColumnValue(name, val));
+            }
+        }
+
+        return ret;
+    }
+
+    // wraps a non-empty cursor in a RdbmsKeyIterator; a null or empty cursor yields EMPTY_KEY_ITERATOR
+    private KeyIterator toKeyColumns(ScrollableCursor keysResult, int limit) {
+        final KeyIterator ret;
+
+        if (keysResult != null && keysResult.hasNext()) {
+            ret = new RdbmsKeyIterator(keysResult, limit);
+        } else {
+            ret = EMPTY_KEY_ITERATOR;
+        }
+
+        return ret;
+    }
+
+    /**
+     * Key iterator over a cursor whose rows are read as (keyId, key, column name, column value).
+     * Consecutive rows with the same keyId are treated as columns of one key.
+     */
+    private class RdbmsKeyIterator implements KeyIterator {
+        private final ScrollableCursor rows;
+        private final int limit;
+        private final Row currKey = new Row(); // key most recently returned by next()
+        private final Row nextKey = new Row(); // key already read from the cursor but not yet returned
+        private Long prevKeyId;
+        private boolean isClosed;
+
+        public RdbmsKeyIterator(ScrollableCursor rows, int limit) {
+            this.rows = rows;
+            this.limit = limit;
+        }
+
+        @Override
+        public boolean hasNext() {
+            ensureOpen();
+
+            if (nextKey.keyId == null) {
+                while (rows.hasNext()) {
+                    Object[] nextRow = (Object[]) rows.next();
+                    Long keyId = toLong(nextRow[0]);
+
+                    if (prevKeyId != null && prevKeyId.equals(keyId)) { // ignore additional columns for this key
+                        continue;
+                    }
+
+                    nextKey.set(keyId, StaticArrayBuffer.of(toByteArray(nextRow[1])), new JanusColumnValue(toByteArray(nextRow[2]), toByteArray(nextRow[3])));
+
+                    break;
+                }
+            }
+
+            return nextKey.keyId != null;
+        }
+
+        @Override
+        public StaticBuffer next() {
+            ensureOpen();
+
+            prevKeyId = currKey.keyId;
+
+            if (nextKey.keyId == null) {
+                hasNext();
+            }
+
+            currKey.copyFrom(nextKey);
+
+            nextKey.reset();
+
+            return currKey.key;
+        }
+
+        /**
+         * Iterates over the columns of the current key. Reads from the same cursor as the key
+         * iterator: a row belonging to a different key is stored in nextKey and ends the iteration.
+         */
+        @Override
+        public RecordIterator<Entry> getEntries() {
+            ensureOpen();
+
+            return new RecordIterator<Entry>() {
+                private boolean isClosed;
+                private int colCount;
+
+                @Override
+                public boolean hasNext() {
+                    ensureOpen();
+
+                    if (currKey.column == null) {
+                        while (rows.hasNext()) {
+                            Object[] nextRow = (Object[]) rows.next();
+                            Long keyId = toLong(nextRow[0]);
+
+                            if (!keyId.equals(currKey.keyId)) {
+                                nextKey.set(keyId, StaticArrayBuffer.of(toByteArray(nextRow[1])), new JanusColumnValue(toByteArray(nextRow[2]), toByteArray(nextRow[3])));
+                                currKey.reset();
+
+                                break;
+                            } else if (colCount < limit) { // ignore additional columns for this key
+                                currKey.column = new JanusColumnValue(toByteArray(nextRow[2]), toByteArray(nextRow[3]));
+
+                                break;
+                            }
+                        }
+                    }
+
+                    return currKey.column != null;
+                }
+
+                @Override
+                public Entry next() {
+                    JanusColumnValue ret = currKey.column;
+
+                    currKey.column = null;
+                    colCount++;
+
+                    return StaticArrayEntry.ofStaticBuffer(ret, store.toEntry);
+                }
+
+                @Override
+                public void close() {
+                    this.isClosed = true;
+                }
+
+                private void ensureOpen() {
+                    if (isClosed) {
+                        throw new IllegalStateException("Iterator has been closed.");
+                    }
+                }
+            };
+        }
+
+        @Override
+        public void close() {
+            isClosed = true;
+
+            rows.close();
+        }
+
+        private void ensureOpen() {
+            if (isClosed) {
+                throw new IllegalStateException("Iterator has been closed.");
+            }
+        }
+
+        // mutable holder for one key and one of its columns
+        private class Row {
+            private Long keyId;
+            private StaticBuffer key;
+            private JanusColumnValue column;
+
+            Row() { }
+
+            Row(Long keyId, StaticBuffer key, JanusColumnValue column) {
+                this.keyId = keyId;
+                this.key = key;
+                this.column = column;
+            }
+
+            void copyFrom(Row other) {
+                this.keyId = other.keyId;
+                this.key = other.key;
+                this.column = other.column;
+            }
+
+            void set(Long keyId, StaticBuffer key, JanusColumnValue column) {
+                this.keyId = keyId;
+                this.key = key;
+                this.column = column;
+            }
+
+            void reset() {
+                this.keyId = null;
+                this.key = null;
+                this.column = null;
+            }
+        }
+    }
+
+    /** Key iterator with no keys: hasNext() is always false, next() and getEntries() return null. */
+    public static final KeyIterator EMPTY_KEY_ITERATOR = new KeyIterator() {
+        @Override
+        public RecordIterator<Entry> getEntries() {
+            return null;
+        }
+
+        @Override
+        public void close() {
+        }
+
+        @Override
+        public boolean hasNext() {
+            return false;
+        }
+
+        @Override
+        public StaticBuffer next() {
+            return null;
+        }
+    };
+}
diff --git
a/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/dao/JanusKeyDao.java b/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/dao/JanusKeyDao.java new file mode 100644 index 000000000..1d8b73d28 --- /dev/null +++ b/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/dao/JanusKeyDao.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.janusgraph.diskstorage.rdbms.dao;
+
+import org.janusgraph.diskstorage.rdbms.RdbmsTransaction;
+import org.janusgraph.diskstorage.rdbms.entity.JanusKey;
+
+import javax.persistence.NoResultException;
+
+/**
+ * DAO to access Key entities stored in RDBMS
+ *
+ */
+public class JanusKeyDao extends BaseDao<JanusKey> {
+    public JanusKeyDao(RdbmsTransaction trx) {
+        super(trx);
+    }
+
+    /**
+     * Looks up the id of a key through the named query JanusKey.getIdByStoreIdAndName.
+     *
+     * @param storeId id of the store that owns the key
+     * @param name    key name
+     * @return the id, or null when the query finds no row or its result is not a Long
+     */
+    public Long getIdByStoreIdAndName(long storeId, byte[] name) {
+        Long ret = null;
+
+        try {
+            ret = toLong(em.createNamedQuery("JanusKey.getIdByStoreIdAndName")
+                    .setParameter("storeId", storeId)
+                    .setParameter("name", name)
+                    .getSingleResult());
+        } catch (NoResultException excp) {
+            // no matching key: ret stays null
+        }
+
+        return ret;
+    }
+}
diff --git a/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/dao/JanusStoreDao.java b/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/dao/JanusStoreDao.java
new file mode 100644
index 000000000..dc4c8958d
--- /dev/null
+++ b/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/dao/JanusStoreDao.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.janusgraph.diskstorage.rdbms.dao;
+
+import org.janusgraph.diskstorage.rdbms.RdbmsTransaction;
+import org.janusgraph.diskstorage.rdbms.entity.JanusStore;
+
+import javax.persistence.NoResultException;
+
+/**
+ * DAO to access Store entities stored in RDBMS
+ *
+ */
+public class JanusStoreDao extends BaseDao<JanusStore> {
+    public JanusStoreDao(RdbmsTransaction trx) {
+        super(trx);
+    }
+
+    /**
+     * Looks up the id of a store through the named query JanusStore.getIdByName.
+     *
+     * @param name store name
+     * @return the id, or null when the query finds no row or its result is not a Long
+     */
+    public Long getIdByName(String name) {
+        Long ret = null;
+
+        try {
+            ret = toLong(em.createNamedQuery("JanusStore.getIdByName")
+                    .setParameter("name", name)
+                    .getSingleResult());
+        } catch (NoResultException excp) {
+            // no matching store: ret stays null
+        }
+
+        return ret;
+    }
+}
diff --git a/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/entity/JanusColumn.java b/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/entity/JanusColumn.java
new file mode 100644
index 000000000..3e9239c8b
--- /dev/null
+++ b/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/entity/JanusColumn.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.janusgraph.diskstorage.rdbms.entity; + +import javax.persistence.Cacheable; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.Index; +import javax.persistence.Lob; +import javax.persistence.SequenceGenerator; +import javax.persistence.Table; +import javax.persistence.UniqueConstraint; + +import java.util.Arrays; +import java.util.Objects; + +/** + * RDBMS representation of a JanusGraph Column - name/value pair in a JanusGraph key + * + * @author Madhan Neethiraj <mad...@apache.org> + */ +@Entity +@Cacheable(false) +@Table(name = "janus_column", + indexes = {@Index(name = "janus_column_idx_key_id", columnList = "key_id")}, + uniqueConstraints = {@UniqueConstraint(name = "janus_column_uk_key_name", columnNames = {"key_id", "name"})}) +public class JanusColumn implements java.io.Serializable { + private static final long serialVersionUID = 1L; + + @Id + @SequenceGenerator(name = "janus_column_seq", sequenceName = "janus_column_seq", allocationSize = 1000) + @GeneratedValue(strategy = GenerationType.AUTO, generator = "janus_column_seq") + @Column(name = "id") + protected Long id; + + @Column(name = "key_id", nullable = false) + protected Long keyId; + + @Lob + @Column(name = "name", nullable = false) + protected byte[] name; + + @Lob + @Column(name = "val") + protected byte[] val; + + public JanusColumn() { } + + public JanusColumn(Long keyId, byte[] name, byte[] val) { + this.keyId = keyId; + this.name = name; + this.val = val; + } + + public void setId(Long id) { + this.id = id; + } + + public Long getId() { + return id; + } + + public void setKeyId(Long keyId) { + this.keyId = keyId; + } + + public Long getKeyId() { + return keyId; + } + + public void setName(byte[] name) { + this.name = name; + } + + public byte[] getName() { + return name; + } + + public void setVal(byte[] val) { + this.val 
= val; + } + + public byte[] getVal() { + return val; + } + + @Override + public int hashCode() { + return Objects.hash(id, keyId, Arrays.hashCode(name), Arrays.hashCode(val)); /* byte[] fields are hashed by content so hashCode() agrees with the Arrays.equals() comparison in equals() */ + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } else if (obj instanceof JanusColumn && getClass() == obj.getClass()) { + JanusColumn other = (JanusColumn) obj; + + return Objects.equals(id, other.id) && + Objects.equals(keyId, other.keyId) && + Arrays.equals(name, other.name) && + Arrays.equals(val, other.val); + } + + return false; + } + + @Override + public String toString() { + return "JanusColumn(id=" + id + ", keyId=" + keyId + ", name=" + name + ", val=" + val + ")"; + } +} diff --git a/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/entity/JanusKey.java b/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/entity/JanusKey.java new file mode 100644 index 000000000..3f5418af1 --- /dev/null +++ b/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/entity/JanusKey.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.janusgraph.diskstorage.rdbms.entity; + +import javax.persistence.Cacheable; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.Index; +import javax.persistence.Lob; +import javax.persistence.SequenceGenerator; +import javax.persistence.Table; +import javax.persistence.UniqueConstraint; + +import java.util.Arrays; +import java.util.Objects; + +/** + * RDBMS representation of a JanusGraph key, that can hold columns which are name/value pairs + * + * @author Madhan Neethiraj <mad...@apache.org> + */ +@Entity +@Cacheable(false) +@Table(name = "janus_key", + indexes = {@Index(name = "janus_key_idx_store_id", columnList = "store_id")}, + uniqueConstraints = {@UniqueConstraint(name = "janus_key_uk_store_name", columnNames = {"store_id", "name"})}) +public class JanusKey implements java.io.Serializable { + private static final long serialVersionUID = 1L; + + @Id + @SequenceGenerator(name = "janus_key_seq", sequenceName = "janus_key_seq", allocationSize = 1000) + @GeneratedValue(strategy = GenerationType.AUTO, generator = "janus_key_seq") + @Column(name = "id") + protected Long id; + + @Column(name = "store_id", nullable = false) + protected Long storeId; + + @Lob + @Column(name = "name", nullable = false) + protected byte[] name; + + public JanusKey() { } + + public JanusKey(Long storeId, byte[] name) { + this.storeId = storeId; + this.name = name; + } + + public void setId(Long id) { + this.id = id; + } + + public Long getId() { + return id; + } + + public void setStoreId(Long storeId) { + this.storeId = storeId; + } + + public Long getStoreId() { + return storeId; + } + + public void setName(byte[] name) { + this.name = name; + } + + public byte[] getName() { + return name; + } + + @Override + public int hashCode() { + return Objects.hash(id, storeId, Arrays.hashCode(name)); /* byte[] name is hashed by content so hashCode() agrees with the Arrays.equals() comparison in equals() */ + } + + @Override + public boolean
equals(Object obj) { + if (this == obj) { + return true; + } else if (obj instanceof JanusKey && getClass() == obj.getClass()) { + JanusKey other = (JanusKey) obj; + + return Objects.equals(id, other.id) && + Objects.equals(storeId, other.storeId) && + Arrays.equals(name, other.name); + } + + return false; + } + + @Override + public String toString() { + return "JanusKey(id=" + id + ", storeId=" + storeId + ", name=" + name + ")"; + } +} diff --git a/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/entity/JanusStore.java b/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/entity/JanusStore.java new file mode 100644 index 000000000..bbfe914c7 --- /dev/null +++ b/graphdb/janusgraph-rdbms/src/main/java/org/janusgraph/diskstorage/rdbms/entity/JanusStore.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.janusgraph.diskstorage.rdbms.entity; + +import javax.persistence.Cacheable; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.SequenceGenerator; +import javax.persistence.Table; +import javax.persistence.UniqueConstraint; + +import java.util.Objects; + +/** + * RDBMS representation of a store that can hold keys + * + * @author Madhan Neethiraj <mad...@apache.org> + */ +@Entity +@Cacheable(false) +@Table(name = "janus_store", + uniqueConstraints = @UniqueConstraint(name = "janus_store_uk_name", columnNames = "name")) +public class JanusStore implements java.io.Serializable { + private static final long serialVersionUID = 1L; + + @Id + @SequenceGenerator(name = "janus_store_seq", sequenceName = "janus_store_seq", allocationSize = 1) + @GeneratedValue(strategy = GenerationType.AUTO, generator = "janus_store_seq") + @Column(name = "id") + protected Long id; + + @Column(name = "name", nullable = false) + protected String name; + + public JanusStore() { } + + public JanusStore(String name) { + this.name = name; + } + + public void setId(Long id) { + this.id = id; + } + + public Long getId() { + return id; + } + + public void setName(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + @Override + public int hashCode() { + return Objects.hash(id, name); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } else if (obj instanceof JanusStore && getClass() == obj.getClass()) { + JanusStore other = (JanusStore) obj; + + return Objects.equals(id, other.id) && + Objects.equals(name, other.name); + } + + return false; + } + + @Override + public String toString() { + return "JanusStore(id=" + id + ", name=" + name + ")"; + } +} diff --git a/graphdb/janusgraph-rdbms/src/main/resources/META-INF/janus-jpa_named_queries.xml 
b/graphdb/janusgraph-rdbms/src/main/resources/META-INF/janus-jpa_named_queries.xml new file mode 100644 index 000000000..2dcd519df --- /dev/null +++ b/graphdb/janusgraph-rdbms/src/main/resources/META-INF/janus-jpa_named_queries.xml @@ -0,0 +1,71 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. 
--> +<entity-mappings version="1.0" + xmlns="http://java.sun.com/xml/ns/persistence/orm" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://java.sun.com/xml/ns/persistence/orm http://java.sun.com/xml/ns/persistence/orm_1_0.xsd "> + + <named-query name="JanusStore.getIdByName"> + <query>SELECT s.id FROM JanusStore s WHERE s.name = :name</query> + </named-query> + + <named-query name="JanusKey.getIdByStoreIdAndName"> + <query>SELECT k.id FROM JanusKey k WHERE k.storeId = :storeId AND k.name = :name</query> + </named-query> + + <named-query name="JanusKey.getKeysByStoreId"> + <query>SELECT k.name FROM JanusKey k WHERE k.storeId = :storeId</query> + </named-query> + + <named-query name="JanusColumn.getColumnIdByKeyIdAndName"> + <query>SELECT c.id + FROM JanusColumn c + WHERE c.keyId = :keyId AND c.name = :name</query> + </named-query> + + <named-query name="JanusColumn.updateById"> + <query>UPDATE JanusColumn c SET c.val = :val WHERE c.id = :id</query> + </named-query> + + <named-query name="JanusColumn.updateByKeyIdAndName"> + <query>UPDATE JanusColumn c SET c.val = :val WHERE c.keyId = :keyId AND c.name = :name</query> + </named-query> + + <named-query name="JanusColumn.deleteByKeyIdAndName"> + <query>DELETE FROM JanusColumn c WHERE c.keyId = :keyId AND c.name = :name</query> + </named-query> + + <named-query name="JanusColumn.getValueByStoreIdKeyIdName"> + <query>SELECT c.val FROM JanusColumn c WHERE c.keyId = :keyId AND c.name = :name</query> + </named-query> + + <named-query name="JanusColumn.getColumnsByKeyIdStartNameEndName"> + <query>SELECT c.name, c.val + FROM JanusColumn c + WHERE c.keyId = :keyId AND c.name >= :startName AND c.name &lt; :endName</query> + </named-query> <!-- range queries are half-open [start, end); a less-than sign in query text is written as &lt; so the file stays well-formed XML --> + + <named-query name="JanusColumn.getKeysByStoreIdColumnRange"> + <query>SELECT k.id, k.name, c.name, c.val + FROM JanusColumn c, JanusKey k + WHERE k.storeId = :storeId + AND c.keyId = k.id AND c.name >= :startName AND c.name &lt; :endName + ORDER BY k.name,
c.name</query> + </named-query> + + <named-query name="JanusColumn.getKeysByStoreIdKeyRangeColumnRange"> + <query>SELECT k.id, k.name, c.name, c.val + FROM JanusColumn c, JanusKey k + WHERE k.storeId = :storeId AND k.name >= :startKey AND k.name &lt; :endKey + AND c.keyId = k.id AND c.name >= :startName AND c.name &lt; :endName + ORDER BY k.name, c.name</query> + </named-query> +</entity-mappings> diff --git a/graphdb/janusgraph-rdbms/src/main/resources/META-INF/janus-persistence.xml b/graphdb/janusgraph-rdbms/src/main/resources/META-INF/janus-persistence.xml new file mode 100644 index 000000000..6a99e4d22 --- /dev/null +++ b/graphdb/janusgraph-rdbms/src/main/resources/META-INF/janus-persistence.xml @@ -0,0 +1,32 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License.
+--> +<persistence version="2.0" xmlns="http://java.sun.com/xml/ns/persistence" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd"> + <persistence-unit name="janusPU"> + <mapping-file>META-INF/janus-jpa_named_queries.xml</mapping-file> + + <class>org.janusgraph.diskstorage.rdbms.entity.JanusStore</class> + <class>org.janusgraph.diskstorage.rdbms.entity.JanusKey</class> + <class>org.janusgraph.diskstorage.rdbms.entity.JanusColumn</class> + <shared-cache-mode>NONE</shared-cache-mode> + + <properties> + <property name="eclipselink.logging.level" value="WARNING"/> + <property name="eclipselink.jdbc.batch-writing" value="jdbc"/> + </properties> + </persistence-unit> +</persistence> diff --git a/graphdb/janusgraph-rdbms/src/main/resources/META-INF/postgres/create_schema.sql b/graphdb/janusgraph-rdbms/src/main/resources/META-INF/postgres/create_schema.sql new file mode 100644 index 000000000..ca68b92ff --- /dev/null +++ b/graphdb/janusgraph-rdbms/src/main/resources/META-INF/postgres/create_schema.sql @@ -0,0 +1,44 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License.
+ +CREATE SEQUENCE IF NOT EXISTS janus_store_seq CACHE 1; +CREATE SEQUENCE IF NOT EXISTS janus_key_seq CACHE 1000; +CREATE SEQUENCE IF NOT EXISTS janus_column_seq CACHE 1000; + +CREATE TABLE IF NOT EXISTS janus_store( + id BIGINT DEFAULT nextval('janus_store_seq'::regclass), + name VARCHAR(512) NOT NULL, + PRIMARY KEY(id), + CONSTRAINT janus_store_uk_name UNIQUE(name)); + +CREATE TABLE IF NOT EXISTS janus_key( + id BIGINT DEFAULT nextval('janus_key_seq'::regclass), + store_id BIGINT NOT NULL, + name BYTEA NOT NULL, + PRIMARY KEY(id), + CONSTRAINT janus_key_uk_store_name UNIQUE(store_id, name), + CONSTRAINT janus_key_fk_store FOREIGN KEY(store_id) REFERENCES janus_store(id)); + +CREATE TABLE IF NOT EXISTS janus_column( + id BIGINT DEFAULT nextval('janus_column_seq'::regclass), + key_id BIGINT NOT NULL, + name BYTEA NOT NULL, + val BYTEA NOT NULL, + PRIMARY KEY(id), + CONSTRAINT janus_column_uk_key_name UNIQUE(key_id, name), + CONSTRAINT janus_column_fk_key FOREIGN KEY(key_id) REFERENCES janus_key(id)); + +CREATE INDEX IF NOT EXISTS janus_key_idx_store_id ON janus_key (store_id); +CREATE INDEX IF NOT EXISTS janus_column_idx_key_id ON janus_column (key_id); diff --git a/graphdb/janusgraph-rdbms/src/main/resources/META-INF/postgres/create_sequences.sql b/graphdb/janusgraph-rdbms/src/main/resources/META-INF/postgres/create_sequences.sql new file mode 100644 index 000000000..4d8313dfa --- /dev/null +++ b/graphdb/janusgraph-rdbms/src/main/resources/META-INF/postgres/create_sequences.sql @@ -0,0 +1,18 @@ +-- Licensed to the Apache Software Foundation(ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +--(the "License"); you may not use this file except in compliance with +-- the License. 
You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. + +CREATE SEQUENCE IF NOT EXISTS janus_store_seq CACHE 1; +CREATE SEQUENCE IF NOT EXISTS janus_key_seq CACHE 1000; +CREATE SEQUENCE IF NOT EXISTS janus_column_seq CACHE 1000; diff --git a/graphdb/pom.xml b/graphdb/pom.xml index d58286dc1..f18ba8f48 100644 --- a/graphdb/pom.xml +++ b/graphdb/pom.xml @@ -37,11 +37,11 @@ <module>common</module> <module>graphdb-impls</module> <module>janus</module> + <module>janusgraph-rdbms</module> </modules> <properties> <checkstyle.failOnViolation>true</checkstyle.failOnViolation> <checkstyle.skip>false</checkstyle.skip> </properties> - </project> diff --git a/pom.xml b/pom.xml index 728264f5e..8299c21c1 100644 --- a/pom.xml +++ b/pom.xml @@ -108,6 +108,7 @@ <curator.version>4.3.0</curator.version> <doxia.version>1.8</doxia.version> <dropwizard-metrics>3.2.2</dropwizard-metrics> + <eclipse.jpa.version>2.7.15</eclipse.jpa.version> <elasticsearch.version>7.17.8</elasticsearch.version> <entity.repository.impl>org.apache.atlas.repository.audit.InMemoryEntityAuditRepository</entity.repository.impl> <enunciate-maven-plugin.version>2.13.2</enunciate-maven-plugin.version> @@ -136,6 +137,7 @@ <javac.source.version>1.8</javac.source.version> <javac.target.version>1.8</javac.target.version> <javax-inject.version>1</javax-inject.version> + <javax.persistence.version>2.2.1</javax.persistence.version> <javax.servlet.version>3.1.0</javax.servlet.version> <jaxb.api.version>2.3.1</jaxb.api.version> <jersey-spring.version>1.19.4</jersey-spring.version> @@ -165,6 +167,7 @@ <paranamer.version>2.7</paranamer.version>
<poi-ooxml.version>5.2.2</poi-ooxml.version> <poi.version>5.2.2</poi.version> + <postgresql.version>42.7.7</postgresql.version> <project.build.dashboard.gruntBuild>run build</project.build.dashboard.gruntBuild> <project.build.dashboardv2.gruntBuild>build-minify</project.build.dashboardv2.gruntBuild> <project.build.dashboardv3.gruntBuild>build-minify</project.build.dashboardv3.gruntBuild> diff --git a/repository/pom.xml b/repository/pom.xml index fcadcd197..6001270f1 100644 --- a/repository/pom.xml +++ b/repository/pom.xml @@ -210,11 +210,29 @@ </exclusions> </dependency> + <dependency> + <groupId>org.eclipse.persistence</groupId> + <artifactId>eclipselink</artifactId> + <version>${eclipse.jpa.version}</version> + </dependency> + + <dependency> + <groupId>org.eclipse.persistence</groupId> + <artifactId>javax.persistence</artifactId> + <version>${javax.persistence.version}</version> + </dependency> + <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId> </dependency> + <dependency> + <groupId>org.postgresql</groupId> + <artifactId>postgresql</artifactId> + <version>${postgresql.version}</version> + </dependency> + <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aop</artifactId> diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java index cc5312d0c..a12e56d44 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/AbstractStorageBasedAuditRepository.java @@ -159,10 +159,42 @@ public abstract class AbstractStorageBasedAuditRepository implements Service, En applicationProperties = config; } + protected String getKeyStr(String id, Long ts, int index) { + return id + FIELD_SEPARATOR + ts + FIELD_SEPARATOR + index + FIELD_SEPARATOR + 
System.currentTimeMillis(); + } + protected byte[] getKey(String id, Long ts, int index) { - String keyStr = id + FIELD_SEPARATOR + ts + FIELD_SEPARATOR + index + FIELD_SEPARATOR + System.currentTimeMillis(); + return Bytes.toBytes(getKeyStr(id, ts, index)); + } + + protected long getTimestampFromKey(String key) { + String[] parts = key == null ? new String[0] : key.split(FIELD_SEPARATOR); /* a null key (no start key supplied by the caller) falls through to the 0 default instead of throwing NullPointerException */ - return Bytes.toBytes(keyStr); + if (parts.length < 3) { + return 0L; + } + + try { + return Long.parseLong(parts[1]); + } catch (NumberFormatException e) { + LOG.error("Error parsing timestamp from key: {}", key, e); + return 0L; + } + } + + protected int getIndexFromKey(String key) { + String[] parts = key == null ? new String[0] : key.split(FIELD_SEPARATOR); /* a null key (no start key supplied by the caller) falls through to the 0 default instead of throwing NullPointerException */ + + if (parts.length < 3) { + return 0; + } + + try { + return Integer.parseInt(parts[2]); + } catch (NumberFormatException e) { + LOG.error("Error parsing index from key: {}", key, e); + return 0; + } } static { diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/RdbmsBasedAuditRepository.java b/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/RdbmsBasedAuditRepository.java new file mode 100644 index 000000000..18b2155b0 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/RdbmsBasedAuditRepository.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.audit.rdbms; + +import org.apache.atlas.ApplicationProperties; +import org.apache.atlas.AtlasException; +import org.apache.atlas.EntityAuditEvent; +import org.apache.atlas.annotation.ConditionalOnAtlasProperty; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.audit.EntityAuditEventV2; +import org.apache.atlas.repository.audit.AbstractStorageBasedAuditRepository; +import org.apache.atlas.repository.audit.rdbms.dao.DaoManager; +import org.apache.atlas.repository.audit.rdbms.dao.DbEntityAuditDao; +import org.apache.atlas.repository.audit.rdbms.entity.DbEntityAudit; +import org.apache.commons.configuration.Configuration; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.inject.Singleton; + +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +@Singleton +@Component +@ConditionalOnAtlasProperty(property = "atlas.EntityAuditRepository.impl", isDefault = true) +@Order(0) +public class RdbmsBasedAuditRepository extends AbstractStorageBasedAuditRepository { + private DaoManager daoManager; + + @PostConstruct + public void init() { + daoManager = new DaoManager(getConfiguration().subset("atlas.graph.storage.rdbms.jpa")); + } + + @Override + public void putEventsV1(List<EntityAuditEvent> events) throws AtlasException { + // TODO: is V1 supported needed anymore? 
+ } + + @Override + public List<EntityAuditEvent> listEventsV1(String entityId, String startKey, short n) throws AtlasException { + // TODO: is V1 supported needed anymore? + return Collections.emptyList(); + } + + @Override + public void putEventsV2(List<EntityAuditEventV2> events) throws AtlasBaseException { + try (RdbmsTransaction trx = new RdbmsTransaction(daoManager.getEntityManagerFactory())) { + DbEntityAuditDao dao = daoManager.getEntityAuditDao(trx.getEntityManager()); + + for (int i = 0; i < events.size(); i++) { + EntityAuditEventV2 event = events.get(i); + DbEntityAudit dbEvent = toDbEntityAudit(event); + + dbEvent.setEventIndex(i); + + dao.create(dbEvent); + } + + trx.commit(); + } catch (Exception excp) { + throw new AtlasBaseException("Error while persisting audit events", excp); + } + } + + @Override + public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String startKey, short maxResultCount) throws AtlasBaseException { + try (RdbmsTransaction trx = new RdbmsTransaction(daoManager.getEntityManagerFactory())) { + DbEntityAuditDao dao = daoManager.getEntityAuditDao(trx.getEntityManager()); + + List<DbEntityAudit> dbEvents = dao.getByEntityIdActionStartTimeStartIdx(entityId, auditAction, getTimestampFromKey(startKey), getIndexFromKey(startKey), maxResultCount); + + return dbEvents.stream().map(RdbmsBasedAuditRepository::fromDbEntityAudit).collect(Collectors.toList()); + } catch (Exception excp) { + throw new AtlasBaseException("Error while retrieving audit events", excp); + } + } + + @Override + public List<EntityAuditEventV2> listEventsV2(String entityId, EntityAuditEventV2.EntityAuditActionV2 auditAction, String sortByColumn, boolean sortOrderDesc, int offset, short limit) throws AtlasBaseException { + try (RdbmsTransaction trx = new RdbmsTransaction(daoManager.getEntityManagerFactory())) { + DbEntityAuditDao dao = daoManager.getEntityAuditDao(trx.getEntityManager()); + + 
List<DbEntityAudit> dbEvents = dao.getByEntityIdAction(entityId, auditAction, offset, limit); + + return dbEvents.stream().map(RdbmsBasedAuditRepository::fromDbEntityAudit).collect(Collectors.toList()); + } catch (Exception excp) { + throw new AtlasBaseException("Error while retrieving audit events", excp); + } + } + + @Override + public Set<String> getEntitiesWithTagChanges(long fromTimestamp, long toTimestamp) throws AtlasBaseException { + // TODO: + return Collections.emptySet(); + } + + @Override + public void start() throws AtlasException { + } + + @Override + public void stop() throws AtlasException { + } + + public static DbEntityAudit toDbEntityAudit(EntityAuditEventV2 event) { + DbEntityAudit ret = new DbEntityAudit(); + + ret.setEntityId(event.getEntityId()); + ret.setEventTime(event.getTimestamp()); + ret.setUser(event.getUser()); + ret.setAction(event.getAction().ordinal()); + ret.setDetails(event.getDetails()); + + if (event.getType() == null) { + ret.setAuditType(EntityAuditEventV2.EntityAuditType.ENTITY_AUDIT_V2.ordinal()); + } else { + ret.setAuditType(event.getType().ordinal()); + } + + if (PERSIST_ENTITY_DEFINITION) { + ret.setEntity(event.getEntityDefinitionString()); + } + + return ret; + } + + public static EntityAuditEventV2 fromDbEntityAudit(DbEntityAudit dbEntityAudit) { + EntityAuditEventV2 ret = new EntityAuditEventV2(); + + ret.setEntityId(dbEntityAudit.getEntityId()); + ret.setTimestamp(dbEntityAudit.getEventTime()); + ret.setUser(dbEntityAudit.getUser()); + ret.setAction(EntityAuditEventV2.EntityAuditActionV2.values()[dbEntityAudit.getAction()]); + ret.setDetails(dbEntityAudit.getDetails()); + ret.setType(EntityAuditEventV2.EntityAuditType.values()[dbEntityAudit.getAuditType()]); + + if (PERSIST_ENTITY_DEFINITION) { + ret.setEntityDefinition(dbEntityAudit.getEntity()); + } + + return ret; + } + + private Configuration getConfiguration() { + try { + return ApplicationProperties.get(); + } catch (AtlasException e) { + throw new 
RuntimeException("Failed to get application properties", e); + } + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/RdbmsTransaction.java b/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/RdbmsTransaction.java new file mode 100644 index 000000000..33416f2c8 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/RdbmsTransaction.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.repository.audit.rdbms; + +import javax.persistence.EntityManager; +import javax.persistence.EntityManagerFactory; +import javax.persistence.EntityTransaction; + +public class RdbmsTransaction implements AutoCloseable { + private final EntityManager em; + private final EntityTransaction trx; + + public RdbmsTransaction(EntityManagerFactory emf) { + this.em = emf.createEntityManager(); + this.trx = em.getTransaction(); + + this.trx.begin(); + } + + public EntityManager getEntityManager() { + return em; + } + + public void commit() { + if (trx.isActive()) { + trx.commit(); + } + } + + @Override + public void close() { + if (trx.isActive()) { + trx.rollback(); + } + + em.close(); + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/dao/BaseDao.java b/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/dao/BaseDao.java new file mode 100644 index 000000000..d3a28869f --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/dao/BaseDao.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.atlas.repository.audit.rdbms.dao; + +import javax.persistence.EntityManager; + +public abstract class BaseDao<T> { + protected final EntityManager em; + + protected BaseDao(EntityManager em) { + this.em = em; + } + + public T create(T obj) { + em.persist(obj); + + return obj; + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/dao/DaoManager.java b/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/dao/DaoManager.java new file mode 100644 index 000000000..e7e3b35de --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/dao/DaoManager.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.repository.audit.rdbms.dao; + +import org.apache.commons.configuration.Configuration; +import org.eclipse.persistence.config.PersistenceUnitProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.persistence.EntityManager; +import javax.persistence.EntityManagerFactory; +import javax.persistence.Persistence; +import javax.persistence.spi.PersistenceProvider; +import javax.persistence.spi.PersistenceProviderResolver; +import javax.persistence.spi.PersistenceProviderResolverHolder; + +import java.util.HashMap; +import java.util.Map; + +public class DaoManager { + private static final Logger LOG = LoggerFactory.getLogger(DaoManager.class); + + private final EntityManagerFactory emFactory; + + public DaoManager(Configuration jpaConfig) { + Map<String, String> config = new HashMap<>(); + + if (jpaConfig != null) { + jpaConfig.getKeys().forEachRemaining(key -> { + Object value = jpaConfig.getProperty(key); + + if (value != null) { + config.put(key, value.toString()); + } + }); + } + + config.put(PersistenceUnitProperties.ECLIPSELINK_PERSISTENCE_XML, "META-INF/atlas-persistence.xml"); + + LOG.debug("DaoManager: config={}", config); + + PersistenceProviderResolver resolver = PersistenceProviderResolverHolder.getPersistenceProviderResolver(); + EntityManagerFactory emf = null; + + for (PersistenceProvider provider : resolver.getPersistenceProviders()) { + LOG.debug("PersistenceProvider: {}", provider); + + emf = provider.createEntityManagerFactory("atlasPU", config); + + if (emf != null) { + break; + } + } + + emFactory = emf; + } + + public EntityManagerFactory getEntityManagerFactory() { + return emFactory; + } + + public DbEntityAuditDao getEntityAuditDao(EntityManager em) { + return new DbEntityAuditDao(em); + } + + public void close() { + LOG.info("DaoManager.close()"); + + if (this.emFactory.isOpen()) { + this.emFactory.close(); + } + } +} diff --git 
a/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/dao/DbEntityAuditDao.java b/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/dao/DbEntityAuditDao.java new file mode 100644 index 000000000..d3c792e9c --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/dao/DbEntityAuditDao.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.repository.audit.rdbms.dao; + +import org.apache.atlas.model.audit.EntityAuditEventV2; +import org.apache.atlas.repository.audit.rdbms.entity.DbEntityAudit; + +import javax.persistence.EntityManager; +import javax.persistence.NoResultException; + +import java.util.Collections; +import java.util.List; + +public class DbEntityAuditDao extends BaseDao<DbEntityAudit> { + protected DbEntityAuditDao(EntityManager em) { + super(em); + } + + public List<DbEntityAudit> getByEntityIdActionStartTimeStartIdx(String entityId, EntityAuditEventV2.EntityAuditActionV2 action, long eventTimeStart, int eventIdxStart, int maxResults) { + try { + return em.createNamedQuery("DbEntityAudit.getByEntityIdActionStartTimeStartIdx", DbEntityAudit.class) + .setParameter("entityId", entityId) + .setParameter("action", action.ordinal()) + .setParameter("eventTimeStart", eventTimeStart) + .setParameter("eventIdxStart", eventIdxStart) + .setMaxResults(maxResults) + .getResultList(); + } catch (NoResultException excp) { + // ignore + } + + return Collections.emptyList(); + } + + public List<DbEntityAudit> getByEntityIdAction(String entityId, EntityAuditEventV2.EntityAuditActionV2 action, int startIdx, int maxResults) { + try { + if (action == null) { + return em.createNamedQuery("DbEntityAudit.getByEntityId", DbEntityAudit.class) + .setParameter("entityId", entityId) + .setFirstResult(startIdx) + .setMaxResults(maxResults) + .getResultList(); + } else { + return em.createNamedQuery("DbEntityAudit.getByEntityIdAction", DbEntityAudit.class) + .setParameter("entityId", entityId) + .setParameter("action", action.ordinal()) + .setFirstResult(startIdx) + .setMaxResults(maxResults) + .getResultList(); + } + } catch (NoResultException excp) { + // ignore + } + + return Collections.emptyList(); + } +} diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/entity/DbEntityAudit.java 
b/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/entity/DbEntityAudit.java new file mode 100644 index 000000000..e02bc348a --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/audit/rdbms/entity/DbEntityAudit.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.atlas.repository.audit.rdbms.entity; + +import javax.persistence.Cacheable; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.Index; +import javax.persistence.Lob; +import javax.persistence.SequenceGenerator; +import javax.persistence.Table; + +import java.util.Objects; + +/** + * RDBMS representation of a JanusGraph Column - name/value pair in a JanusGraph key + * + * @author Madhan Neethiraj <mad...@apache.org> + */ +@Entity +@Cacheable(false) +@Table(name = "atlas_entity_audit", + indexes = {@Index(name = "atlas_entity_audit_idx_entity_id", columnList = "entity_id"), + @Index(name = "atlas_entity_audit_idx_event_time", columnList = "event_time"), + @Index(name = "atlas_entity_audit_idx_entity_id_event_time", columnList = "entity_id,event_time"), + @Index(name = "atlas_entity_audit_idx_user_name", columnList = "user_name")}) +public class DbEntityAudit implements java.io.Serializable { + @Id + @SequenceGenerator(name = "atlas_entity_audit_seq", sequenceName = "atlas_entity_audit_seq", allocationSize = 1000) + @GeneratedValue(strategy = GenerationType.AUTO, generator = "atlas_entity_audit_seq") + @Column(name = "id") + protected Long id; + + @Column(name = "entity_id", nullable = false, length = 64) + protected String entityId; + + @Column(name = "event_time", nullable = false) + protected long eventTime; + + @Column(name = "event_idx", nullable = false) + protected int eventIndex; + + @Column(name = "user_name", nullable = false, length = 64) + protected String user; + + @Column(name = "operation", nullable = false) + protected int action; + + @Column(name = "details") + @Lob + protected String details; + + @Column(name = "entity") + @Lob + protected String entity; + + @Column(name = "audit_type") + protected int auditType; + + public Long getId() { + return id; + } + + public void 
setId(Long id) { + this.id = id; + } + + public String getEntityId() { + return entityId; + } + + public void setEntityId(String entityId) { + this.entityId = entityId; + } + + public long getEventTime() { + return eventTime; + } + + public void setEventTime(long eventTime) { + this.eventTime = eventTime; + } + + public int getEventIndex() { + return eventIndex; + } + + public void setEventIndex(int eventIndex) { + this.eventIndex = eventIndex; + } + + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } + + public int getAction() { + return action; + } + + public void setAction(int action) { + this.action = action; + } + + public String getDetails() { + return details; + } + + public void setDetails(String details) { + this.details = details; + } + + public String getEntity() { + return entity; + } + + public void setEntity(String entity) { + this.entity = entity; + } + + public int getAuditType() { + return auditType; + } + + public void setAuditType(int auditType) { + this.auditType = auditType; + } + + @Override + public int hashCode() { + return Objects.hash(id, entityId, eventTime, eventIndex, user, action, details, entity, auditType); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } else if (obj instanceof DbEntityAudit && getClass() == obj.getClass()) { + DbEntityAudit other = (DbEntityAudit) obj; + + return Objects.equals(id, other.id) && + Objects.equals(entityId, other.entityId) && + eventTime == other.eventTime && + eventIndex == other.eventIndex && + Objects.equals(user, other.user) && + action == other.action && + Objects.equals(details, other.details) && + Objects.equals(entity, other.entity) && + auditType == other.auditType; + } + + return false; + } + + @Override + public String toString() { + return "DbEntityAudit(id=" + id + ", entityId=" + entityId + ", eventTime=" + eventTime + ", eventIndex=" + eventIndex + ", user=" + user + ", action=" + 
action + ", details=" + details + ", entity=" + entity + ", auditType=" + auditType + ")"; + } +} diff --git a/repository/src/main/resources/META-INF/atlas-jpa_named_queries.xml b/repository/src/main/resources/META-INF/atlas-jpa_named_queries.xml new file mode 100644 index 000000000..6486b5b89 --- /dev/null +++ b/repository/src/main/resources/META-INF/atlas-jpa_named_queries.xml @@ -0,0 +1,37 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. 
<!--
  JPQL named queries for DbEntityAudit, referenced by name from DbEntityAuditDao.

  All queries return rows newest-first (eventTime, then eventIndex, descending). The callers
  page through the results with setFirstResult()/setMaxResults(), which is only well-defined
  when the query has an explicit ORDER BY - hence getByEntityId is ordered like its siblings.
-->
<entity-mappings version="1.0"
                 xmlns="http://java.sun.com/xml/ns/persistence/orm"
                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                 xsi:schemaLocation="http://java.sun.com/xml/ns/persistence/orm http://java.sun.com/xml/ns/persistence/orm_1_0.xsd">

    <!-- audit rows of one entity, any action -->
    <named-query name="DbEntityAudit.getByEntityId">
        <query>SELECT obj FROM DbEntityAudit obj
                WHERE obj.entityId = :entityId
                ORDER BY obj.eventTime DESC, obj.eventIndex DESC
        </query>
    </named-query>

    <!-- audit rows of one entity, restricted to one action (bound as the enum ordinal) -->
    <named-query name="DbEntityAudit.getByEntityIdAction">
        <query>SELECT obj FROM DbEntityAudit obj
                WHERE obj.entityId = :entityId
                  AND obj.action = :action
                ORDER BY obj.eventTime DESC, obj.eventIndex DESC
        </query>
    </named-query>

    <!-- as above, additionally bounded below by event time and event index -->
    <named-query name="DbEntityAudit.getByEntityIdActionStartTimeStartIdx">
        <query>SELECT obj FROM DbEntityAudit obj
                WHERE obj.entityId = :entityId
                  AND obj.action = :action
                  AND obj.eventTime &gt;= :eventTimeStart
                  AND obj.eventIndex &gt;= :eventIdxStart
                ORDER BY obj.eventTime DESC, obj.eventIndex DESC
        </query>
    </named-query>
</entity-mappings>
<persistence version="2.0"
             xmlns="http://java.sun.com/xml/ns/persistence"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd">
    <!-- persistence unit loaded by DaoManager under the name "atlasPU" -->
    <persistence-unit name="atlasPU">
        <!-- named JPQL queries used by the DAOs -->
        <mapping-file>META-INF/atlas-jpa_named_queries.xml</mapping-file>

        <!-- managed entity classes -->
        <class>org.apache.atlas.repository.audit.rdbms.entity.DbEntityAudit</class>

        <!-- no shared (second-level) cache for this unit -->
        <shared-cache-mode>NONE</shared-cache-mode>

        <properties>
            <!-- EclipseLink settings: log level, and JDBC batch writing for inserts/updates -->
            <property name="eclipselink.logging.level" value="WARNING"/>
            <property name="eclipselink.jdbc.batch-writing" value="jdbc"/>
        </properties>
    </persistence-unit>
</persistence>
-- Sequence backing atlas_entity_audit.id.
-- The JPA mapping (DbEntityAudit) declares allocationSize = 1000: for every value fetched from
-- the sequence the persistence provider hands out a block of 1000 ids ending at that value.
-- The sequence must therefore advance by 1000 per nextval() and start at 1000; with the
-- default increment of 1 consecutive blocks would overlap. Server-side CACHE is not needed,
-- since the id blocks are already pre-allocated on the client side.
CREATE SEQUENCE IF NOT EXISTS atlas_entity_audit_seq
    START WITH 1000
    INCREMENT BY 1000;

-- Entity audit events.
--   event_time / event_idx : position of the event; queries order by both, descending
--   operation              : audit action, stored as an integer code
--   details / entity       : free-form text payloads, optional
CREATE TABLE IF NOT EXISTS atlas_entity_audit (
    id         BIGINT      NOT NULL DEFAULT nextval('atlas_entity_audit_seq'::regclass),
    entity_id  VARCHAR(64) NOT NULL,
    event_time BIGINT      NOT NULL,
    event_idx  INT         NOT NULL,
    user_name  VARCHAR(64) NOT NULL,
    operation  INT         NOT NULL,
    details    TEXT,
    entity     TEXT,
    audit_type INT         NOT NULL,
    CONSTRAINT atlas_entity_audit_pkey PRIMARY KEY (id)
);

CREATE INDEX IF NOT EXISTS atlas_entity_audit_idx_entity_id
    ON atlas_entity_audit (entity_id);

CREATE INDEX IF NOT EXISTS atlas_entity_audit_idx_event_time
    ON atlas_entity_audit (event_time);

CREATE INDEX IF NOT EXISTS atlas_entity_audit_idx_user_name
    ON atlas_entity_audit (user_name);

CREATE INDEX IF NOT EXISTS atlas_entity_audit_idx_entity_id_event_time
    ON atlas_entity_audit (entity_id, event_time);
-- Sequence backing atlas_entity_audit.id.
-- The JPA mapping (DbEntityAudit) declares allocationSize = 1000, i.e. the persistence provider
-- takes a block of 1000 ids per nextval(). The sequence has to advance by the same amount and
-- start at 1000, otherwise consecutive id blocks overlap.
CREATE SEQUENCE IF NOT EXISTS atlas_entity_audit_seq
    START WITH 1000
    INCREMENT BY 1000;