はじめに
こんにちは。 サーバサイドエンジニアの窪田です。
yappliではweb上でノーコードでモバイルアプリ開発ができるプロダクトを提供しています。 その中の主要機能の1つにpush通知があります。 CMSユーザーがyappliのWebアプリ上でエンドユーザーに対して、いつどんなメッセージを送るかということを設定することができます。
push通知機能では
- 大量の配信
- 複数のセグメント条件の分岐
- 複数の配信方法
を実現しています。 この配信基盤の全容は大規模かつ複雑な仕組みになっています。 その基盤のうち部分的にAWS SQSによるキューの仕組みを使っています。
上の機能を実現するためにpush通知のメッセージは複数のキューを経由します。
例えば Goで動作するアプリケーションサーバ上で
- batch処理1
- Queue1にメッセージ情報をエンキューする(図の1)
- batch処理2
- Queue1からデキューしデータ集計や加工を施し、Queue2にエンキューする(図の2, 3)
のような処理が行われることを考えます。
この時、batch処理1, batch処理2で動く関数はそれぞれで単体テストが書かれていて、SQSとの接続部分はモック関数によって記述できます。 これでほとんど問題はないですが、部分的にインテグレーションテストが書きたい場合があります。 上の例ではbatch処理1, batch処理2を繋げたテストを書くような場合です。
今回は、LocalStackを使ってこのSQSのキューを介した複数の処理のインテグレーションテストを書くことについて考えていきます。
ちなみに、LocalStackは先月(2022年7月)V1.0がGAになりましたね! localstack.cloud
目的
- LocalStackを用いてSQSを開発環境で扱えるようにする
- SQSと接続された機能の自動テストを書く
- 自動テストがCircleCIで実行できるようにする
が実現されている状態を目指します。
やってみる
LocalStackを用いてSQSを開発環境で扱えるようにする
LocalStackをdockerコンテナで動かす
まず、localでSQSを動かします。 構成としては、dockerコンテナの上にLocalStack環境を作り、 mac上で動作するGoの処理の中で操作します。
LocalStackの最小構成はdocker-compose.yml上で以下のように書けます。
version: "3.8" services: localstack: image: localstack/localstack:1.0.0 ports: - "4566:4566" environment: SQS_ENDPOINT_STRATEGY: path
ちなみにV1.0のLocalStackでは
environment: SERVICES: sqs
のようにserviceを指定する必要がなくなりました。 デフォルトでLocalStackで用意されている全てのserviceを使えるようになっています。 SERVICES variableはdeprecatedになっています。
準備が整ったので
docker-compose up -d
コマンドでコンテナを起動します。
SQSのキューを作成する
LocalStack環境ができたのでその上にキューを作成していきます。 今回はpreparedという名前のキューを作成します。
~% aws --version aws-cli/2.7.1 Python/3.9.11 Darwin/21.2.0 exe/x86_64 prompt/off ~% aws sqs create-queue --queue-name prepared --endpoint-url http://localhost:4566 --region ap-northeast-1
もちろんLocalStack CLI (awslocalコマンド)を使っても同じことができます。 今回は省略を減らすためawsコマンドで書きます。
~% aws sqs list-queues --endpoint-url http://localhost:4566
でキューが作成されたことが確認できます。
SQSと接続された機能の自動テストを書く
aws-sdk-goを使ってGoからSQSを操作する
以下のようにaws-sdk-goを使うことで、SQSにエンキュー/デキューができます。
package adaptor import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sqs" ) type Adapter struct { sqs *sqs.SQS } func NewAdapter() (*Adapter, error) { sess, err := session.NewSession(&aws.Config{ Region: aws.String("ap-northeast-1"), Endpoint: aws.String("http://localhost:4566"), }) if err != nil { return nil, err } return &Adapter{ sqs: sqs.New(sess), }, nil } func (s *Adapter) Enqueue(queueName string, message string) error { i := &sqs.SendMessageInput{ QueueUrl: aws.String(queueName), MessageBody: aws.String(message), } _, err := s.sqs.SendMessage(i) if err != nil { return err } return nil } func (s *Adapter) Dequeue(queueName string) ([]*sqs.Message, error) { i := &sqs.ReceiveMessageInput{ QueueUrl: aws.String(queueName), } o, err := s.sqs.ReceiveMessage(i) if err != nil { return nil, err } return o.Messages, nil }
sqs.SendMessage
は
~% aws sqs send-message --queue-url prepared --message-body 送りたいメッセージ --endpoint-url http://localhost:4566
に対応し、
sqs.ReceiveMessage
は
~% aws sqs receive-message --queue-url prepared --endpoint-url http://localhost:4566
に対応します。
テストしたい関数を用意する
以下のように各UseCaseからSQSにエンキュー/デキューする関数を用意します。
package main import ( adaptor "github.com/user/repo/src/sqs_adaptor" ) func main() { // 省略 } const queueName = "prepared" func SendMessageToSQS(s *adaptor.Adapter, message string) error { err := s.Enqueue(queueName, message) if err != nil { return err } return nil } func ReceiveMessageFromSQS(s *adaptor.Adapter) (string, error) { messages, err := s.Dequeue(queueName) if err != nil { return "", err } if len(messages) == 0 { return "", nil } message := *messages[0].Body return message, nil }
冒頭の例の場合、batch処理1でSendMessageToSQS
を使ってエンキューし、
batch処理2でReceiveMessageFromSQS
を使ってmessage情報を取得するようなイメージです。
テストコードを書く
SendMessageToSQS
関数, ReceiveMessageFromSQS
関数を繋げてテストコードで表現します。
package main_test import ( "github.com/user/repo/src/main" adaptor "github.com/user/repo/src/sqs_adaptor" "github.com/stretchr/testify/assert" "testing" ) func TestEnqueueAndDeQueue(t *testing.T) { t.Run("enqueueしたメッセージをdequeueできメッセージを受け取れる", func(t *testing.T) { s, err := adaptor.NewAdapter() assert.NoError(t, err) // sqsにenqueue err = main.SendMessageToSQS(s, "送りたいメッセージ ") assert.NoError(t, err) var message string // sqsからdequeue message, err = main.ReceiveMessageFromSQS(s) assert.NoError(t, err) assert.Equal(t, message, "送りたいメッセージ ") }) }
~% go test ./src/main/main_test.go -v
でテスト実行ができます。
このテスト自体はシンプルで、モックも使わず、ただ裏にあるコンテナ上で動いているLocalStackのキューに出し入れしているという仕組みになります。
自動テストがCircleCIで実行できるようにする
当然、CI上でテストを動かす時にはLocalStackが動いている必要があるので、その設定をします。 最小構成で以下のように書けます。
version: 2.1 jobs: run-test: docker: - image: cimg/go:1.18 auth: username: $DOCKERHUB_USERNAME password: $DOCKERHUB_PASSWORD - image: localstack/localstack:1.0.0 auth: username: $DOCKERHUB_USERNAME password: $DOCKERHUB_PASSWORD steps: - checkout - run: command: sudo apt-get update && sudo apt install python3-pip && sudo pip3 install awscli - run: command: aws sqs create-queue --queue-name prepared --region ap-northeast-1 --endpoint http://localhost:4566 - run: command: go test ./src/main/... -v workflows: integration-test: jobs: - run-test
実行すると以下のようにCircleCI上で動作することが確認できました。
おわりに
今回はLocalStack上のSQSをGoで操作しインテグレーションテストを実行するということをやってみました。 モックを使わない場合、必要なサーバ等は用意する必要がありますが、LocalStack V1.0を使うことでとてもシンプルな記述で実現することができました。
- LocalStackを使った開発
- AWS SQSを使ったシステムの設計・開発
- Goによるアプリケーション開発
- CircleCI上でLocalStackを動作させたCI運用
それぞれ単体ではよく触るかもしれないですが、これらを組み合わせて業務レベルで取り組んでいける機会はなかなか無いので個人的には楽しいです。
実は今回紹介したSQSを使った仕組みはyappliのpush通知配信基盤の一部で、他の部分では今回紹介しなかった技術も多く使っています。 Goの記述も今回は最も単純な例を書きましたが、実際はpush通知配信基盤に合わせた多くの設計面の工夫があります。
少しでも興味持たれた方は気軽にカジュアル面談へお越しください! open.talentio.com