SREチームの中原です。 今回は Aurora MySQL 2 から 3 へアップグレードした際の検証まわりについて書いてみたいと思います。
経緯
弊社本番DBで利用している Aurora MySQL 2 の EOL にともない Aurora MySQL 3 へアップグレードを実施することになりました。
前回の Aurora MySQL 1 から 2 のアップグレードの時はこちらの資料に書いたように、 けっこう大変だったアップグレード作業が、現在は AWS から Blue/Green Deployment 機能が提供されたため切り替えそのものはかなり簡単になっています。 ただし、今回のアップグレードはパフォーマンス影響あるということで、いろいろ検証を行いながらアップグレードをすすめていきました。
完全なクエリーログの取得
ジェネラルログやスロークエリーログに記録されたクエリーを分析に使いたいところですが、 こちらの記事にも書いたように ログ取得のやり方を考えないとサイズ制限にひっかかってクエリー全体を取得できないことがあります。 弊社の場合、長大な IN 句のクエリーがひっかかりました…
また、以前は cloudwatch logs 経由で取得していましたが、 レートリミットにひっかかるのが常態化してしまったため、 今回は downloadCompleteDBLogFile API を利用するようにしました。
downloadCompleteDBLogFile API によるダウンロードは AWS CLI ではサポートされていないため、 suzuki-navi氏によるこちらの記事の python スクリプトを参考にさせていただいて、 弊社のでかいクエリー達でもそれなりのメモリ消費でダウンロードできるように一定のチャンクサイズで処理するように変更したスクリプトを使いました。
#!/usr/bin/env python3 import boto3 from botocore.awsrequest import AWSRequest import botocore.auth as auth import requests import sys args = sys.argv if 2 >= len(args): print('usage: '+args[0]+' db_instance_name log_file_path') sys.exit() instance_id = args[1] target_file = args[2] region = 'ap-northeast-1' remote_host = 'rds.' + region + '.amazonaws.com' url = 'https://' + remote_host + '/v13/downloadCompleteLogFile/' + instance_id + '/' + target_file session = boto3.session.Session() credentials = session.get_credentials() sigv4auth = auth.SigV4Auth(credentials, 'rds', region) awsreq = AWSRequest(method = 'GET', url = url) sigv4auth.add_auth(awsreq) res = requests.get(url, stream=True, headers={ 'Authorization': awsreq.headers['Authorization'], 'X-Amz-Date': awsreq.context['timestamp'], 'X-Amz-Security-Token': credentials.token }) if (res.status_code < 200 or res.status_code >= 300): print('http status-error : ' + str(res.status_code)); res.close() sys.exit() chunk_size=1048576 for chunk in res.iter_content(chunk_size=chunk_size, decode_unicode=False): if chunk: sys.stdout.buffer.write(chunk)
ジェネラルログを使った動作検証
弊社では毎週リリース前に検証環境で一通りの機能を試すQAテストが実施されているため、 そのタイミングのジェネラルログを使って新旧 Aurora MySQL のレスポンスの差分をチェックしました。
ジェネラルログに対応したツールはいろいろ出ているので、既成のツールでいけるだろと思ったましたが、 自分が探せた範囲ではいずれのツールもパフォーマンス検証用途で、 レスポンスの差分をチェックしてくれるものは見つけられず、 自前の python をシェルスクリプトでまわしてなんとかすることになりました。
手順としては以下のようになります。
- QAテスト期間中のジェネラルログを取得
- スクリプトでジェネラルログのクエリーをコネクション/スレッド単位で個別のファイルに抽出
- QAテスト開始直前のスナップショットから新旧両バージョンの Aurora MySQL を作成
- 抽出したクエリーを同じ順番で両方に流してレスポンスを記録
- レスポンスに差分が出たクエリーをスプレッドシートにまとめる
あとは結果をサーバサイドエンジニアに共有して影響の調査や対応などをすすめていきました。
#!/usr/bin/env python3 import sys import os import re from datetime import date, datetime ## 関数 # general.log の読み込み def parse_general_log(log_file_path): queries = [] current_query = None with open(log_file_path, 'r') as file: for line in file: # タイムスタンプのある行を検出 timestamp_match = re.match(r'(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d+Z)(\d+)\s([\w ]+)\t(.*)', line) if timestamp_match: if current_query: queries.append(current_query) current_query = { 'timestamp': datetime.strptime(timestamp_match.group(1), '%Y-%m-%dT%H:%M:%S.%fZ'), 'id': timestamp_match.group(2), 'command': timestamp_match.group(3), 'argment': timestamp_match.group(4) } current_query['ut']=datetime.timestamp(current_query['timestamp']) continue # 複数行にまたがるクエリを処理 if current_query and line.strip(): current_query['argment'] += ' ' + line.strip() # 最後のクエリを追加 if current_query: queries.append(current_query) return queries # 監視用クエリー抽出用パターン monitoring_queries_regex = "(SELECT durable_lsn, current_read_point, server_id, last_update_timestamp FROM information_schema\.replica_host_status;|INSERT INTO mysql\.rds_heartbeat2\(id, value\) values \(\d+,\d+\) ON DUPLICATE KEY UPDATE value = \d+|PURGE BINARY LOGS BEFORE.*|SHOW WARNINGS|SELECT @@aurora_oom_response|SELECT 1|SELECT SPECIFIC_NAME, ROUTINE_TYPE, DTD_IDENTIFIER, IS_DETERMINISTIC, SQL_DATA_ACCESS, SECURITY_TYPE, DEFINER FROM `information_schema`\.`ROUTINES` WHERE `ROUTINE_SCHEMA` = \'audience\')" monitoring_queries_pattern=re.compile(monitoring_queries_regex) ## メイン # 引数で渡された general.log ファイルを処理 args = sys.argv args.pop(0) for log_file in args: # general.log を読み込む general_log_queries = [] general_log_queries = parse_general_log(log_file) # 変数初期化 threads = {} monitoring_thread_ids = [] monitoring_ids = set() # スレッド用リストを初期化 ids=set(dict.fromkeys([query.get('id') for query in general_log_queries])) for id in ids: threads[id]={} threads[id]['queries']=[] threads[id]['ut']=None # クエリーをスレッド単位に再構成 for query in general_log_queries: # Connect コマンドから接続ユーザ、ホスト、unixtime をスレッドに設定 if query['command'] == "Connect": arg_match = re.match(r'(\w+)@(\S+)\s', query['argment']) threads[query['id']]['user']=arg_match.group(1) threads[query['id']]['host']=arg_match.group(2) threads[query['id']]['ut']=query['ut'] if threads[query['id']]['user'] == "rdsadmin": monitoring_ids.add(query['id']) # 監視用クエリーの場合はスレッドをマーク elif query['command'] == "Query" and monitoring_queries_pattern.match(query['argment']): monitoring_ids.add(query['id']) # Query, Execute, Quit コマンドの場合はスレッドにクエリーを追加 elif query['command'] == "Query" or query['command'] == "Execute" or query['command'] == "Quit": threads[query['id']]['queries'].append({'ts': query['timestamp'], 'ut': query['ut'], 'command': query['command'],'arg': query['argment']}) # 監視用クエリーを含むスレッドを削除 for id in monitoring_ids: del threads[id] # スレッドの ut が無い場合はクエリーの最初の時間をスレッドの ut に設定 for id in threads.keys(): if threads[id]['ut'] == None and len(threads[id]['queries']) > 0: threads[id]['ut'] = min([query.get('ut') for query in threads[id]['queries']],default=None) # スレッドごとにクエリーをファイルに保存 for id in threads.keys(): if len(threads[id]['queries']) > 0: fp = open(str(threads[id]['ut'])+"_"+str(id)+".sql",'w') fp.writelines(';\n'.join([query.get('arg') for query in threads[id]['queries']])+";\n") fp.flush() fp.close # 結果の表示 #ids = [query.get('id') for query in general_log_queries] #print(ids) continue
スロークエリーログを使ったパフォーマンス検証
検証環境ではデータサイズ的にパフォーマンス検証は行えないため、 本番環境で発生したスロークエリーを使った検証も行いました。
こちらも Percona Toolkit の pt-query-digest などの既成のツールでいけると思っていたのですが、 弊社のクエリーでは一部正規化がうまくいかなかったり、長大な IN 句の問題などがあったため、 そのままでは使えるものが無さそうでした。 そこで、外道父氏のこちらの記事を参考にさせていただいて、 スロークエリーログを解析して分析用のメタ情報を JSONL 形式で出力する python スクリプトを用意しました。 また、JSON 化については宗定洋平氏のこちらの記事を参考にさせていただきました。
個々のクエリーを正規化して、クエリーのパターンをまんべんなく拾ってきてパフォーマンステストの対象クエリーを決定します。 単純にクエリー単位で実行時間を比較したい都合で、同じ条件でシーケンシャルに処理させる必要があるため、かなりクエリーを絞り込まないと時間がかかりすぎるためです。 (同じパターンのクエリーが大量発生していたら切り捨てる)
手順は以下のようになります。
- 負荷の高かった期間のスロークエリーログを取得
- スクリプトでスロークエリーログのメタ情報の JSON と素のスロークエリーを抽出
- メタ情報の JSON を BigQuery に取り込んでパフォーマンステスト対象のクエリーを絞り込む
- 本番DBのスナップショットから新旧両バージョンの Aurora MySQL を作成
- 対象のクエリーを同じ順番で両方に流してレスポンスや実行時間を記録
- 集計して実行時間の差分や頻度の高いクエリーの洗い出し
あとはこちらも結果をサーバサイドエンジニアに共有して影響の調査や対応などをすすめていきました。
#!/usr/bin/env python3 # # mysql slowquery log convert to meta data json # # based: # https://blog.father.gedow.net/2021/08/08/mysql-slow-query-python/ # https://gist.github.com/GedowFather/29ab69b59f5c925965fd8caf42327e30 # # usage: # parse-slowquery.py db_cluster_name db_instance_name db_role db_tier slowquery_log_path # import os import textwrap from datetime import date, datetime import sys import json import re import hashlib ## 関数 # スロークエリーログを解析 class parserSlowQuery: init_keep_info = { 'unix_time' : None, 'src_ip' : None, 'query_time' : None, 'lock_time' : None, 'rows_sent' : None, 'rows_examined' : None, 'query' : "", 'unique_query' : "", 'query_hash' : "", 'timestamp' : None, } def __init__(self, cluster, host, role, tier): self.cluster = cluster self.host = host self.role = role self.tier = tier self.queries = {} self.keep_info = {} self.keep_info.update(self.init_keep_info) self.next_is_query = False self.selected_database = None self.max_query_size = 10485760 # JSON に出力するクエリーの最大サイズ (bigquery の制限回避) self.max_file_size = 4*1024*1024*1024 # 出力する JSON ファイルの最大サイズ (bigquery の制限回避) self.encode = "Latin-1" return # スロークエリーログを読み込んで個々のスロークエリーを抽出 def convertLog(self, logfile): self.out_path = os.path.dirname(logfile) self.out_ts = re.search('(\d+\-\d+\-\d+\.\d+).log', logfile).group(1) self.out_seq = 0 self.out_size = 0 with open(logfile, mode='r', encoding=self.encode, errors='ignore') as file: for line in file: if re.match(r"^(/rdsdbbin|Tcp|Time)", line): continue elif re.match("# Th", line): continue elif re.match("# F", line): continue elif re.match("# Time", line): self.next_is_query = False self.keep_info.update(self.init_keep_info) elif re.match("# User", line): match = re.match(r".*@.*\[([0-9.]+)\].*", line) if not match: continue src_ip = match.group(1) self.keep_info['src_ip'] = src_ip elif re.match("# Query", line): regex = r"^.*Query_time: *([0-9.]+) *Lock_time: *([0-9.]+) *Rows_sent: *([0-9]+) *Rows_examined: *([0-9]+).*$" match = re.match(regex, line, flags=re.IGNORECASE) if not match: continue query_time = match.group(1) lock_time = match.group(2) rows_sent = match.group(3) rows_examined = match.group(3) self.keep_info['query_time'] = float(query_time) self.keep_info['lock_time'] = float(lock_time) self.keep_info['rows_sent'] = int(rows_sent) self.keep_info['rows_examined'] = int(rows_examined) elif re.match(r"^use ", line, flags=re.IGNORECASE): match = re.match(r"^use +([^;]+);?$", line, flags=re.IGNORECASE) if not match: continue self.selected_database = match.group(1) elif re.match("SET time", line, flags=re.IGNORECASE): if self.keep_info['query_time'] is None: self.next_is_query = False self.keep_info.update(self.init_keep_info) print("Not found comment out information") else: match = re.match(r"^SET timestamp=([0-9]+);?$", line, flags=re.IGNORECASE) if not match: continue self.next_is_query = True self.keep_info['unix_time'] = int(match.group(1)) elif self.next_is_query is True: self.keep_info['query'] += line if not line: continue if line[-1] == ";" or line[-2] == ";": self.keep_info['unique_query'] = self.uniqueQueryPT(self.keep_info['query']) self.keep_info['query_hash'] = hashlib.md5(self.keep_info['unique_query'].encode(self.encode)).hexdigest()[0:8] self.keep_info['timestamp'] = datetime.fromtimestamp(self.keep_info['unix_time']).strftime('%Y-%m-%d %H:%M:%S') if self.keep_info['query_hash'] not in self.queries: self.queries[self.keep_info['query_hash']] = { 'count' : 0 } self.queries[self.keep_info['query_hash']]['count'] += 1 self.printInfo() self.printFull() self.keep_info.update(self.init_keep_info) else: self.keep_info['query'] += " " else: print("Found invalid line (%s)" % line) print("Keep info is") print(self.keep_info) return # スロークエリーの情報を JSON 形式で出力 def printInfo(self): if not (self.keep_info['unix_time'] and self.keep_info['query_time'] and self.keep_info['lock_time'] ) and self.keep_info['rows_examined'] is None: print("Error: insufficient parameter slowquery", file=sys.stderr) return query_info = { 'unix_time' : self.keep_info['unix_time'], 'timestamp' : self.keep_info['timestamp'], 'unique_query_hash' : self.keep_info['query_hash'], 'count_in_hour' : self.queries[self.keep_info['query_hash']]['count'], 'db_cluster' : self.cluster, 'db_instance' : self.host, 'db_role' : self.role, 'db_tier' : self.tier, 'src_ip' : self.keep_info['src_ip'], 'query_time' : self.keep_info['query_time'], 'lock_time' : self.keep_info['lock_time'], 'rows_sent' : self.keep_info['rows_sent'], 'rows_examined' : self.keep_info['rows_examined'], 'database' : self.selected_database, 'unique_query' : self.keep_info['unique_query'], 'unique_query_size' : len(self.keep_info['unique_query']), 'query' : self.keep_info['query'][0:self.max_query_size], # limit queruy size 'query_size' : len(self.keep_info['query']), } # ファイルサイズがでかくなったらファイル名の連番をインクリメント query_info_json = json.dumps(query_info) query_info_size = len(query_info_json) if (self.out_size + query_info_size) > self.max_file_size: self.out_seq += 1 self.out_size = 0 else: self.out_size += query_info_size with open(self.out_path+"/"+self.host+"."+self.out_ts+"."+str(self.out_seq)+".json", mode='a') as out_fd: print(query_info_json, file=out_fd) return # スロークエリー全文を出力 def printFull(self): with open(self.out_path+"/"+self.host+"."+str(self.keep_info['unix_time'])+"."+self.keep_info['query_hash']+"."+str(self.queries[self.keep_info['query_hash']]['count'])+".sql", mode='w') as out_fd: print(self.keep_info['query'], file=out_fd) return # スロークエリーを正規化 def uniqueQueryPT(self, query): # IN 句で ' BINARY "1001016868768",' とかいったのが延々ならんでいる長大なクエリーで、正規表現だと重すぎて処理できなかったので先にシュリンク if len(query) > 262144: query2 = re.sub(r',',r',\n',query) query = '' for line in query2.splitlines(): if not re.match(r' BINARY "\d+",', line): query += line query = re.sub(r'\n','',query) # percona toolkit の fingerprint 関数をベースに perl から python に書き換え # https://github.com/percona/percona-toolkit/blob/b6dff19d972d2a47aaac575d8a9d5572b482d0a3/bin/pt-fingerprint#L1645-L1713 # remove ; #query = query.rstrip(';') query = re.sub(r';$', '', query) # remove comments query = re.sub(r'/\*[^!].*?\*/', '', query) query = re.sub(r'(?:--|#)[^\'"\\r\\n]*(?=[\r\n]|\Z)', '', query) # normalize whitespace query = re.sub(r'([^\\])(\\\')', r'\1', query) query = re.sub(r'([^\\])(\\\")', r'\1', query) query = query.replace('\\\\', '') query = query.replace("\\'", '') query = query.replace('\\"', '') query = re.sub(r'([^\\])(".*?[^\\]?")', r'\1?', query) query = re.sub(r'([^\\])(\'.*?[^\\]?\')', r'\1?', query) query = re.sub(r'\bfalse\b|\btrue\b', '?', query, flags=re.I) query = re.sub(r'([._-])[a-f0-9]{32}', r'\1?', query) query = re.sub(r'[0-9+-][0-9a-f.xb+-]*', '?', query) query = re.sub(r'[xb+-]\?', '?', query) query = query.lstrip() query = query.rstrip() query = re.sub(r'\s+', ' ', query) query = query.lower() query = re.sub(r'\bnull\b', '?', query) query = re.sub(r'\b(in|values?)(?:[\s,]*\([\s?,]*\))+', r'\1(?+)', query) query = re.sub(r'\b(select\s.*?)(?:(\sunion(?:\sall)?)\s\1)+', r'\1 /*repeat\2*/', query) query = re.sub(r'\blimit \?(?:, ?\?| offset \?)?', 'limit ?', query) # compress in (binary ?, ... , binary ?) query = re.sub(r'\bin\s*\(\s*(binary\s+\?[\s?,]*)+\)', r'in (binary ?+)', query) #if re.search(r'\bORDER BY ', query, re.I): # query = re.sub(r'\G(.+?)\s+ASC', r'\1', query, flags=re.I) query = re.sub(r'\b(.+?)\s+ASC', r'\1', query, flags=re.IGNORECASE) return query # json 出力用にフォーマット変換 def json_serial(obj): # 日付型の場合には、文字列に変換します if isinstance(obj, (datetime, date)): return obj.isoformat() # 上記以外はサポート対象外. raise TypeError ("Type %s not serializable" % type(obj)) # json 形式でオブジェクトを出力 def jprint(obj): json.dump(obj, sys.stdout, default=json_serial) ## メイン args = sys.argv db_cluster = args[1] db_instance = args[2] db_role = args[3] db_tier = args[4] logfile = args[5] parser = parserSlowQuery(db_cluster, db_instance, db_role, db_tier) parser.convertLog(logfile)
新バージョンの reader を低ウェイトでサービスインさせて本番で検証
弊社のスロークエリーは1秒以上かかったクエリーを記録しているため、1秒未満のクエリーは記録されていません。 また、本番環境でまとまったジェネラルログを記録するのは負荷的に難しいところです。 なので、旧バージョンでは一秒未満の実行時間だけど新バージョンでは遅くなってしまうクエリーを拾いきれないことになります。
そこで Blue/Green の切替えを行う前に Route53 の加重ルーティングを使って新バージョンの reader を低ウェイトでサービスインさせ様子を見ました。 MySQL 8.0 系で無くなったクエリーキャッシュを前提としたクエリーなどが引っかかったため対応を行ってから切り替えを行いました。
あとがき
ざっくりですが、こんなかんじで Aurora MySQL 3 切り替え時の検証を行っていきました。 SELECT COUNT() が遅くなるなど先人の方々が情報を出していてくれてたおかげで、 いろいろ段取りを考えてなんとかアップグレードを行うことが出来ましたのでほんと感謝です。
あと、個人的には pt-query-digest でかゆいところに手が届かなかったところに手を入れたスロークエリーパーサがができて通常の運用でも使えそうなのが一番の成果かもしれません。