Yappli Tech Blog

株式会社ヤプリの開発メンバーによるブログです。最新の技術情報からチーム・働き方に関するテーマまで、日々の熱い想いを持って発信していきます。

Aurora MySQL 3 アップグレード時の検証まわりのおはなし

SREチームの中原です。
今回は Aurora MySQL 2 から 3 へアップグレードした際の検証まわりについて書いてみたいと思います。

経緯

弊社本番DBで利用している Aurora MySQL 2 の EOL にともない Aurora MySQL 3 へアップグレードを実施することになりました。

前回の Aurora MySQL 1 から 2 のアップグレードの時はこちらの資料に書いたように、 けっこう大変だったアップグレード作業が、現在は AWS から Blue/Green Deployment 機能が提供されたため切り替えそのものはかなり簡単になっています。 ただし、今回のアップグレードはパフォーマンス影響あるということで、いろいろ検証を行いながらアップグレードをすすめていきました。

完全なクエリーログの取得

ジェネラルログやスロークエリーログに記録されたクエリーを分析に使いたいところですが、 こちらの記事にも書いたように ログ取得のやり方を考えないとサイズ制限にひっかかってクエリー全体を取得できないことがあります。 弊社の場合、長大な IN 句のクエリーがひっかかりました…

また、以前は cloudwatch logs 経由で取得していましたが、 レートリミットにひっかかるのが常態化してしまったため、 今回は downloadCompleteDBLogFile API を利用するようにしました。

downloadCompleteDBLogFile API によるダウンロードは AWS CLI ではサポートされていないため、 suzuki-navi氏によるこちらの記事の python スクリプトを参考にさせていただいて、 弊社のでかいクエリー達でもそれなりのメモリ消費でダウンロードできるように一定のチャンクサイズで処理するように変更したスクリプトを使いました。

#!/usr/bin/env python3

import boto3
from botocore.awsrequest import AWSRequest
import botocore.auth as auth
import requests
import sys

args = sys.argv
if 2 >= len(args):
    print('usage: '+args[0]+' db_instance_name log_file_path')
    sys.exit()

instance_id = args[1]
target_file = args[2]

region = 'ap-northeast-1'
remote_host = 'rds.' + region + '.amazonaws.com'
url = 'https://' + remote_host + '/v13/downloadCompleteLogFile/' + instance_id + '/' + target_file

session = boto3.session.Session()
credentials = session.get_credentials()
sigv4auth = auth.SigV4Auth(credentials, 'rds', region)

awsreq = AWSRequest(method = 'GET', url = url)
sigv4auth.add_auth(awsreq)

res = requests.get(url, stream=True, headers={
        'Authorization': awsreq.headers['Authorization'],
        'X-Amz-Date': awsreq.context['timestamp'],
        'X-Amz-Security-Token': credentials.token
    })

if (res.status_code < 200 or res.status_code >= 300):
    print('http status-error : ' + str(res.status_code));
    res.close()
    sys.exit()

chunk_size=1048576
for chunk in res.iter_content(chunk_size=chunk_size, decode_unicode=False):
    if chunk:
        sys.stdout.buffer.write(chunk)

ジェネラルログを使った動作検証

弊社では毎週リリース前に検証環境で一通りの機能を試すQAテストが実施されているため、 そのタイミングのジェネラルログを使って新旧 Aurora MySQL のレスポンスの差分をチェックしました。

ジェネラルログに対応したツールはいろいろ出ているので、既成のツールでいけるだろと思ったましたが、 自分が探せた範囲ではいずれのツールもパフォーマンス検証用途で、 レスポンスの差分をチェックしてくれるものは見つけられず、 自前の python をシェルスクリプトでまわしてなんとかすることになりました。

手順としては以下のようになります。

  1. QAテスト期間中のジェネラルログを取得
  2. スクリプトでジェネラルログのクエリーをコネクション/スレッド単位で個別のファイルに抽出
  3. QAテスト開始直前のスナップショットから新旧両バージョンの Aurora MySQL を作成
  4. 抽出したクエリーを同じ順番で両方に流してレスポンスを記録
  5. レスポンスに差分が出たクエリーをスプレッドシートにまとめる

あとは結果をサーバサイドエンジニアに共有して影響の調査や対応などをすすめていきました。

#!/usr/bin/env python3

import sys
import os
import re
from datetime import date, datetime

## 関数
# general.log の読み込み
def parse_general_log(log_file_path):
    queries = []
    current_query = None

    with open(log_file_path, 'r') as file:
        for line in file:
            # タイムスタンプのある行を検出
            timestamp_match = re.match(r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d+Z)(\d+)\s([\w ]+)\t(.*)', line)
            if timestamp_match:
                if current_query:
                    queries.append(current_query)
                current_query = {
                    'timestamp': datetime.strptime(timestamp_match.group(1), '%Y-%m-%dT%H:%M:%S.%fZ'),
                    'id': timestamp_match.group(2),
                    'command': timestamp_match.group(3),
                    'argment': timestamp_match.group(4)
                }
                current_query['ut']=datetime.timestamp(current_query['timestamp'])
                continue

            # 複数行にまたがるクエリを処理
            if current_query and line.strip():
                current_query['argment'] += ' ' + line.strip()

    # 最後のクエリを追加
    if current_query:
        queries.append(current_query)

    return queries


# 監視用クエリー抽出用パターン
monitoring_queries_regex = "(SELECT durable_lsn, current_read_point, server_id, last_update_timestamp FROM information_schema\.replica_host_status;|INSERT INTO mysql\.rds_heartbeat2\(id, value\) values \(\d+,\d+\) ON DUPLICATE KEY UPDATE value = \d+|PURGE BINARY LOGS BEFORE.*|SHOW WARNINGS|SELECT @@aurora_oom_response|SELECT 1|SELECT SPECIFIC_NAME, ROUTINE_TYPE, DTD_IDENTIFIER, IS_DETERMINISTIC, SQL_DATA_ACCESS, SECURITY_TYPE, DEFINER FROM `information_schema`\.`ROUTINES` WHERE `ROUTINE_SCHEMA` = \'audience\')"
monitoring_queries_pattern=re.compile(monitoring_queries_regex)

## メイン
# 引数で渡された general.log ファイルを処理 
args = sys.argv
args.pop(0)
for log_file in args:

    # general.log を読み込む
    general_log_queries = []
    general_log_queries = parse_general_log(log_file)

    # 変数初期化
    threads = {}
    monitoring_thread_ids = []
    monitoring_ids = set()

    # スレッド用リストを初期化
    ids=set(dict.fromkeys([query.get('id') for query in general_log_queries]))
    for id in ids:
        threads[id]={}
        threads[id]['queries']=[]
        threads[id]['ut']=None

    # クエリーをスレッド単位に再構成
    for query in general_log_queries:
        # Connect コマンドから接続ユーザ、ホスト、unixtime をスレッドに設定
        if query['command'] == "Connect":
            arg_match = re.match(r'(\w+)@(\S+)\s', query['argment'])
            threads[query['id']]['user']=arg_match.group(1)
            threads[query['id']]['host']=arg_match.group(2)
            threads[query['id']]['ut']=query['ut']
            if threads[query['id']]['user'] == "rdsadmin":
                monitoring_ids.add(query['id'])
        # 監視用クエリーの場合はスレッドをマーク
        elif query['command'] == "Query" and monitoring_queries_pattern.match(query['argment']):
            monitoring_ids.add(query['id'])
        # Query, Execute, Quit コマンドの場合はスレッドにクエリーを追加
        elif query['command'] == "Query" or query['command'] == "Execute" or query['command'] == "Quit":
            threads[query['id']]['queries'].append({'ts': query['timestamp'], 'ut': query['ut'], 'command': query['command'],'arg': query['argment']})

    # 監視用クエリーを含むスレッドを削除
    for id in monitoring_ids:
        del threads[id]

    # スレッドの ut が無い場合はクエリーの最初の時間をスレッドの ut に設定
    for id in threads.keys():
        if threads[id]['ut'] == None and len(threads[id]['queries']) > 0:
            threads[id]['ut'] = min([query.get('ut') for query in threads[id]['queries']],default=None)

    # スレッドごとにクエリーをファイルに保存
    for id in threads.keys():
        if len(threads[id]['queries']) > 0:
            fp = open(str(threads[id]['ut'])+"_"+str(id)+".sql",'w')
            fp.writelines(';\n'.join([query.get('arg') for query in threads[id]['queries']])+";\n")
            fp.flush()
            fp.close

    # 結果の表示
    #ids = [query.get('id') for query in general_log_queries]
    #print(ids)

    continue

スロークエリーログを使ったパフォーマンス検証

検証環境ではデータサイズ的にパフォーマンス検証は行えないため、 本番環境で発生したスロークエリーを使った検証も行いました。

こちらも Percona Toolkit の pt-query-digest などの既成のツールでいけると思っていたのですが、 弊社のクエリーでは一部正規化がうまくいかなかったり、長大な IN 句の問題などがあったため、 そのままでは使えるものが無さそうでした。 そこで、外道父氏のこちらの記事を参考にさせていただいて、 スロークエリーログを解析して分析用のメタ情報を JSONL 形式で出力する python スクリプトを用意しました。 また、JSON 化については宗定洋平氏のこちらの記事を参考にさせていただきました。

個々のクエリーを正規化して、クエリーのパターンをまんべんなく拾ってきてパフォーマンステストの対象クエリーを決定します。 単純にクエリー単位で実行時間を比較したい都合で、同じ条件でシーケンシャルに処理させる必要があるため、かなりクエリーを絞り込まないと時間がかかりすぎるためです。 (同じパターンのクエリーが大量発生していたら切り捨てる)

手順は以下のようになります。

  1. 負荷の高かった期間のスロークエリーログを取得
  2. スクリプトでスロークエリーログのメタ情報の JSON と素のスロークエリーを抽出
  3. メタ情報の JSON を BigQuery に取り込んでパフォーマンステスト対象のクエリーを絞り込む
  4. 本番DBのスナップショットから新旧両バージョンの Aurora MySQL を作成
  5. 対象のクエリーを同じ順番で両方に流してレスポンスや実行時間を記録
  6. 集計して実行時間の差分や頻度の高いクエリーの洗い出し

あとはこちらも結果をサーバサイドエンジニアに共有して影響の調査や対応などをすすめていきました。

#!/usr/bin/env python3
#
# mysql slowquery log convert to meta data json
#
# based:
#   https://blog.father.gedow.net/2021/08/08/mysql-slow-query-python/
#   https://gist.github.com/GedowFather/29ab69b59f5c925965fd8caf42327e30
#
# usage:
#   parse-slowquery.py db_cluster_name db_instance_name db_role db_tier slowquery_log_path
#

import os
import textwrap
from datetime import date, datetime
import sys
import json
import re
import hashlib

## 関数
# スロークエリーログを解析
class parserSlowQuery:
    init_keep_info = {
        'unix_time'     : None,
        'src_ip'        : None,
        'query_time'    : None,
        'lock_time'     : None,
        'rows_sent'     : None,
        'rows_examined' : None,
        'query'         : "",
        'unique_query'  : "",
        'query_hash'    : "",
        'timestamp'     : None,
    }

    def __init__(self, cluster, host, role, tier):
        self.cluster           = cluster
        self.host              = host
        self.role              = role
        self.tier              = tier
        self.queries           = {}
        self.keep_info         = {}
        self.keep_info.update(self.init_keep_info)
        self.next_is_query     = False
        self.selected_database = None
        self.max_query_size    = 10485760          # JSON に出力するクエリーの最大サイズ (bigquery の制限回避)
        self.max_file_size     = 4*1024*1024*1024  # 出力する JSON ファイルの最大サイズ (bigquery の制限回避)
        self.encode            = "Latin-1"
        return

    # スロークエリーログを読み込んで個々のスロークエリーを抽出
    def convertLog(self, logfile):
        self.out_path = os.path.dirname(logfile)
        self.out_ts   = re.search('(\d+\-\d+\-\d+\.\d+).log', logfile).group(1)
        self.out_seq  = 0
        self.out_size = 0
        with open(logfile, mode='r', encoding=self.encode, errors='ignore') as file:
            for line in file:
                if re.match(r"^(/rdsdbbin|Tcp|Time)", line):
                    continue
                elif re.match("# Th", line):
                    continue
                elif re.match("# F", line):
                    continue
                elif re.match("# Time", line):
                    self.next_is_query = False
                    self.keep_info.update(self.init_keep_info)
                elif re.match("# User", line):
                    match = re.match(r".*@.*\[([0-9.]+)\].*", line)
                    if not match: continue
                    src_ip = match.group(1)
                    self.keep_info['src_ip'] = src_ip
                elif re.match("# Query", line):
                    regex = r"^.*Query_time: *([0-9.]+) *Lock_time: *([0-9.]+) *Rows_sent: *([0-9]+) *Rows_examined: *([0-9]+).*$"
                    match = re.match(regex, line, flags=re.IGNORECASE)
                    if not match: continue
                    query_time    = match.group(1)
                    lock_time     = match.group(2)
                    rows_sent     = match.group(3)
                    rows_examined = match.group(3)
                    self.keep_info['query_time']    = float(query_time)
                    self.keep_info['lock_time']     = float(lock_time)
                    self.keep_info['rows_sent']     = int(rows_sent)
                    self.keep_info['rows_examined'] = int(rows_examined)
                elif re.match(r"^use ", line, flags=re.IGNORECASE):
                    match = re.match(r"^use +([^;]+);?$", line, flags=re.IGNORECASE)
                    if not match: continue
                    self.selected_database = match.group(1)
                elif re.match("SET time", line, flags=re.IGNORECASE):
                    if self.keep_info['query_time'] is None:
                        self.next_is_query = False
                        self.keep_info.update(self.init_keep_info)
                        print("Not found comment out information")
                    else:
                        match = re.match(r"^SET timestamp=([0-9]+);?$", line, flags=re.IGNORECASE)
                        if not match: continue
                        self.next_is_query = True
                        self.keep_info['unix_time'] = int(match.group(1))
                elif self.next_is_query is True:
                    self.keep_info['query'] += line
                    if not line: continue
                    if line[-1] == ";" or line[-2] == ";":
                        self.keep_info['unique_query'] = self.uniqueQueryPT(self.keep_info['query'])
                        self.keep_info['query_hash']   = hashlib.md5(self.keep_info['unique_query'].encode(self.encode)).hexdigest()[0:8]
                        self.keep_info['timestamp']    = datetime.fromtimestamp(self.keep_info['unix_time']).strftime('%Y-%m-%d %H:%M:%S')
                        if self.keep_info['query_hash'] not in self.queries:
                            self.queries[self.keep_info['query_hash']] = { 'count' : 0 }
                        self.queries[self.keep_info['query_hash']]['count'] += 1
                        self.printInfo()                        
                        self.printFull()
                        self.keep_info.update(self.init_keep_info)
                    else:
                        self.keep_info['query'] += " "
                else:
                    print("Found invalid line (%s)" % line)
                    print("Keep info is")
                    print(self.keep_info)
        return
        
    # スロークエリーの情報を JSON 形式で出力
    def printInfo(self):
        if not (self.keep_info['unix_time'] and self.keep_info['query_time']
                and self.keep_info['lock_time'] ) and self.keep_info['rows_examined'] is None:
            print("Error: insufficient parameter slowquery", file=sys.stderr)
            return
        
        query_info = {
            'unix_time'         : self.keep_info['unix_time'],
            'timestamp'         : self.keep_info['timestamp'],
            'unique_query_hash' : self.keep_info['query_hash'],
            'count_in_hour'     : self.queries[self.keep_info['query_hash']]['count'],
            'db_cluster'        : self.cluster,
            'db_instance'       : self.host,
            'db_role'           : self.role,
            'db_tier'           : self.tier,
            'src_ip'            : self.keep_info['src_ip'],
            'query_time'        : self.keep_info['query_time'],
            'lock_time'         : self.keep_info['lock_time'],
            'rows_sent'         : self.keep_info['rows_sent'],
            'rows_examined'     : self.keep_info['rows_examined'],
            'database'          : self.selected_database,
            'unique_query'      : self.keep_info['unique_query'],
            'unique_query_size' : len(self.keep_info['unique_query']),
            'query'             : self.keep_info['query'][0:self.max_query_size], # limit queruy size
            'query_size'        : len(self.keep_info['query']),
        }
        
        # ファイルサイズがでかくなったらファイル名の連番をインクリメント
        query_info_json = json.dumps(query_info)
        query_info_size = len(query_info_json)
        if (self.out_size + query_info_size) > self.max_file_size:
            self.out_seq += 1
            self.out_size = 0
        else:
            self.out_size += query_info_size

        with open(self.out_path+"/"+self.host+"."+self.out_ts+"."+str(self.out_seq)+".json", mode='a') as out_fd:
            print(query_info_json, file=out_fd)
            
        return

    # スロークエリー全文を出力
    def printFull(self):
        with open(self.out_path+"/"+self.host+"."+str(self.keep_info['unix_time'])+"."+self.keep_info['query_hash']+"."+str(self.queries[self.keep_info['query_hash']]['count'])+".sql", mode='w') as out_fd:
            print(self.keep_info['query'], file=out_fd)
        return
    
    # スロークエリーを正規化
    def uniqueQueryPT(self, query):
        # IN 句で ' BINARY "1001016868768",' とかいったのが延々ならんでいる長大なクエリーで、正規表現だと重すぎて処理できなかったので先にシュリンク
        if  len(query) > 262144:
            query2 = re.sub(r',',r',\n',query)
            query = ''
            for line in query2.splitlines():
                if not re.match(r' BINARY "\d+",', line):
                    query += line
            query = re.sub(r'\n','',query)

        # percona toolkit の fingerprint 関数をベースに perl から python に書き換え
        # https://github.com/percona/percona-toolkit/blob/b6dff19d972d2a47aaac575d8a9d5572b482d0a3/bin/pt-fingerprint#L1645-L1713    
        # remove ;
        #query = query.rstrip(';')
        query = re.sub(r';$', '', query)

        # remove comments
        query = re.sub(r'/\*[^!].*?\*/', '', query)
        query = re.sub(r'(?:--|#)[^\'"\\r\\n]*(?=[\r\n]|\Z)', '', query)

        # normalize whitespace
        query = re.sub(r'([^\\])(\\\')', r'\1', query)
        query = re.sub(r'([^\\])(\\\")', r'\1', query)
        query = query.replace('\\\\', '')
        query = query.replace("\\'", '')
        query = query.replace('\\"', '')
        query = re.sub(r'([^\\])(".*?[^\\]?")', r'\1?', query)
        query = re.sub(r'([^\\])(\'.*?[^\\]?\')', r'\1?', query)
        query = re.sub(r'\bfalse\b|\btrue\b', '?', query, flags=re.I)

        query = re.sub(r'([._-])[a-f0-9]{32}', r'\1?', query)
        query = re.sub(r'[0-9+-][0-9a-f.xb+-]*', '?', query)
        query = re.sub(r'[xb+-]\?', '?', query)

        query = query.lstrip()
        query = query.rstrip()
        query = re.sub(r'\s+', ' ', query)
        query = query.lower()
        query = re.sub(r'\bnull\b', '?', query)
        query = re.sub(r'\b(in|values?)(?:[\s,]*\([\s?,]*\))+', r'\1(?+)', query)
        query = re.sub(r'\b(select\s.*?)(?:(\sunion(?:\sall)?)\s\1)+', r'\1 /*repeat\2*/', query)
        query = re.sub(r'\blimit \?(?:, ?\?| offset \?)?', 'limit ?', query)

        # compress in (binary ?, ... , binary ?)
        query = re.sub(r'\bin\s*\(\s*(binary\s+\?[\s?,]*)+\)', r'in (binary ?+)', query)
        
        #if re.search(r'\bORDER BY ', query, re.I):
        #    query = re.sub(r'\G(.+?)\s+ASC', r'\1', query, flags=re.I)
        query = re.sub(r'\b(.+?)\s+ASC', r'\1', query, flags=re.IGNORECASE)

        return query

# json 出力用にフォーマット変換
def json_serial(obj):
    # 日付型の場合には、文字列に変換します
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    # 上記以外はサポート対象外.
    raise TypeError ("Type %s not serializable" % type(obj))

# json 形式でオブジェクトを出力
def jprint(obj):
    json.dump(obj, sys.stdout, default=json_serial)

    
## メイン
args = sys.argv
db_cluster  = args[1]
db_instance = args[2]
db_role     = args[3]
db_tier     = args[4]
logfile     = args[5]

parser = parserSlowQuery(db_cluster, db_instance, db_role, db_tier)
parser.convertLog(logfile)

新バージョンの reader を低ウェイトでサービスインさせて本番で検証

弊社のスロークエリーは1秒以上かかったクエリーを記録しているため、1秒未満のクエリーは記録されていません。 また、本番環境でまとまったジェネラルログを記録するのは負荷的に難しいところです。 なので、旧バージョンでは一秒未満の実行時間だけど新バージョンでは遅くなってしまうクエリーを拾いきれないことになります。

そこで Blue/Green の切替えを行う前に Route53 の加重ルーティングを使って新バージョンの reader を低ウェイトでサービスインさせ様子を見ました。 MySQL 8.0 系で無くなったクエリーキャッシュを前提としたクエリーなどが引っかかったため対応を行ってから切り替えを行いました。

あとがき

ざっくりですが、こんなかんじで Aurora MySQL 3 切り替え時の検証を行っていきました。 SELECT COUNT() が遅くなるなど先人の方々が情報を出していてくれてたおかげで、 いろいろ段取りを考えてなんとかアップグレードを行うことが出来ましたのでほんと感謝です。

あと、個人的には pt-query-digest でかゆいところに手が届かなかったところに手を入れたスロークエリーパーサがができて通常の運用でも使えそうなのが一番の成果かもしれません。