Yappli Tech Blog

株式会社ヤプリの開発メンバーによるブログです。最新の技術情報からチーム・働き方に関するテーマまで、日々の熱い想いを持って発信していきます。

Goの並行処理入門 - syncパッケージ編

はじめに

こんにちは。サーバーサイドエンジニアの佐野(@Kiyo_Karl2)です。

本記事は、4本の連載記事の2本目となります。

対象読者

  • Go言語の基礎はわかっているが、Goroutineについてはよく理解していない
  • メモリ、プロセス、スレッド、並行処理、並列処理といったワードについて概要とその違いを理解している

連載記事を通して取り扱わないこと

本連載記事ではGoroutine、sync、channel、select文の基礎にフォーカスを当てています。 以下のような発展的な内容については触れていませんのでご了承くださいm( )m

  • ゴルーチンリークを避ける手法
  • 並行処理のエラーハンドリング
  • チャネルやselect文の発展的な内容
  • ゴルーチンのスケジューリングについて
  • ファンアウト、ファンイン
  • contextパッケージによるゴルーチンのタイムアウトや中断方法

syncパッケージ

syncパッケージには並行処理の同期や相互排他ロックなどの機能が備わっています。 ここでは、いくつかのsyncパッケージの使い方についてまとめてみました。

WaitGroup

WaitGroupは、一連のゴルーチンが完了するのを待つために利用されます。 次のコードはWaitGroupを利用してゴルーチンの完了を待つ例です。

func main() {
    var wg sync.WaitGroup //1
    wg.Add(1)
    go func() {
        defer wg.Done() //2
        fmt.Println("1st goroutine sleeping...")
        time.Sleep(1)
    }()

    wg.Add(1) // 1
    go func() {
        defer wg.Done() //2
        fmt.Println("2nd goroutine sleeping...")
        time.Sleep(2)
    }()

    wg.Wait() //3
    fmt.Println("All goroutine done.")
}
  1. Add(1)、で1つのゴルーチンが起動したことを示します。(WaitGroupの内部カウンタを+1する)
  2. deferキーワードを用いてDone()を呼び出すことで、ゴルーチンのクロージャが終了する直前にWaitGroupへこのゴルーチンが終了することを伝えます。(WaitGroupの内部カウンタを-1する)
  3. Wait()を呼び出し、すべてのゴルーチンが終了するまで(WaitGroupの内部カウンタが0になるまで)メインゴルーチンをブロックします

上記を実行すると以下のような結果となります。

2nd goroutine sleeping...
1st goroutine sleeping...
All goroutine done.

ここでのポイントはAdd()の呼び出しは必ず監視対象ゴルーチンの直前で行うようにするということです。 ゴルーチンのクロージャ内でAdd()を呼んでしまうと、ゴルーチンがスケジューリングされるタイミングについては不確定なので、ゴルーチンのクロージャー内のAdd()より先にWait()が呼ばれてしまう可能性があります。 そうすると、メインゴルーチンがブロックされずに(ゴルーチンの完了を待たずに)終了してしまう可能性があります。

ただし、例外としてforループの中でゴルーチンを呼び出すときは監視対象を以下のようにAdd()でまとめることもあります。

package main

import (
    "fmt"
    "sync"
)

func main() {
    greetWorker := func(greetGroup *sync.WaitGroup, workerID int) {
        defer greetGroup.Done()
        fmt.Printf("Greeting from worker %v\n", workerID)
    }

    const totalWorkers = 5
    var greetGroup sync.WaitGroup
    greetGroup.Add(totalWorkers)
    for i := 0; i < totalWorkers; i++ {
        go greetWorker(&greetGroup, i+1)
    }
    greetGroup.Wait()
}

MutexとRWMutex

Mutex

Mutex(ミューテックス)は「相互排他」を表す「mutual exclusion」の略で、クリティカルセクションを保護する方法のひとつです。 クリティカルセクションというのは、プログラムが共有リソースを扱っているときに排他的アクセスが必要な箇所のことを指します。(詳しくは IT用語辞典-クリティカルセクション)

Mutexを用いると、並行処理の実装時に安全に共有リソースへアクセスするための排他的処理を書くことができます。

ここで以下のサンプルコードを見てみましょう。

func main() {
    var count int
    var lock sync.Mutex

    increment := func() {
        lock.Lock() 
        defer lock.Unlock() 
        count++
        fmt.Printf("increment: %d\n", count)
    }

    decrement := func() {
        lock.Lock() 
        defer lock.Unlock() 
        count--
        fmt.Printf("decrement %d\n", count)
    }

    // Increment
    var arithmetic sync.WaitGroup
    for i := 0; i <= 5; i++ {
        arithmetic.Add(1)
        go func() {
            defer arithmetic.Done()
            increment()
        }()
    }

    // Decrement
    for i := 0; i <= 5; i++ {
        arithmetic.Add(1)
        go func() {
            defer arithmetic.Done()
            decrement()
        }()
    }

    arithmetic.Wait()

    fmt.Println("arithmetic complete")
}

上記では複数のゴルーチンでcount変数のメモリを共有していますが、Mutexを利用することで「インクリメント時にはインクリメントだけする」といった相互排他制御が簡単に実装できます。

上記を実行すると以下のようになります。

increment: 1
decrement 0
decrement -1
decrement -2
decrement -3
decrement -4
decrement -5
increment: -4
increment: -3
increment: -2
increment: -1
increment: 0
arithmetic complete

Mutexを利用する上で、大事なポイントが2つありますので覚えておくと良いでしょう。

  • ロック解除はdeferキーワードを用いて行うこと
    • これによってpanicになったとしても確実にUnlock()を呼び出すことができます。Lock()を呼び出してUnlock()の呼び出しに失敗してしまうと永遠にロックが解除されないため、デッドロックが発生してしまいます。
  • クリティカルセクションをできるだけ短かくすること

RWMutex

複数の並行処理で共有するメモリにおいて、すべてのメモリが書き込みと読み込みが必要とは限らないと思います。 ここで利用できるのがsync.RWMutexです。

RWMutexは、メモリ管理の機能を提供してくれています。 これは、共有メモリへの同時読み込みを許容しつつ、書き込みを排他的に制御するために使用されます。 sync.RWMutexは、多くの読み込み操作と比較的少ない書き込み操作がある場合に特に有用です。 例えば、キャッシュや設定データなど、頻繁に読み込まれるが稀にしか更新されないデータに対して有効です。

以下にサンプルコートを示します。

package main

import (
    "fmt"
    "sync"
    "time"
)

// 共有リソースを表す構造体
type Data struct {
    sync.RWMutex
    value int
}

// 値を安全に更新
func (d *Data) Write(value int) {
    d.Lock() // 書き込みのためのロック
    fmt.Println("Writing value:", value)
    d.value = value
    time.Sleep(2 * time.Second) // 書き込み操作をわかりやすくするための遅延
    fmt.Println("Write finished")
    d.Unlock()                   // ロックを解放
}

// 値を安全に読み出す
func (d *Data) Read() int {
    d.RLock() // 読み込みのためのロック
    defer d.RUnlock()
    return d.value
}

func main() {
    var wg sync.WaitGroup
    data := Data{}

    // 書き込みゴルーチン
    wg.Add(1)
    go func() {
        defer wg.Done()
        data.Write(100)
    }()

    // 複数の読み込みゴルーチン
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            fmt.Printf("Goroutine %d reads value: %d\n", i, data.Read())
        }(i)
    }

    wg.Wait()
}

実行結果は以下のようになります。 実際に実行してみると、書き込みで2秒Sleepしている間は読み込みをブロックする様子がわかるかと思います。

Goroutine 9 reads value: 0
Goroutine 7 reads value: 0
Goroutine 8 reads value: 0
Goroutine 5 reads value: 0
Goroutine 0 reads value: 0
Writing value: 100
Goroutine 3 reads value: 0
Goroutine 6 reads value: 0
Write finished
Goroutine 4 reads value: 100
Goroutine 1 reads value: 100
Goroutine 2 reads value: 100

Cond

sync.Condは、「条件変数」とも呼ばれます。これは、特定の条件が満たされるのを待つために使われます。例えば、「データが利用可能になるのを待つ」や「特定の状態の変更を待つ」といった状況で便利です。

func main() {
    var wg sync.WaitGroup
    var dataReady bool
    cond := sync.NewCond(&sync.Mutex{}) //1

    // データが準備されるのを待つゴルーチン
    waitForData := func(i int) {
        defer wg.Done()
        cond.L.Lock()
        for !dataReady {
            fmt.Printf("ゴルーチン%d: データを待っています\n", i)
            cond.Wait() //2
        }
        fmt.Printf("ゴルーチン%d: データが準備されました\n", i)
        cond.L.Unlock()
    }

    // 5つのゴルーチンを起動し、データが準備されるのを待つ
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go waitForData(i)
    }

    // データを準備する
    go func() {
        fmt.Println("データの準備")
        time.Sleep(3 * time.Second) // データ準備に3秒かかるとする
        cond.L.Lock()
        dataReady = true
        cond.Broadcast() // 3
        cond.L.Unlock()
    }()

    wg.Wait()
}

上記では以下のことが行われています。

  1. 条件変数を作成。
  2. Wait()で、条件が満たされる(dataReady がtrueになる)のを待ちます。条件が満たされると、Wait()はブロックを解除します。
  3. Broadcast()で、全てのゴルーチンに条件が満たされたことを通知します。このコードでは5つのゴルーチンを起動しているので、5つのゴルーチン全てに条件が満たされたことが通知されます。
    • Signal()を利用すると、 条件変数を待っているのが一番長いゴルーチンに条件が満たされたことを通知します。

このように、sync.CondWait()Broadcast()は複数のゴルーチン間で条件に基づく同期を実現するために使用されます。Wait()は条件が満たされるのを効率的に待ち、Signal()またはBroadcast()によって条件が満たされたことを待機中のゴルーチンに通知することができます。

Once

いきなりですが、以下のコードを実行するとなにが出力されると思いますか?

package main

import (
    "fmt"
    "sync"
)

func main() {
    var count int

    increment := func() {
        count++
    }

    var once sync.Once
    var wg sync.WaitGroup
    wg.Add(100)

    for i := 0; i < 100; i++ {
        go func() {
            defer wg.Done()
            once.Do(increment)
        }()
    }
    wg.Wait()
    fmt.Printf("Count is %d\n", count)
}

上記を実行すると、100が出力されるかと思いきや、1が出力されます。

Count is 1

このように、sync.Onceを用いると、Doメソッドへ渡された関数が異なるそれぞれのゴルーチンで呼ばれたとしても1度しか実行されないような実装ができます。

Pool

Poolはオブジェクトプールパターン1を並行処理で安全な形で実装したものです。 例えばオブジェクトの初期化に時間がかかる処理、あるいはリソースを多く消費する場合、sync.Pool を使ってオブジェクトをプールし、再利用することでパフォーマンスを向上させることができます。

func main() {
    myPool := &sync.Pool{
        New: func() interface{} {
            fmt.Println("Creating new instance")
            return struct{}{}
        },
    }

    myPool.Get() //1
    instance := myPool.Get() // 2
    myPool.Put(instance) //3
    myPool.Get() //4
}
  1. Get()でプールしているインスタンスを取得します。初オブジェクトはまだ初期化されていないので、New関数が呼ばれて「Creating new instance」というメッセージが表示され、新しいインスタンスが作成されます。しかし、取得したインスタンスが変数に割り当てられていないため、ガベージコレクタによって捨てられます。
  2. instance := myPool.Get() は再びプールからインスタンスを取得しようとします。最初のGet()呼び出し後、プールは依然として空なので、またNew関数が呼び出され、「Creating new instance」と表示され、新しいインスタンスが作成されます。そして、このインスタンスを変数instanceに保存します。
  3. instanceをプールに返却します。これにより、プールの利用可能なインスタンスを1増やします。
  4. プールからインスタンスを再び取得します。今回はプールに利用可能なインスタンスがあるため、New関数は呼び出されず、既存のインスタンスが返されます。したがって、この時点では新しいインスタンスを生成するためのメッセージは表示されません。

上記のようにプールしているオブジェクトがあればNew関数を呼ばずに再利用することができます。

Poolを扱うときは以下の点に気をつけましょう。

  • sync.Poolをインスタンス化するときは、スレッド安全なNewメンバー変数を用意する
    • sync.Pool 自体はスレッドセーフなので取得・返却時の排他処理は考慮する必要はありません。しかし、プールに保存されているオブジェクト自体がスレッドセーフであるとは限りません。複数のゴルーチンが同じオブジェクトを同時に使用する場合、そのオブジェクトがスレッドセーフであることをちゃんと確認しましょう。
  • 型を混ぜてプールしない
    • リソースのやり取りは全てany(interface{})型で行われます。Stringを扱うことが前提のPoolだったとしてもIntをPutできてしまいます。これは実行しないとエラーにならないので注意してください。なるべく1つのプールで1つの型を扱うようにした方が良いでしょう。
  • プールから取り出したオブジェクトの利用が終わったら必ずPutを呼びオブジェクトを返却する
    • オブジェクトを利用してPut()を呼び忘れたら次利用するときに再作成しないといけません。これでは普通にオブジェクトを生成するのと変わりません。deferでPut()を呼びましょう。
  • Get()でインスタンスを取得するとき、受け取るオブジェクトの状態に依存する処理を書かない
    • sync.Pool から取得されるオブジェクトはさまざまな箇所で利用されるためオブジェクトの状態が予測不可能である可能性が高いです。オブジェクトがどのような状態であっても対応できるように実装する必要があります。

最後に

ここまで読んでいただき、ありがとうございます。 今回はGoroutineと共によく利用されるsyncパッケージの基本的なことについてまとめてみました。 次回はchannelについて解説していきたいと思います。

参考