Yappli Tech Blog

株式会社ヤプリの開発メンバーによるブログです。最新の技術情報からチーム・働き方に関するテーマまで、日々の熱い想いを持って発信していきます。

dbt管理下にないテーブルをdbt macroで一括削除する

こんにちは!データサイエンス室(以下、DS室)の山本です(@__Y4M4MOTO__)です。

運用中のdbtプロジェクトに対して、dbt管理下にないテーブルを一括削除する仕組みを構築したので、その内容を共有します。

なぜやったのか?

新規にdbtプロジェクトを作成した際に出力先のBigQueryデータセットが既存のdbtプロジェクトと同じになっており、テーブルが混ざってしまったからです。

混ざってしまったテーブルの数も多く、手動で消すのも難しい状況だったため、自動で削除する仕組みを構築することにしました。

どうやったのか?

こちらの記事で紹介されている方法を採用しました。理由はシンプルで保守しやすそうだと思ったからです。

zenn.dev

この方法では、dbtマクロ(cleanupマクロ( cleanup.sql ))を使うことでdbt管理下にないテーブルの一括削除を実現しています。

使い方など詳細は前述の記事に記載されていますので、そちらをご参照ください。

導入にあたっての工夫

cleanup.sql の修正

記事で公開されていた cleanup.sql について、既存dbtプロジェクトの運用に合うよう一部修正を行いました。

修正後のcleanup.sql

-- 出典: https://www.revolt.bi/en/automatically-identify-delete-orphaned-tables-and-schemas-dbt-macro/
-- 参考: https://zenn.dev/myshmeh/articles/1bfd29aaff8624

-- MACRO DESCRIPTION
-- This macro is designed to clean up orphaned tables and schemas from your database that are no longer managed by dbt.
-- It offers flexibility to either log the drop commands for review or execute them directly.

{%- macro cleanup(objects_type, dry_run=true, tables_to_exclude=[''], delete_custom_schemas=false, schemas_to_exclude=['']) -%}

-- PARAMETERS:
-- dry_run (default: 'true'): If set to 'true', gitthe macro logs the drop commands without executing them. If set to 'false', the macro executes the drop commands.
-- tables_to_exclude: A list of table names to exclude from being dropped.
-- delete_custom_schemas (default: 'false'): If set to 'true', the macro will drop schemas that are not managed by dbt. If set to 'false', it will only drop orphaned tables.
-- schemas_to_exclude: A list of schema names to exclude from being dropped.
-- objects_type: Mandatory parameter, A list of object type to be dropped, available objects types are VIEW, BASE TABLE, MANAGED 

-- TODO: objects_type  if not defined DROP ALL object types

    {% if not objects_type %}
        {% do exceptions.raise_compiler_error('objects_type not provided') %}
    {% endif %}

    {%- set schemas_to_exclude = schemas_to_exclude + ['monitoring', 'information_schema'] -%}
    {%- set tables_to_exclude = tables_to_exclude + ['config_table','mapping_table'] -%}
    {%- if execute -%}

        -- Create empty dictionary that will contain the hierarchy of the models in dbt
        {%- set current_model_locations = {} -%}

        -- Insert the hierarchy database.schema.table in the dictionary above
        {%- for node in graph.nodes.values() | selectattr("resource_type", "in", ["model", "seed", "snapshot"]) -%}

            {%- set database_name = node.database -%}

            {%- set schema_name = node.schema.upper() -%}
            {%- set table_name = node.alias if node.alias else node.name -%}

            -- Add db name if it does not exist in the dict
            {%- if not database_name in current_model_locations -%}
                {% do current_model_locations.update({database_name: {}}) -%}
            {%- endif -%}

            -- Add schema name if it does not exist in the dict
            {%- if not schema_name in current_model_locations[database_name] -%}
                {% do current_model_locations[database_name].update({schema_name: []}) -%}
            {%- endif -%}

            -- Add the tables for the db and schema selected
            {%- do current_model_locations[database_name][schema_name].append(table_name.upper()) -%}

        {%- endfor -%}

        {{ log(current_model_locations) }}
        {%- do log(current_model_locations, True) -%}

    {%- endif -%}

    {% set target_table %}
        {{ target.project }}.{{ target.dataset }}.INFORMATION_SCHEMA.TABLES
    {% endset %}

    -- Query to retrieve the models to drop
    {%- set cleanup_query -%}

        WITH models_to_drop AS (
            {%- for database in current_model_locations.keys() -%}

                SELECT
                    CASE
                        WHEN table_type not in (
                            {%- for object_type in objects_type -%}
                            '{{ object_type|upper }}'
                            {%- if not loop.last %} , {%- endif %}
                            {%- endfor %} ) THEN NULL
                        WHEN table_type in ('BASE TABLE', 'MANAGED', 'CLONE') THEN 'TABLE'
                        WHEN table_type = 'VIEW' THEN 'VIEW'
                        ELSE NULL
                    END AS relation_type,
                    table_catalog,
                    table_schema,
                    table_name,
                    table_catalog || '.' || table_schema || '.' || table_name as relation_name
                FROM {{ target_table }}
                WHERE 1=1

                    AND LOWER(table_schema) IN ('{{ "', '".join(current_model_locations[database].keys())|lower }}')
                    AND NOT (
                        {%- for schema in current_model_locations[database].keys() -%}
                            LOWER(table_schema) = LOWER('{{ schema }}') AND LOWER(table_name) IN ('{{ "', '".join(current_model_locations[database][schema])|lower }}')
                            {% if not loop.last %} OR {% endif %}
                        {%- endfor %}
                    )


-- Exclude tables containing any of the keywords
                    AND NOT (
                        {%- for keyword in tables_to_exclude -%}
                            LOWER(table_name) IN ('{{ keyword|lower }}')
                            {% if not loop.last %} OR {% endif %}
                        {%- endfor %}
                    )


                {% if not loop.last %} UNION ALL {% endif %}

            {%- endfor -%}
        ),

        drop_schemas AS (
            SELECT DISTINCT table_catalog, table_schema
            FROM {{ target_table }}
            WHERE 1=1
            {% for database in current_model_locations.keys() -%}
            AND LOWER(table_schema) NOT IN ('{{ current_model_locations[database].keys() | join("', '") | lower }}')
            {%- endfor -%}
            AND LOWER(table_schema) NOT IN ('{{ schemas_to_exclude | join("', '") | lower }}')
-- Exclude schemas from macro settings
                AND NOT (
                    {%- for schema in schemas_to_exclude -%}
                        LOWER(table_schema) IN ('{{ schema|lower }}')
                        {% if not loop.last %} OR {% endif %}
                    {%- endfor %}
                )
        )

        -- Create the DROP statments to be executed in the database
                SELECT 'DROP ' || relation_type || ' IF EXISTS ' || '`' || table_catalog || '.' ||table_schema || '.' || table_name || '`;' AS drop_commands
                FROM models_to_drop
                WHERE relation_type IS NOT NULL

                {% if delete_custom_schemas == true %}
                UNION ALL

                SELECT 'DROP SCHEMA IF EXISTS ' || '`' || table_catalog || '.' || table_schema || '` CASCADE;' AS drop_commands
                FROM drop_schemas
                {% endif %}

    {%- endset -%}


    {{ log('\nGenerating cleanup queries.\n', info=True) }}

    -- Execute the DROP statments above
    {%- if dry_run == false -%}
        {%- set drop_commands = run_query(cleanup_query).columns[0].values() -%}
        {%- for drop_command in drop_commands -%}
            {%- do run_query(drop_command) -%}
            {%- do log(drop_command, True) -%}
        {%- endfor -%}
    {%- else -%}
        -- Log the drop commands in dry run mode
        {%- set drop_commands = run_query(cleanup_query).columns[0].values() -%}
        {%- for drop_command in drop_commands -%}
            {%- do log(drop_command, True) -%}
        {%- endfor -%}
    {%- endif -%}


{%- endmacro -%}

元のcleanup.sqlとの差分

--- macros/cleanup.sql
+++ cleanup_org.sql
@@ -30,8 +29,11 @@
         -- Insert the hierarchy database.schema.table in the dictionary above
         {%- for node in graph.nodes.values() | selectattr("resource_type", "in", ["model", "seed", "snapshot"]) -%}
 
-            {%- set database_name = node.database -%}
-
+            {% if target.type == 'bigquery' %}
+              {%- set database_name = node.database -%}
+            {%- else -%}
+              {%- set database_name = node.database.upper() -%}
+            {% endif %}
             {%- set schema_name = node.schema.upper() -%}
             {%- set table_name = node.alias if node.alias else node.name -%}
 
@@ -56,7 +58,11 @@
     {%- endif -%}
 
     {% set target_table %}
-        {{ target.project }}.{{ target.dataset }}.INFORMATION_SCHEMA.TABLES
+      {% if target.type == 'bigquery' %}
+          {{ target.project }}.{{ target.dataset }}.INFORMATION_SCHEMA.TABLES
+      {%- else -%}
+        {{ target.database }}.INFORMATION_SCHEMA.TABLES
+      {% endif %}
     {% endset %}
 
     -- Query to retrieve the models to drop
@@ -72,7 +78,7 @@
                             '{{ object_type|upper }}'
                             {%- if not loop.last %} , {%- endif %}
                             {%- endfor %} ) THEN NULL
-                        WHEN table_type in ('BASE TABLE', 'MANAGED', 'CLONE') THEN 'TABLE'
+                        WHEN table_type in ('BASE TABLE', 'MANAGED') THEN 'TABLE'
                         WHEN table_type = 'VIEW' THEN 'VIEW'
                         ELSE NULL
                     END AS relation_type,
@@ -110,7 +116,7 @@
             SELECT DISTINCT table_catalog, table_schema
             FROM {{ target_table }}
             WHERE 1=1
-            {% for database in current_model_locations.keys() -%}
+            {%- for database in current_model_locations.keys() -%}
             AND LOWER(table_schema) NOT IN ('{{ current_model_locations[database].keys() | join("', '") | lower }}')
             {%- endfor -%}
             AND LOWER(table_schema) NOT IN ('{{ schemas_to_exclude | join("', '") | lower }}')
@@ -124,14 +130,14 @@
         )
 
         -- Create the DROP statments to be executed in the database
-                SELECT 'DROP ' || relation_type || ' IF EXISTS ' || '`' || table_catalog || '.' ||table_schema || '.' || table_name || '`;' AS drop_commands
+                SELECT 'DROP ' || relation_type || ' IF EXISTS ' || table_catalog || '.' ||table_schema || '.' || table_name || ';' AS drop_commands
                 FROM models_to_drop
                 WHERE relation_type IS NOT NULL
 
                 {% if delete_custom_schemas == true %}
                 UNION ALL
 
-                SELECT 'DROP SCHEMA IF EXISTS ' || '`' || table_catalog || '.' || table_schema || '` CASCADE;' AS drop_commands
+                SELECT 'DROP SCHEMA IF EXISTS ' || table_catalog || '.' || table_schema || ' CASCADE;' AS drop_commands
                 FROM drop_schemas
                 {% endif %}
 

エラーの修正

そもそも、そのままでは下記のエラーが発生して動きませんでした。

Encountered an error while running operation: Database Error
  Syntax error: Expected ")" but got "AND" at [42:22]

cleanupマクロが実際に発行するSQLからエラーの発生している箇所を抜粋したものを以下に示します。 WHERE 1=1 の後の AND の前にスペースがないことが原因でエラーが発生していました。

〜〜(略)〜〜

            WHERE 1=1AND LOWER(table_schema) NOT IN ('YOUR_DATASET')AND LOWER(table_schema) NOT IN ('', 'monitoring', 'information_schema')

〜〜(略)〜〜

そこで、 cleanup.sql の該当箇所について、 {%- for{% for に修正して改行を入れるようにしました。

〜〜(略)〜〜

            WHERE 1=1
-           {%- for database in current_model_locations.keys() -%}
+           {% for database in current_model_locations.keys() -%}
            AND LOWER(table_schema) NOT IN ('{{ current_model_locations[database].keys() | join("', '") | lower }}')
            {%- endfor -%}
            AND LOWER(table_schema) NOT IN ('{{ schemas_to_exclude | join("', '") | lower }}')

〜〜(略)〜〜

これにより、発行されるSQLは以下のようになり、エラーが解消されました。

〜〜(略)〜〜

            WHERE 1=1
            AND LOWER(table_schema) NOT IN ('YOUR_DATASET')AND LOWER(table_schema) NOT IN ('', 'monitoring', 'information_schema')

〜〜(略)〜〜

クローンされたテーブルに対応

弊社ではdbtプロジェクトのdev環境をBigQueryのテーブルクローンを使って作成しています。

tech.yappli.io

cleanup.sql はクローンされたテーブルに対応していなかったため、削除対象に含めるように修正しました。

〜〜(略)〜〜

        WITH models_to_drop AS (
            {%- for database in current_model_locations.keys() -%}

                SELECT
                    CASE
                        WHEN table_type not in (
                            {%- for object_type in objects_type -%}
                            '{{ object_type|upper }}'
                            {%- if not loop.last %} , {%- endif %}
                            {%- endfor %} ) THEN NULL
-                       WHEN table_type in ('BASE TABLE', 'MANAGED') THEN 'TABLE'
+                       WHEN table_type in ('BASE TABLE', 'MANAGED', 'CLONE') THEN 'TABLE'

〜〜(略)〜〜

BigQueryビューの削除に対応

Google SQLの仕様で、 DROP VIEW の場合はビューのパスをバッククォートで囲まないと、Google CloudのプロジェクトIDに - が含まれている場合にSyntax errorが発生してしまいます( DROP TABLE の場合はバッククォートなしでも問題ない)。

cleanupマクロでエラーが発生しているログ:

15:19:57  DROP TABLE IF EXISTS your-project-id.your_dataset._hoge_table;
15:19:57  Encountered an error while running operation: Database Error
  Syntax error: Expected end of input but got "-" at [3:36]

Google SQLでのエラーの再現例:

DROP TABLE IF EXISTS your-project-id.your_dataset._hoge_table; -- OK

DROP VIEW IF EXISTS your-project-id.your_dataset._hoge_view;   -- NG | Syntax error: Expected end of input but got "-" 
DROP VIEW IF EXISTS `your-project-id.your_dataset._hoge_view`; -- OK

そこで、DROP文を生成する箇所について、パスをバッククォートで囲むように修正しました。

        -- Create the DROP statments to be executed in the database
-               SELECT 'DROP ' || relation_type || ' IF EXISTS ' || table_catalog || '.' ||table_schema || '.' || table_name || ';' AS drop_commands
+               SELECT 'DROP ' || relation_type || ' IF EXISTS ' || '`' || table_catalog || '.' ||table_schema || '.' || table_name || '`;' AS drop_commands
                FROM models_to_drop
                WHERE relation_type IS NOT NULL

                {% if delete_custom_schemas == true %}
                UNION ALL

-               SELECT 'DROP SCHEMA IF EXISTS ' || table_catalog || '.' || table_schema || ' CASCADE;' AS drop_commands
+               SELECT 'DROP SCHEMA IF EXISTS ' || '`' || table_catalog || '.' || table_schema || '` CASCADE;' AS drop_commands
                FROM drop_schemas
                {% endif %}

cleanupマクロはSnowflake、Databricks、BigQueryに対応した実装になっていたのですが、上記修正を加えたことで恐らくBigQuery特化の実装になった(Snowflake、Databricksの環境が無くて動作確認できずでして…🙇)ので、DWHの違いによる分岐処理の部分は分岐を削除しました。

-           {% if target.type == 'bigquery' %}
-             {%- set database_name = node.database -%}
-           {%- else -%}
-             {%- set database_name = node.database.upper() -%}
-           {% endif %}
-        {%- set schema_name = node.schema.upper() -%}
-        {%- set table_name = node.alias if node.alias else node.name -%}
+        {%- set database_name = node.database -%}
-   {% set target_table %}
-     {% if target.type == 'bigquery' %}
-         {{ target.project }}.{{ target.dataset }}.INFORMATION_SCHEMA.TABLES
-     {%- else -%}
-       {{ target.database }}.INFORMATION_SCHEMA.TABLES
-     {% endif %}
-   {% endset %}
+   {{ target.project }}.{{ target.dataset }}.INFORMATION_SCHEMA.TABLES

Makefileで開発環境へ実行可能に

弊社では開発者ごとにBigQueryデータセットを用意して、その中で好きにdbtモデルをmaterializeしてもらう運用を採っています。

tech.yappli.io

そこで、ローカルから自分のデータセットに対してcleanupマクロを実行できるようにしました。

実行はMakefileにコマンドを追加して、簡単に実行できるようにしました。

.PHONY: cleanup-dev
cleanup-dev:
  dbt run-operation cleanup --args '{objects_type: ["BASE TABLE", "MANAGED", "VIEW", "CLONE"]}' --target dev

.PHONY: cleanup-dev-execute
cleanup-dev-execute:
  dbt run-operation cleanup --args '{objects_type: ["BASE TABLE", "MANAGED", "VIEW", "CLONE"], dry_run: false}' --target dev

dbtモデルの開発を行なっていくと、開発当時は必要だったが今はもう不要なテーブル(差分チェック用テーブルなど)がデータセット内に溜まっていってしまいます。データセット内の掃除を簡単にできるようにするため、ローカルから実行できるようにしました。

GitHub Actionsでmainブランチへマージしたときに実行するように

本番のBigQueryデータセット内がdbt管理下にあるビュー・テーブルのみになるよう、GitHub Actionsでmainブランチへマージしたときに実行するようにしました。

これで、当初の「別のdbtプロジェクトのテーブルが混ざってしまった」という状況を解消するとともに、本番データセット内をクリーンな状態に保てるようになりました。

name: Cleanup Orphaned Objects

on:
  push:
    branches:
      - main
  workflow_dispatch:

jobs:
  cleanup:
    runs-on: ubuntu-latest

    steps:
    
      - 〜〜(略)〜〜

      - name: exec cleanup
        run: |
          dbt run-operation cleanup --args '{objects_type: ["BASE TABLE", "MANAGED", "VIEW", "CLONE"], dry_run: false}' --target prod

結び

本記事では、dbt管理下にないテーブルを一括削除する仕組みの構築について紹介しました。同様の問題でお困りの方の参考になれば幸いです。

今回やってみての所感としては、cleanupマクロで一括削除する方法はシンプルで使い勝手が良いなと感じました。

ここまでお読みいただきありがとうございました!


本記事を読んでヤプリのDS室に興味を持っていただけた方はぜひカジュアル面談へお越しください!

open.talentio.com