はじめに
こんにちは。サーバーサイドエンジニアの佐野(@Kiyo_Karl2)です。
本記事は、4本の連載記事の2本目となります。
- Goの並行処理入門-Goroutine基礎編
- Goの並行処理入門-syncパッケージ編 ←今ここ
- Goの並行処理入門-channel編
- Goの並行処理入門-select編
対象読者
- Go言語の基礎はわかっているが、Goroutineについてはよく理解していない
- メモリ、プロセス、スレッド、並行処理、並列処理といったワードについて概要とその違いを理解している
連載記事を通して取り扱わないこと
本連載記事ではGoroutine、sync、channel、select文の基礎にフォーカスを当てています。 以下のような発展的な内容については触れていませんのでご了承くださいm( )m
- ゴルーチンリークを避ける手法
- 並行処理のエラーハンドリング
- チャネルやselect文の発展的な内容
- ゴルーチンのスケジューリングについて
- ファンアウト、ファンイン
- contextパッケージによるゴルーチンのタイムアウトや中断方法
syncパッケージ
syncパッケージには並行処理の同期や相互排他ロックなどの機能が備わっています。 ここでは、いくつかのsyncパッケージの使い方についてまとめてみました。
WaitGroup
WaitGroup
は、一連のゴルーチンが完了するのを待つために利用されます。
次のコードはWaitGroup
を利用してゴルーチンの完了を待つ例です。
func main() { var wg sync.WaitGroup //1 wg.Add(1) go func() { defer wg.Done() //2 fmt.Println("1st goroutine sleeping...") time.Sleep(1) }() wg.Add(1) // 1 go func() { defer wg.Done() //2 fmt.Println("2nd goroutine sleeping...") time.Sleep(2) }() wg.Wait() //3 fmt.Println("All goroutine done.") }
Add(1)
、で1つのゴルーチンが起動したことを示します。(WaitGroup
の内部カウンタを+1する)- deferキーワードを用いて
Done()
を呼び出すことで、ゴルーチンのクロージャが終了する直前にWaitGroup
へこのゴルーチンが終了することを伝えます。(WaitGroup
の内部カウンタを-1する) Wait()
を呼び出し、すべてのゴルーチンが終了するまで(WaitGroup
の内部カウンタが0になるまで)メインゴルーチンをブロックします
上記を実行すると以下のような結果となります。
2nd goroutine sleeping... 1st goroutine sleeping... All goroutine done.
ここでのポイントはAdd()の呼び出しは必ず監視対象ゴルーチンの直前で行うようにするということです。
ゴルーチンのクロージャ内でAdd()
を呼んでしまうと、ゴルーチンがスケジューリングされるタイミングについては不確定なので、ゴルーチンのクロージャー内のAdd()
より先にWait()
が呼ばれてしまう可能性があります。
そうすると、メインゴルーチンがブロックされずに(ゴルーチンの完了を待たずに)終了してしまう可能性があります。
ただし、例外としてforループの中でゴルーチンを呼び出すときは監視対象を以下のようにAdd()
でまとめることもあります。
package main import ( "fmt" "sync" ) func main() { greetWorker := func(greetGroup *sync.WaitGroup, workerID int) { defer greetGroup.Done() fmt.Printf("Greeting from worker %v\n", workerID) } const totalWorkers = 5 var greetGroup sync.WaitGroup greetGroup.Add(totalWorkers) for i := 0; i < totalWorkers; i++ { go greetWorker(&greetGroup, i+1) } greetGroup.Wait() }
MutexとRWMutex
Mutex
Mutex
(ミューテックス)は「相互排他」を表す「mutual exclusion」の略で、クリティカルセクションを保護する方法のひとつです。
クリティカルセクションというのは、プログラムが共有リソースを扱っているときに排他的アクセスが必要な箇所のことを指します。(詳しくは IT用語辞典-クリティカルセクション)
Mutex
を用いると、並行処理の実装時に安全に共有リソースへアクセスするための排他的処理を書くことができます。
ここで以下のサンプルコードを見てみましょう。
func main() { var count int var lock sync.Mutex increment := func() { lock.Lock() defer lock.Unlock() count++ fmt.Printf("increment: %d\n", count) } decrement := func() { lock.Lock() defer lock.Unlock() count-- fmt.Printf("decrement %d\n", count) } // Increment var arithmetic sync.WaitGroup for i := 0; i <= 5; i++ { arithmetic.Add(1) go func() { defer arithmetic.Done() increment() }() } // Decrement for i := 0; i <= 5; i++ { arithmetic.Add(1) go func() { defer arithmetic.Done() decrement() }() } arithmetic.Wait() fmt.Println("arithmetic complete") }
上記では複数のゴルーチンでcount変数のメモリを共有していますが、Mutex
を利用することで「インクリメント時にはインクリメントだけする」といった相互排他制御が簡単に実装できます。
上記を実行すると以下のようになります。
increment: 1 decrement 0 decrement -1 decrement -2 decrement -3 decrement -4 decrement -5 increment: -4 increment: -3 increment: -2 increment: -1 increment: 0 arithmetic complete
Mutexを利用する上で、大事なポイントが2つありますので覚えておくと良いでしょう。
- ロック解除はdeferキーワードを用いて行うこと
- これによってpanicになったとしても確実に
Unlock()
を呼び出すことができます。Lock()
を呼び出してUnlock()
の呼び出しに失敗してしまうと永遠にロックが解除されないため、デッドロックが発生してしまいます。
- これによってpanicになったとしても確実に
- クリティカルセクションをできるだけ短かくすること
RWMutex
複数の並行処理で共有するメモリにおいて、すべてのメモリが書き込みと読み込みが必要とは限らないと思います。
ここで利用できるのがsync.RWMutex
です。
RWMutex
は、メモリ管理の機能を提供してくれています。
これは、共有メモリへの同時読み込みを許容しつつ、書き込みを排他的に制御するために使用されます。
sync.RWMutex
は、多くの読み込み操作と比較的少ない書き込み操作がある場合に特に有用です。
例えば、キャッシュや設定データなど、頻繁に読み込まれるが稀にしか更新されないデータに対して有効です。
以下にサンプルコートを示します。
package main import ( "fmt" "sync" "time" ) // 共有リソースを表す構造体 type Data struct { sync.RWMutex value int } // 値を安全に更新 func (d *Data) Write(value int) { d.Lock() // 書き込みのためのロック fmt.Println("Writing value:", value) d.value = value time.Sleep(2 * time.Second) // 書き込み操作をわかりやすくするための遅延 fmt.Println("Write finished") d.Unlock() // ロックを解放 } // 値を安全に読み出す func (d *Data) Read() int { d.RLock() // 読み込みのためのロック defer d.RUnlock() return d.value } func main() { var wg sync.WaitGroup data := Data{} // 書き込みゴルーチン wg.Add(1) go func() { defer wg.Done() data.Write(100) }() // 複数の読み込みゴルーチン for i := 0; i < 10; i++ { wg.Add(1) go func(i int) { defer wg.Done() fmt.Printf("Goroutine %d reads value: %d\n", i, data.Read()) }(i) } wg.Wait() }
実行結果は以下のようになります。 実際に実行してみると、書き込みで2秒Sleepしている間は読み込みをブロックする様子がわかるかと思います。
Goroutine 9 reads value: 0 Goroutine 7 reads value: 0 Goroutine 8 reads value: 0 Goroutine 5 reads value: 0 Goroutine 0 reads value: 0 Writing value: 100 Goroutine 3 reads value: 0 Goroutine 6 reads value: 0 Write finished Goroutine 4 reads value: 100 Goroutine 1 reads value: 100 Goroutine 2 reads value: 100
Cond
sync.Cond
は、「条件変数」とも呼ばれます。これは、特定の条件が満たされるのを待つために使われます。例えば、「データが利用可能になるのを待つ」や「特定の状態の変更を待つ」といった状況で便利です。
func main() { var wg sync.WaitGroup var dataReady bool cond := sync.NewCond(&sync.Mutex{}) //1 // データが準備されるのを待つゴルーチン waitForData := func(i int) { defer wg.Done() cond.L.Lock() for !dataReady { fmt.Printf("ゴルーチン%d: データを待っています\n", i) cond.Wait() //2 } fmt.Printf("ゴルーチン%d: データが準備されました\n", i) cond.L.Unlock() } // 5つのゴルーチンを起動し、データが準備されるのを待つ for i := 0; i < 5; i++ { wg.Add(1) go waitForData(i) } // データを準備する go func() { fmt.Println("データの準備") time.Sleep(3 * time.Second) // データ準備に3秒かかるとする cond.L.Lock() dataReady = true cond.Broadcast() // 3 cond.L.Unlock() }() wg.Wait() }
上記では以下のことが行われています。
- 条件変数を作成。
Wait()
で、条件が満たされる(dataReady
がtrueになる)のを待ちます。条件が満たされると、Wait()
はブロックを解除します。Broadcast()
で、全てのゴルーチンに条件が満たされたことを通知します。このコードでは5つのゴルーチンを起動しているので、5つのゴルーチン全てに条件が満たされたことが通知されます。Signal()
を利用すると、 条件変数を待っているのが一番長いゴルーチンに条件が満たされたことを通知します。
このように、sync.Cond
のWait()
とBroadcast()
は複数のゴルーチン間で条件に基づく同期を実現するために使用されます。Wait()
は条件が満たされるのを効率的に待ち、Signal()
またはBroadcast()
によって条件が満たされたことを待機中のゴルーチンに通知することができます。
Once
いきなりですが、以下のコードを実行するとなにが出力されると思いますか?
package main import ( "fmt" "sync" ) func main() { var count int increment := func() { count++ } var once sync.Once var wg sync.WaitGroup wg.Add(100) for i := 0; i < 100; i++ { go func() { defer wg.Done() once.Do(increment) }() } wg.Wait() fmt.Printf("Count is %d\n", count) }
上記を実行すると、100が出力されるかと思いきや、1が出力されます。
Count is 1
このように、sync.Once
を用いると、Doメソッドへ渡された関数が異なるそれぞれのゴルーチンで呼ばれたとしても1度しか実行されないような実装ができます。
Pool
Poolはオブジェクトプールパターン1を並行処理で安全な形で実装したものです。
例えばオブジェクトの初期化に時間がかかる処理、あるいはリソースを多く消費する場合、sync.Pool
を使ってオブジェクトをプールし、再利用することでパフォーマンスを向上させることができます。
func main() { myPool := &sync.Pool{ New: func() interface{} { fmt.Println("Creating new instance") return struct{}{} }, } myPool.Get() //1 instance := myPool.Get() // 2 myPool.Put(instance) //3 myPool.Get() //4 }
Get()
でプールしているインスタンスを取得します。初オブジェクトはまだ初期化されていないので、New関数が呼ばれて「Creating new instance」というメッセージが表示され、新しいインスタンスが作成されます。しかし、取得したインスタンスが変数に割り当てられていないため、ガベージコレクタによって捨てられます。instance := myPool.Get()
は再びプールからインスタンスを取得しようとします。最初のGet()
呼び出し後、プールは依然として空なので、またNew関数が呼び出され、「Creating new instance」と表示され、新しいインスタンスが作成されます。そして、このインスタンスを変数instance
に保存します。instance
をプールに返却します。これにより、プールの利用可能なインスタンスを1増やします。- プールからインスタンスを再び取得します。今回はプールに利用可能なインスタンスがあるため、New関数は呼び出されず、既存のインスタンスが返されます。したがって、この時点では新しいインスタンスを生成するためのメッセージは表示されません。
上記のようにプールしているオブジェクトがあればNew関数を呼ばずに再利用することができます。
Poolを扱うときは以下の点に気をつけましょう。
sync.Pool
をインスタンス化するときは、スレッド安全なNewメンバー変数を用意するsync.Pool
自体はスレッドセーフなので取得・返却時の排他処理は考慮する必要はありません。しかし、プールに保存されているオブジェクト自体がスレッドセーフであるとは限りません。複数のゴルーチンが同じオブジェクトを同時に使用する場合、そのオブジェクトがスレッドセーフであることをちゃんと確認しましょう。
- 型を混ぜてプールしない
- リソースのやり取りは全て
any(interface{})
型で行われます。Stringを扱うことが前提のPoolだったとしてもIntをPutできてしまいます。これは実行しないとエラーにならないので注意してください。なるべく1つのプールで1つの型を扱うようにした方が良いでしょう。
- リソースのやり取りは全て
- プールから取り出したオブジェクトの利用が終わったら必ずPutを呼びオブジェクトを返却する
- オブジェクトを利用して
Put()
を呼び忘れたら次利用するときに再作成しないといけません。これでは普通にオブジェクトを生成するのと変わりません。deferでPut()
を呼びましょう。
- オブジェクトを利用して
Get()
でインスタンスを取得するとき、受け取るオブジェクトの状態に依存する処理を書かないsync.Pool
から取得されるオブジェクトはさまざまな箇所で利用されるためオブジェクトの状態が予測不可能である可能性が高いです。オブジェクトがどのような状態であっても対応できるように実装する必要があります。
最後に
ここまで読んでいただき、ありがとうございます。 今回はGoroutineと共によく利用されるsyncパッケージの基本的なことについてまとめてみました。 次回はchannelについて解説していきたいと思います。