サーバーサイドエンジニアの田実です!
タイトルの通り、完了まで1週間かかっていた日次バッチ処理を6分に短縮して1500倍以上高速化したので、今回はその改善について紹介したいと思います。
バッチ処理の概要
このバッチ処理ではサブシステムで取得している行動ログを集計して、初回アクセス日・最終アクセス日・最後にアクセスしたときのアプリバージョンを顧客レコードのフィールドに格納しています。 行動ログは Amazon Redshift 、顧客レコードは Amazon Aurora に入っています。バッチはPHP(Laravel)で実装されています。
雑に書くとこんな感じです。
変更前
変更前のコードを擬似コードで表現すると以下のようになっていました。
$query = getEventsSummaryQuery();
$query->chunk(2000, function($summary) {
$sql = createUpdateSql($summary);
updateMembers($sql)
});
Redshiftで集計するためのSQLを生成し、それを使って集計結果を2000件ずつ取得します。 その集計結果を使ってRDBのデータを更新する、という処理です。
この2000件ずつの集計・取得処理は1回あたり1分以上かかっており、顧客数 / 2000
回分のRedshiftへのアクセスが発生していました。
例えば顧客数が400万件、1回の集計処理に1分かかるとすると、 4,000,000 / 2,000 = 2000分 = 33時間
かかることになります。
この集計時間に更新時間も含めたのが全体の処理時間となるので実際にはもっと時間がかかっていました。
さらに、この日次処理は毎日実行されるため、前日の処理が終わらないまま当日の処理が開始・オーバーラップしていました。 このバッチ以外にもRedshiftにクエリが同時実行されることもあり、結果としてクエリの待ち時間が増え、最終的に集計・更新に1週間かかるバッチとなっていました。 *1 *2
変更後
Redshiftでは、OFFSET・LIMITを使ったチャンク処理は向いていないため、一気に集計してその結果をS3などに保存し、S3のファイルを読み出してデータを取得して更新するというアプローチに変更しました。集計結果をS3に格納するのは UNLOAD が利用できます。これによりRedshiftへのアクセスは初回の1回だけになり大幅に処理時間が改善しました。
擬似コードで表現すると以下のようになります。
// RedshiftのUNLOADで集計結果をS3に格納する
$query = unloadToS3($sql);
// 集計結果のファイル一覧を取得する
$files = fetchEventsSumaryFiles('s3://xxx');
foreach ($files as $file) {
// 集計結果のファイルを2000件ずつ読み出して更新
readFile($file)
->chunk(2000, function($summary) {
$sql = createUpdateSql($summary);
updateMembers($sql)
});
}
さらに、今回の集計に関しては最新の行動ログの情報だけ取得できれば良いので、全データを毎回集計・取得するのではなく、前日分・当日分の差分だけ取得するようにしました。 この行動ログの絞り込みにより、S3に格納する処理結果とそれによる更新件数を減らすことができ、大幅に処理時間を短縮しています。
具体的な実装
このバッチ処理はPHPで実装されているので、PHPを使ってどのように改善していったかを紹介します。
PHPのAWS SDKには streamWrapper を使ってファイル操作のように直感的にS3を扱えるようにするインターフェースがあります。
これを使うとprefixで絞り込んだS3のファイル一覧をforeachで処理したり、 fgets()
を使ってストリーミングでデータ取得することができます。
streamWrapperを利用するには S3Client
の registerStreamWrapper()
メソッドを呼び出します。
<?php $client = new S3Client(['region' => 'ap-northeast-1', 'version' => 'latest']); $client->registerStreamWrapper();
特定のパスのファイル一覧を取得する場合は recursive_dir_iterator()
が利用できます。
<?php \Aws\recursive_dir_iterator($path) foreach ($iter as $filepath) { // 処理 }
fopen()
で s3://
スキームのファイルを指定するとS3上のファイルを開くことができます。これと LazyCollection を使って2000件ずつデータを取得して更新処理を行っています。
<?php LazyCollection::make(function () { $f = fopen('s3://{bucket}/xxx', 'r'); while ($line = fgets($f)) { $attribute = json_decode($line); yield $attribute; } }) ->chunk(2000) ->each(function ($attributes) { // 更新処理 });
UNLOADはLaravelのDBファサードを使って呼び出しており、JSON形式で書き出すことでファイルの読み出しをしやすくしています。
<?php DB::connection('redshift')->statement(<<<SQL UNLOAD ('{SQL}') TO '{PATH}' IAM_ROLE '{IAM_ROLE}' FORMAT JSON SQL);
テストコード
streamWrapperを利用している箇所をメソッドとして切り出してテスト時にモックできるようにしています。
<?php class IntegrationBatch { public function openS3File(string $path) { return fopen($path, 'r'); } public function getUpdateMemberFiles(string $path) { return \Aws\recursive_dir_iterator($path) } }
ファイルの読み出しのモックは tmpfile()
を使ってローカルの実ファイルを fopen()
したものをmockの戻り値として利用し、 recursive_dir_iterator()
は配列を戻り値にしてます。
<?php $tmpfile = tmpfile(); $body = "test body"; fputs($tmpfile, $body); // モック用のファイルresourceを用意 $filename = stream_get_meta_data($tmpfile)['uri']; $r = fopen($filename, 'r'); $mock = Mockery::mock(IntegrationBatch::class) ->makePartial(); $mock->shouldReceive('openS3File') ->andReturn($r) ->once(); $mock->shouldReceive('getUpdateMemberFiles') ->andReturn(['/path/to/file']) ->once();
fopen(), fgets() の実体はHTTPのAPIを叩いているため PHP-VCR でモックできないかと思いましたが、streamWrapperとのかみ合わせが悪く、モックする形に落ち着きました。 RedshiftのUNLOADに関してもRedshiftコンパチなローカル環境を用意することが困難なため、UNLOAD呼び出し部分を関数化・モックしています。
まとめ
バッチ処理を爆速にした話を紹介しました。
バッチ処理が遅い事象は1週間かかる前にもっと早く気づける問題でしたが、バッチの実行時間を監視していなかったため検知が遅れてしまいました。 改善方法自体も重要ですが、その事象を素早く検知し対処できるような仕組みづくりも大切です。 弊社が利用しているモニタリングツールの NewRelic APM ではバッチ処理の監視もできそうなので、これらのツールも活用してより一層オブザーバビリティを高めていき、サービスの品質を高めていければと思います!