This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v2-5-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 6aaf59002907cc0547ca715c0cafd91e7d4bfe05 Author: Vu Tan <[email protected]> AuthorDate: Mon Mar 13 22:58:02 2023 +0900 Fix db clean command for mysql db (#29999) (cherry picked from commit 78cc2e89e5d46738664b7442dc6f5a00b23d1ef5) --- airflow/utils/db_cleanup.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py index e03d1a28b0..8149ad6e16 100644 --- a/airflow/utils/db_cleanup.py +++ b/airflow/utils/db_cleanup.py @@ -133,9 +133,21 @@ def _do_delete(*, query, orm_model, skip_archive, session): timestamp_str = re.sub(r"[^\d]", "", datetime.utcnow().isoformat())[:14] target_table_name = f"_airflow_deleted__{orm_model.name}__{timestamp_str}" print(f"Moving data to table {target_table_name}") - stmt = CreateTableAs(target_table_name, query.selectable) - logger.debug("ctas query:\n%s", stmt.compile()) - session.execute(stmt) + bind = session.get_bind() + dialect_name = bind.dialect.name + if dialect_name == "mysql": + # MySQL with replication needs this split into two queries, so just do it for all MySQL + # ERROR 1786 (HY000): Statement violates GTID consistency: CREATE TABLE ... SELECT. 
+ session.execute(f"CREATE TABLE {target_table_name} LIKE {orm_model.name}") + metadata = reflect_tables([target_table_name], session) + target_table = metadata.tables[target_table_name] + insert_stm = target_table.insert().from_select(target_table.c, query) + logger.debug("insert statement:\n%s", insert_stm.compile()) + session.execute(insert_stm) + else: + stmt = CreateTableAs(target_table_name, query.selectable) + logger.debug("ctas query:\n%s", stmt.compile()) + session.execute(stmt) session.commit() # delete the rows from the old table @@ -143,8 +155,6 @@ def _do_delete(*, query, orm_model, skip_archive, session): source_table = metadata.tables[orm_model.name] target_table = metadata.tables[target_table_name] logger.debug("rows moved; purging from %s", source_table.name) - bind = session.get_bind() - dialect_name = bind.dialect.name if dialect_name == "sqlite": pk_cols = source_table.primary_key.columns delete = source_table.delete().where(
