This is an automated email from the ASF dual-hosted git repository. zykkk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new e1184bf4dc [fix](dbt) dbt incremental append (#20513) e1184bf4dc is described below commit e1184bf4dc6f513e23fcbc9605d897c101e7dff6 Author: catpineapple <42031973+catpineap...@users.noreply.github.com> AuthorDate: Fri Jun 9 01:41:33 2023 +0800 [fix](dbt) dbt incremental append (#20513) --- .../dbt-doris/dbt/adapters/doris/__version__.py | 2 +- .../dbt-doris/dbt/include/doris/dbt_project.yml | 2 +- .../dbt/include/doris/macros/adapters/relation.sql | 2 +- .../macros/materializations/incremental/help.sql | 7 +- .../materializations/incremental/incremental.sql | 101 +++++++++++++-------- extension/dbt-doris/setup.py | 5 +- 6 files changed, 76 insertions(+), 43 deletions(-) diff --git a/extension/dbt-doris/dbt/adapters/doris/__version__.py b/extension/dbt-doris/dbt/adapters/doris/__version__.py index e7da78ed97..201fc2407f 100644 --- a/extension/dbt-doris/dbt/adapters/doris/__version__.py +++ b/extension/dbt-doris/dbt/adapters/doris/__version__.py @@ -22,4 +22,4 @@ # this 'version' must be set !!! # otherwise the adapters will not be found after the 'dbt init xxx' command -version = "1.3.0" \ No newline at end of file +version = "0.2.1" diff --git a/extension/dbt-doris/dbt/include/doris/dbt_project.yml b/extension/dbt-doris/dbt/include/doris/dbt_project.yml index a0518da8df..1cd7e916a8 100644 --- a/extension/dbt-doris/dbt/include/doris/dbt_project.yml +++ b/extension/dbt-doris/dbt/include/doris/dbt_project.yml @@ -19,7 +19,7 @@ # under the License. name: dbt_doris -version: 1.3.0 +version: 0.2.1 config-version: 2 macro-paths: ["macros"] diff --git a/extension/dbt-doris/dbt/include/doris/macros/adapters/relation.sql b/extension/dbt-doris/dbt/include/doris/macros/adapters/relation.sql index b84e5d529e..c60201c51b 100644 --- a/extension/dbt-doris/dbt/include/doris/macros/adapters/relation.sql +++ b/extension/dbt-doris/dbt/include/doris/macros/adapters/relation.sql @@ -81,7 +81,7 @@ {%- endmacro %} {% macro doris__properties() -%} - {% set properties = config.get('properties', validator=validation.any[dict]) or {"replication_num":"1"} %} + {% set properties = config.get('properties', validator=validation.any[dict]) %} {% if properties is not none %} PROPERTIES ( {% for key, value in properties.items() %} diff --git a/extension/dbt-doris/dbt/include/doris/macros/materializations/incremental/help.sql b/extension/dbt-doris/dbt/include/doris/macros/materializations/incremental/help.sql index 01dd9e5ffb..be8af0bb23 100644 --- a/extension/dbt-doris/dbt/include/doris/macros/materializations/incremental/help.sql +++ b/extension/dbt-doris/dbt/include/doris/macros/materializations/incremental/help.sql @@ -56,7 +56,12 @@ show create table {{ target_relation }} {%- endmacro %} -{% macro is_unique_model( table_create_obj ) %} +{% macro is_unique_model( target_relation ) %} + {% set build_show_create = show_create( target_relation, statement_name='table_model') %} + {% call statement('table_model' , fetch_result=True) %} + {{ build_show_create }} + {% endcall %} + {%- set table_create_obj = load_result('table_model') -%} {% set create_table = table_create_obj['data'][0][1]%} {{ return('\nUNIQUE KEY(' in create_table and '\nDUPLICATE KEY(' not in create_table and '\nAGGREGATE KEY(' not in create_table) }} {%- endmacro %} diff --git a/extension/dbt-doris/dbt/include/doris/macros/materializations/incremental/incremental.sql b/extension/dbt-doris/dbt/include/doris/macros/materializations/incremental/incremental.sql index 94f06444de..f15a9d0896 100644 --- a/extension/dbt-doris/dbt/include/doris/macros/materializations/incremental/incremental.sql +++ b/extension/dbt-doris/dbt/include/doris/macros/materializations/incremental/incremental.sql @@ -17,56 +17,71 @@ {% materialization incremental, adapter='doris' %} {% set unique_key = config.get('unique_key', validator=validation.any[list]) %} - {%- set inserts_only = config.get('inserts_only') -%} - + {% set strategy = dbt_doris_validate_get_incremental_strategy(config) %} + {% set full_refresh_mode = (should_full_refresh()) %} {% set target_relation = this.incorporate(type='table') %} - - {% set existing_relation = load_relation(this) %} {% set tmp_relation = make_temp_relation(this) %} - {{ run_hooks(pre_hooks, inside_transaction=False) }} - {{ run_hooks(pre_hooks, inside_transaction=True) }} - {% set to_drop = [] %} + {#-- append or no unique key --#} - {% if unique_key is none or inserts_only %} - {% set build_sql = tmp_insert(tmp_relation, target_relation, unique_key=none) %} - {% elif existing_relation is none %} - {% set build_sql = doris__create_unique_table_as(False, target_relation, sql) %} - {% elif existing_relation.is_view or should_full_refresh() %} - {#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #} - {% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %} - {% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %} - {% do adapter.drop_relation(backup_relation) %} - {% do adapter.rename_relation(target_relation, backup_relation) %} - {% set build_sql = doris__create_unique_table_as(False, target_relation, sql) %} - {% do to_drop.append(backup_relation) %} - {% else %} - {% set build_show_create = show_create( target_relation, statement_name="table_model") %} - {% call statement('table_model' , fetch_result=True) %} - {{ build_show_create }} - {% endcall %} - {%- set table_create_obj = load_result('table_model') -%} - {% if not is_unique_model(table_create_obj) %} - {% do exceptions.raise_compiler_error("doris table:"~ target_relation ~ ", model must be 'UNIQUE'" ) %} - {% endif %} - {% do run_query(create_table_as(True, tmp_relation, sql)) %} - {% do to_drop.append(tmp_relation) %} - {% do adapter.expand_target_column_types( - from_relation=tmp_relation, - to_relation=target_relation) %} - {% set build_sql = tmp_insert(tmp_relation, target_relation, unique_key=unique_key) %} + {% if unique_key is none or strategy == 'append' %} + {#-- create table first --#} + {% if existing_relation is none %} + {% set build_sql = doris__create_table_as(False, target_relation, sql) %} + {% elif existing_relation.is_view or full_refresh_mode %} + {#-- backup data before drop old table #} + {% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %} + {% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %} + {% do adapter.drop_relation(backup_relation) %} {#-- likes 'drop table if exists ... ' --#} + {% do adapter.rename_relation(target_relation, backup_relation) %} + {% set build_sql = doris__create_table_as(False, target_relation, sql) %} + {% do to_drop.append(backup_relation) %} + {#-- append data --#} + {% else %} + {% do run_query(create_table_as(True, tmp_relation, sql)) %} + {% set build_sql = tmp_insert(tmp_relation, target_relation, unique_key=none) %} + {% endif %} + {#-- insert overwrite --#} + {% elif strategy == 'insert_overwrite' %} + {#-- create table first --#} + {% if existing_relation is none %} + {% set build_sql = doris__create_unique_table_as(False, target_relation, sql) %} + {#-- insert data refresh --#} + {% elif existing_relation.is_view or full_refresh_mode %} + {#-- backup data before drop old table #} + {% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %} + {% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %} + {% do adapter.drop_relation(backup_relation) %} {#-- likes 'drop table if exists ... ' --#} + {% do adapter.rename_relation(target_relation, backup_relation) %} + {% set build_sql = doris__create_unique_table_as(False, target_relation, sql) %} + {% do to_drop.append(backup_relation) %} + {#-- append data --#} + {% else %} + {#-- check doris unique table --#} + {% if not is_unique_model(target_relation) %} + {% do exceptions.raise_compiler_error("doris table:"~ target_relation ~ ", model must be 'UNIQUE'" ) %} + {% endif %} + {#-- create temp duplicate table for this incremental task --#} + {% do run_query(create_table_as(True, tmp_relation, sql)) %} + {% do to_drop.append(tmp_relation) %} + {% do adapter.expand_target_column_types( + from_relation=tmp_relation, + to_relation=target_relation) %} + {% set build_sql = tmp_insert(tmp_relation, target_relation, unique_key=unique_key) %} + {% endif %} + {% else %} + {#-- never --#} {% endif %} {% call statement("main") %} {{ build_sql }} {% endcall %} - - {% do persist_docs(target_relation, model) %} + {#-- {% do persist_docs(target_relation, model) %} #} {{ run_hooks(post_hooks, inside_transaction=True) }} {% do adapter.commit() %} {% for rel in to_drop %} @@ -74,5 +89,17 @@ {% endfor %} {{ run_hooks(post_hooks, inside_transaction=False) }} {{ return({'relations': [target_relation]}) }} - {%- endmaterialization %} + +{% macro dbt_doris_validate_get_incremental_strategy(config) %} + {#-- Find and validate the incremental strategy #} + {%- set strategy = config.get('incremental_strategy') or 'insert_overwrite' -%} + {% set invalid_strategy_msg -%} + Invalid incremental strategy provided: {{ strategy }} + Expected one of: 'append', 'insert_overwrite' + {%- endset %} + {% if strategy not in ['append', 'insert_overwrite'] %} + {% do exceptions.raise_compiler_error(invalid_strategy_msg) %} + {% endif %} + {% do return (strategy) %} +{% endmacro %} diff --git a/extension/dbt-doris/setup.py b/extension/dbt-doris/setup.py index 41658c2624..0f515fe953 100644 --- a/extension/dbt-doris/setup.py +++ b/extension/dbt-doris/setup.py @@ -22,7 +22,7 @@ from setuptools import find_namespace_packages, setup package_name = "dbt-doris" # make sure this always matches dbt/adapters/{adapter}/__version__.py -package_version = "1.3.0" +package_version = "0.2.1" dbt_core_version = "1.3.0" description = """The doris adapter plugin for dbt """ @@ -39,6 +39,7 @@ setup( install_requires=[ "dbt-core~={}".format(dbt_core_version), "mysql-connector-python>=8.0.0,<8.1", + "urllib3~=1.0", ], - python_requires=">=3.8,<=3.10", + python_requires=">=3.7.2", ) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org