はじめに
こんにちは、サーバーサイドエンジニアの佐野きよです。
現代のWebアプリケーションにおいて、ユーザー体験を向上させるために非同期処理は不可欠な技術となっています。特に、大量のデータを扱うバッチ処理や時間のかかる外部API連携などは、バックグラウンドで非同期に実行するのが一般的です。
しかし、この非同期処理をECS (Elastic Container Service) 上で何も考えずに実行すると、思わぬ落とし穴にはまることがあります。 それは、デプロイやスケールイン(サーバー台数を減らすこと)によってタスクが意図せず中断されてしまうリスクです。
ECSでは、タスクの停止命令が出てから実際に終了するまでの待機時間(stopTimeout)がデフォルトで30秒、最大でも120秒しかありません。
そのため、これを超える長時間ジョブが実行されている最中にタスクの入れ替えが発生すると、処理が途中で打ち切られ、データの不整合といった深刻な問題を引き起こす可能性があります。
この記事では、Laravelのキュー(Job)を例に、AWSが提供する 「ECS Task Protection」 機能を利用して、この課題を解決する方法を具体的に解説します。
現状の非同期処理の構成
まず、本記事で前提とする非同期処理の構成は以下の通りです。 (この構成は、Laravelで非同期処理を実装する際によく見られる一般的なパターンかと思います。)
- フレームワーク: Laravel
- 非同期処理: Laravel Jobを利用
- キュードライバー:
databaseを指定 - ワーカープロセス:
php artisan queue:workコマンドをコンテナ起動時に実行し、キューを処理するワーカープロセスを常駐させている
ECS Task Protectionとは?
ここで登場するのが、今回の主役である ECS Task Protection です。
その名の通り、デプロイのタスク切り替え時などで発生するスケールイン操作によってECSタスクが停止されるのを一時的に「保護」する機能です。アプリケーション内からECS Agentが提供する特定のAPIエンドポイント (${ECS_AGENT_URI}/task-protection/v1/state) を呼び出すことで、タスク保護の有効/無効を動的に切り替えることができます。
- 有効化: タスクが保護され、スケールインの対象から除外されます。
- 無効化: 保護が解除され、スケールインの対象となります。
この仕組みを利用し、「Jobの実行直前にタスク保護を有効化し、Jobの完了後に無効化する」 という流れを実装することで、長時間かかる処理が完了した後にタスクを安全に終了させることが可能になります。
より詳細な情報については、以下のAWS公式ドキュメントを参照してください。
Laravelでの実装例
それでは、具体的なLaravelでの実装例を見ていきましょう。 今回は、タスク保護のロジックをまとめた基底クラス (BaseProtectedJob) と、実際にECS Agentと通信するサービスクラス (ECSTaskProtectionService) を作成します。
⚠️ 注意点
ここで紹介する全てのコードは、あくまで実装イメージを掴むためのサンプルです。実際の動作を保証するものではなく、エラーハンドリングなども一部省略していますので、ご了承ください。
基底クラス: BaseProtectedJob
まず、タスク保護を適用したいJobクラスに継承させるための基底クラスを作成します。このクラスの handle メソッドで、タスク保護の有効化・無効化のフローを制御します。
<?php abstract class BaseProtectedJob implements ShouldQueue { use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; /** * タスク保護期間を取得する (分) * Jobごとで独自の保護期間を設定したい場合は、このメソッドをオーバーライドする */ protected function getTaskProtectionExpiresInMinutes(): ?int { return null; // デフォルト値(例: 120分)を使用 } /** * Jobの実行処理 * Job実行前にECSタスク保護を有効化し、Job完了後に無効化する */ final public function handle(): void { $task_protection = new ECSTaskProtectionService(); // タスク保護を有効化 if (!$this->enableTaskProtection($task_protection)) { return; // 有効化に失敗したら処理を中断 } try { // 実際のJob処理を実行 $this->handleJob(); } finally { // Jobの成功・失敗に関わらず、必ずタスク保護を無効化する $this->disableTaskProtection($task_protection); } } /** * タスク保護の有効化処理 */ private function enableTaskProtection(ECSTaskProtectionService $task_protection): bool { try { $task_protection->enable($this->getTaskProtectionExpiresInMinutes()); return true; } catch (Exception $e) { // タスク保護を有効化できないのにJobをリトライさせても意味がないため、即座に失敗扱いにする Log::error('ECS Task Protection: Failed to enable protection - marking job as failed', ['error' => $e]); $this->fail($e); return false; } } /** * タスク保護の無効化処理 */ private function disableTaskProtection(ECSTaskProtectionService $task_protection): void { try { $task_protection->disable(); } catch (Exception $e) { // 無効化の失敗はログに残すが、Job自体の成否には影響させない // ここで例外を投げるとJobがリトライされ、タスク保護が再度有効化されてしまうため Log::error('ECS Task Protection: Failed to disable protection', ['error' => $e]); } } /** * 実際のJob処理を記述するメソッド * サブクラスでこのメソッドを実装する */ abstract protected function handleJob(): void; }
ポイント💡
handleメソッドをfinalにすることで、継承先クラスで処理フローが上書きされるのを防ぎます。try...finallyブロックを使い、Jobの処理が成功しても失敗しても、必ず最後にタスク保護が無効化されるようにしています。handleJobという抽象メソッドを用意し、個別のJobクラスではこのメソッド内に本来の処理を記述するように強制します。
継承先のクラスの実装例
BaseProtectedJob を作成したら、実際にタスク保護を適用したいJobクラスでこれを継承します。使い方は非常にシンプルです。
<?php ... class SampleLongRunningJob extends BaseProtectedJob { /** * このJob独自のタスク保護期間を60分に設定する */ protected function getTaskProtectionExpiresInMinutes(): ?int { return 60; } /** * 実際のJob処理 * このメソッド内に本来のロジックを記述する */ protected function handleJob(): void { Log::info('[SampleLongRunningJob] Start processing...'); // 時間のかかる処理をシミュレート sleep(180); // 3分間待機 Log::info('[SampleLongRunningJob] Finished processing!'); } }
ポイント💡
BaseProtectedJobをextendsします。- 実際の処理は
handleJobメソッドの中に記述するだけで、タスク保護に関するロジックを意識する必要はありません。 - 必要であれば
getTaskProtectionExpiresInMinutesメソッドをオーバーライドすることで、Jobごとに最適な保護期間を簡単に設定できます。
サービスクラス: ECSTaskProtectionService
次に、ECS AgentのAPIを呼び出すロジックをカプセル化したサービスクラスの実装例です。
<?php ... class ECSTaskProtectionService { private const MAX_RETRY_COUNT = 3; private const RETRY_DELAY_MS = 500; private const DEFAULT_EXPIRES_IN_MINUTES = 120; private const REQUEST_TIMEOUT_SECONDS = 10; private string $ecs_agent_uri; public function __construct() { $this->ecs_agent_uri = config('ecs.agent_uri', ''); } /** * タスク保護を有効化する */ public function enable(?int $expires_in_minutes = null): void { $expires_in_minutes = $expires_in_minutes ?? self::DEFAULT_EXPIRES_IN_MINUTES; $payload = [ 'ProtectionEnabled' => true, 'ExpiresInMinutes' => $expires_in_minutes, ]; $this->agentCall($payload); Log::info('ECS Task Protection: Enabled successfully', ['expires_in_minutes' => $expires_in_minutes]); } /** * タスク保護を無効化する */ public function disable(): void { $payload = ['ProtectionEnabled' => false]; $this->agentCall($payload); Log::info('ECS Task Protection: Disabled successfully'); } /** * ECS_AGENT_URI環境変数はコンテナ内からでしか読み込めない * よってローカル環境などの、ECS環境でない場合はAPIコールをスキップする */ private function isEcsEnvironment(): bool { return $this->ecs_agent_uri === ''; } /** * ECS Agent APIを呼び出す */ private function agentCall(array $payload): void { if (!$this->isEcsEnvironment()) { Log::info('ECS Task Protection: Skip agent call (not ECS environment)'); return; } $endpoint = $this->getEndpoint(); try { Http::withHeaders(['Content-Type' => 'application/json']) ->timeout(self::REQUEST_TIMEOUT_SECONDS) ->retry(self::MAX_RETRY_COUNT, self::RETRY_DELAY_MS, function ($exception) { if ($exception instanceof ConnectionException) { Log::warning('ECS Task Protection: Network error, retrying', ['error' => $exception]); return true; // ネットワーク系のエラーの場合のみリトライ } return false; }) ->put($endpoint, $payload) ->throw(); // HTTPステータスコードがエラーの場合に例外をスロー Log::info('Successful ECS Task Protection agent call'); } catch (Exception $e) { Log::error('ECS Task Protection: agent call failed', ['error' => $e]); throw $e; } } /** * APIエンドポイントのURLを構築する */ private function getEndpoint(): string { return rtrim($this->ecs_agent_uri, '/') . '/task-protection/v1/state'; } }
ポイント💡
- ECS AgentのURIは、コンテナ内で自動的に設定される環境変数
ECS_AGENT_URIから取得します。 - ローカル環境などECS以外の環境でコードが実行された場合を考慮し、
ECS_AGENT_URIが存在しない場合はAPIコールをスキップするようになっています。 - LaravelのHTTPクライアントが持つ
retry機能を利用して、一時的なネットワークエラーに対する障害耐性を持たせています。 - 障害時の調査などで役に立つかもしれないので、コンテナIDをログコンテキストして含めるようにしています。
- AWSで設定されているデフォルトのタスク保護期間(
ExpiresInMinutes)は120分ですが、なるべく無駄なタスク起動時間発生しないようにJobごとに保護期間は変更しています。
- AWSで設定されているデフォルトのタスク保護期間(
補足
SIGTERMを受け取った後にタスク保護のエンドポイントが叩かれたら結局Jobが中断されてしまうのでは?という疑問があるかもしれませんが、その心配は不要かと思います。
ECSコントロールプレーンからSIGTERMをタスクが受け取ったあとに UpdateTaskProtection API を実行すると以下のようなレスポンスが返ってきてエラーになりタスク保護を有効にできないので、SIGTERMを受け取った後にまたJobが拾われてしまうということは理論上無いはずです。
"requestParameters": {
"cluster": "test-cluster",
"tasks": [
"arn:aws:ecs:ap-northeast-1:123456789012:task/test-cluster/7395e17ec5704ba898aefafdd5418907"
],
"protectionEnabled": true,
"expiresInMinutes": 2
},
"responseElements": {
"protectedTasks": [],
"failures": [
{
"arn": "arn:aws:ecs:ap-northeast-1:123456789012:task/7395e17ec5704ba898aefafdd5418907",
"reason": "TASK_STOPPING_OR_STOPPED"
}
]
}
アプリケーション以外で考慮すべきこと
ここまでの実装でアプリケーション側の準備は整いましたが、実際にこの仕組みを機能させるためには、インフラ(AWS)側でもいくつかの設定が必要です。 今回はアプリケーションの実装が主テーマのため詳細は割愛しますが、これらのインフラ設定も忘れずに行いましょう。
IAM権限の付与
タスク保護APIを利用できるようにするため、タスクロールに権限を付与する必要があります。
デプロイ戦略
Blue/Greenデプロイメントのような戦略を採用し、デプロイ時のタスク切り替えを安全に行う仕組みを利用することを推奨します。 具体的には、新しいタスク(Green)へのトラフィック切り替え後も、古いタスク(Blue)で実行中のJobがあれば、それが完了するまで待ってから古いタスクを終了させる、といった制御が必要になります。(参考)
まとめ
ここまで読んでいただきありがとうございます!!
今回は、Laravelで実装された長時間実行JobをECS上で安全に実行するため、ECS Task Protection を活用する方法を紹介しました。
Jobの実行前にタスク保護を有効化し、完了後に無効化するというシンプルな仕組みで、デプロイやスケールインによる処理の中断リスクを大幅に低減できます。これにより、データの整合性を保ち、より安定したシステム運用が可能になります。
ECS上で非同期処理を運用しようと検討している方は、ぜひこのTask Protectionを導入することを検討してみてください。
さいごに
ヤプリではサーバーサイドエンジニアを随時募集しています! 興味を持った方、是非一度カジュアル面談を受けてみませんか…??