Yappli Tech Blog

株式会社ヤプリの開発メンバーによるブログです。最新の技術情報からチーム・働き方に関するテーマまで、日々の熱い想いを持って発信していきます。

Goの並行処理入門 - channel編

はじめに

こんにちは。サーバーサイドエンジニアの佐野(@Kiyo_Karl2)です。

本記事は、4本の連載記事の3本目となります。

対象読者

  • Go言語の基礎はわかっているが、Goroutineについてはよく理解していない
  • メモリ、プロセス、スレッド、並行処理、並列処理といったワードについて概要とその違いを理解している

連載記事を通して取り扱わないこと

本連載記事ではGoroutine、syncパッケージ、channel、select文の基礎にフォーカスを当てています。 以下のような発展的な内容については触れていませんのでご了承くださいm( )m

  • ゴルーチンリークを避ける手法
  • 並行処理のエラーハンドリング
  • チャネルやselect文の発展的な内容
  • ゴルーチンのスケジューリングについて
  • ファンアウト、ファンイン
  • contextパッケージによるゴルーチンのタイムアウトや中断方法

チャネル(channel)

チャネルは、ゴルーチン間の通信に利用されるものです。 Go WikiのMutexOrChannelに次のような一文があります。

One of Go’s mottos is _“Share memory by communicating, don’t communicate by sharing memory.”

Goのモットーのひとつとして、「通信によってメモリを共有し、メモリの共有によって通信してはいけない」というものあがります。

Goにはメモリ共有によって通信する手段(複数のゴルーチンがグローバル変数や共有メモリに直接アクセスして通信する手段)がGoの並行処理入門-syncパッケージ編で紹介したsync.Mutexにより提供されていますが、あくまでもOnce型とWaitGroup型以外は低水準ライブラリ内で利用することを想定されているため、データの所有権を移動しようとしていたり、複数のゴルーチンを協調させようとしている場合はsyncパッケージよりもチャネルの利用が推奨されます。

双方向チャネル

チャネルの宣言は以下のようにします。

var dataStream chan interface{}
dataStream = make(chan interface{})

chan interface{}とすることでinterface{}型のチャネルを宣言できます。 2行目で、make関数を利用してチャネルの初期化処理をしています。

このdataStreamはinterface{}型のため任意の値を書き込んだり読み込んだりできます。 これを双方向チャネルと言います。

単方向チャネル

送信専用と受信専用のチャネルをつくることができます。 受信専用チャネルは<-演算子をchanの左に追加するだけです。

var dataStream <-chan interface{}
dataStream = make(<-chan interface{})

送信専用チャネルは<-演算子を追加するだけです。

var dataStream chan<- interface{}
dataStream = make(chan<- interface{})

一方向チャネルを初期化するケースはあまりないですが、これは関数の引数や戻り値としてよく利用されます。 下記のように記述しても、Goが双方向チャネルを必要に応じて暗黙的に単方向チャネルへ変換してくれるので非常に便利です。

package main

func main() {
    var receiveChan <-chan interface{}
    var sendChan chan<- interface{}
    dataStream := make(chan interface{})

    // dataStreamは双方向チャネルだがreceiveChanとsendChanに
    // 代入したときに単方向チャネルに変換される
    receiveChan = dataStream
    sendChan = dataStream
}

チャネルを利用するときは、<-演算子を利用して以下のように書きます。

チャネルを送信するときはチャネルの右に<-演算子を書き、受信するときはチャネルの左に演算子を記述します。

package main

import "fmt"

func main() {
    stringStream := make(chan string)

    go func() {
        stringStream <- "Hello channel" //1
    }()

    fmt.Println(<-stringStream) //2
}

1で文字列の"Hello channel"をチャネルへ送り、2でチャネルから文字列を読み込んで受信します。

結果は以下のようになります。

Hello channel

上記のようにチャネルを通じてデータの送受信を行うことができます。

ここでひとつ疑問が生まれます。

Goの並行処理入門-Goroutine基礎編、ゴルーチンは将来の不確定なタイミングでスケジュールされるため、匿名のゴルーチンが実行される前にメインゴルーチンが終了してしまうといったことを思い出してください。

しかし、先程のコードではHello channelが出力されるので、このコード自体は正しいです。 では、なぜメインゴルーチンは匿名のゴルーチンの完了を待ってくれるのでしょうか?

答えは、「Goのチャネルはブロックをするから」です。 キャパシティがいっぱいのチャネルに書き込もうとするとするゴルーチンはチャネルに空きが出るまで待機し、空のチャネルから読み込もうとしているゴルーチンは少なくとも一つの要素を受信するまで待機します。 上記のコードだと、fmt.Println(<-stringStream)でチャネルから値を読み込もうとしていて、匿名ゴルーチンがstringStreamへ文字列リテラルを書き込もうとしているため、この書き込みが終わるまでメインゴルーチンはブロックされます。

逆に下記のように100%チャネルに書き込みが発生しないようなコードを書いてしまうと、メインゴルーチンがブロックされたままになるためデッドロックが発生します。

package main

import "fmt"

func main() {
    stringStream := make(chan string)

    go func() {
        if true {
            return
        }
        stringStream <- "Hello channel"
    }()

    fmt.Println(<-stringStream)
}

実行してみます。

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
main.main()
        /Users/xxxxxx/develop/my_project/go_sandbox/cmd/sample_app/gorutine.go:15 +0x7c
exit status 2

メインゴルーチンはstringStreamの値が書き込まれるのを待機しつづけますし、一方匿名ゴルーチンは値を永遠に書き込まないため、デッドロックが発生してしまいます。(デッドロックの予防方法については後述)

次にチャネルの読み込みについて見ていきます。 <-演算子からの受信はオプションとして2つの値を返すことができます。

package main

import "fmt"

func main() {
    stringStream := make(chan string)
    go func() {
        stringStream <- "Hello channel"
    }()

    salutation, ok := <-stringStream
    fmt.Printf("(%v): %v", ok, salutation)
}

実行結果は以下のようになります。

(true): Hello channel

チャネルの送信者は値をこれ以上送信しないことを示すためにチャネルをcloseすることができます。チャネルの受信時に2つめのパラメータokでチャネルが閉じられたがかどうか確認することができます。

closeキーワードを利用すると、そのチャネルへ書き込みことはできなくなるため、closeしたらそれ以上そのチャネルには値は送られてこないということを示すことができます。 また、closeしたチャネルを読み込むことはできるので、その後の処理で読み込んだチャネルをループさせたり、通信を新しいチャネルで再開させたりすることができます。

func main() {
    intStream := make(chan int)
    go func() {
        defer close(intStream)
        for i := 1; i <= 5; i++ {
            intStream <- i
        }
    }()

    for int := range intStream {
        fmt.Printf("%v ", int)
    }
}

上記を実行すると、下記のような出力がされます。

1 2 3 4 5

closeによりチャネルを閉じることは、複数のゴルーチンに同時にシグナルを送る方法のひとつです。 複数のゴルーチンがあるとき、そのゴルーチンの数分チャネルへ書き込まなければメインゴルーチンのブロックを解除できないと思うかもしれませんが、closeでチャネルを閉じれば、そのチャネルへ書き込みが行われなくてもブロックを解除することができます。

また、チャネルを閉じる方がコストも低く性能も良いです。

以下は、複数のゴルーチンを開放する例です。

func main() {
    begin := make(chan int)
    var wg sync.WaitGroup

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            <-begin
            fmt.Printf("Goroutine %d\n", i)
        }(i)
    }

    fmt.Println("Unblocking goroutin!")
    close(begin)
    wg.Wait()
}

実行すると以下のような結果となります。

Unblocking goroutin!
Goroutine 2
Goroutine 9
Goroutine 4
Goroutine 5
Goroutine 8
Goroutine 7
Goroutine 0
Goroutine 3
Goroutine 1
Goroutine 6

実はGoの並行処理入門-syncパッケージ編で紹介した、sync.Condを利用しても同様の処理を実現できますが、チャネルを利用しても複数のゴルーチンを開放する処理を書くことができます。

バッファ付きチャネル

バッファ付きチャネルは一言で言うとインメモリのFIFOキューと言えます。 例えば4つのバッファを持つチャネルは、4つまで値を受信し、5つめの値が送られてきたら、そのチャネルが別のゴルーチンによって読み込みが行われて空きができるまで、そのチャネルはブロックされます。 チャネルに空きができたらチャネルのバッファ末尾に書き込みが行われます。

次はバッファ付きチャネルの例です。

package main

import (
    "bytes"
    "fmt"
    "os"
)

func main() {
    var stdoutBuff bytes.Buffer
    defer stdoutBuff.WriteTo(os.Stdout)

    intStream := make(chan int, 4)
    go func() {//2
        defer close(intStream)
        defer fmt.Fprintln(&stdoutBuff, "Producer Done.")
        for i := 0; i < 5; i++ {
            fmt.Fprintf(&stdoutBuff, "Sending: %d\n", i)
            intStream <- i
        }
    }()

    for integer := range intStream {
        fmt.Fprintf(&stdoutBuff, "Received %v.\n", integer)
    }
}

実行結果

Sending: 0
Sending: 1
Sending: 2
Sending: 3
Sending: 4
Producer Done.
Received 0.
Received 1.
Received 2.
Received 3.
Received 4.

バッファ付きチャネルは、そのバッファが満杯になるまで送信側がブロックされずに続けてデータを送信できます。同様に、バッファにデータがある限り、受信側はブロックされずにデータを受信できます。これにより、送受信の操作が非同期で行われ、ゴルーチンが他の処理を続けることができるというメリットがあります。

上記の実行結果をみると、匿名ゴルーチンが先にintStreamへすべての書き込みを行ったあとに読み込み処理が走っていることがわかるかと思います。

チャネルの所有者が事前書き込む回数がわかるのであれば、バッファ付きチャネルを利用することで先に効率良く書き込みを行うことが可能になります。

ただし、バッファ付きチャネルを使用する際には、バッファサイズの設計に注意する必要があります。

バッファが大きすぎるとメモリを無駄に消費することになり、小さすぎると上記のメリットを十分に享受できない場合があります。また、バッファ付きチャネルを使用すると、送受信のタイミングに関する複雑さが増すことも考慮する必要があるので注意しましょう。

nilチャネル

これまで、双方向チャネル、単方向チャネル、バッファなしチャネル、バッファ付きチャネルについて記載しました。 最後にnilチャネルについて触れていきます。

nilはチャネルのデフォルト値です。 まず、以下のようにnilチャネルを読み込もうとするとデッドロックが起こります。

func main() {
var dataStream chan any
    <-dataStream
}

実行結果

fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive (nil chan)]: main.main() 

xxx/xxx/develop/my_project/go_sandbox/cmd/sample_app/gorutine.go:5 +0x24 exit status 2

nilチャネルへの書き込みも同様にデッドロックになります。

func main() {
var dataStream chan any
    dataStream <- 1
}

実行結果

fatal error: all goroutines are asleep - deadlock! 
goroutine 1 [chan send (nil chan)]: main.main() xxx/xxx/develop/my_project/go_sandbox/cmd/sample_app/gorutine.go:5 +0x3c exit status 2

nilチャネルをcloseしようとするとパニックになります。

func main() {
    var dataStream chan any
    close(dataStream)
}

実行結果

panic: close of nil channel
goroutine 1 [running]: main.main() xxx/xxx/develop/my_project/go_sandbox/cmd/sample_app/gorutine.go:5 +0x20 exit status 2

上記のようにnilチャネルを読み込んだり、書き込んだりしようとするとブロックが発生してデッドロックとなります。 nilチャネルはcloseしようとするとパニックを起こしていまいます。 チャネルを扱うときには必ず初期化しましょう。

チャネルを扱う上でのポイント

チャネルを扱う上で大事なことは、チャネルに書き込むゴルーチンとチャネルを読む込むゴルーチンとでしっかりと責務を分けて実装することです。 このようにすることでうっかりnilチャネルやすでに閉じてしまったチャネルだと知らずに値を書き込もうとしてパニックを起こしてしまう危険を減らすことができます。

以下、チャネルに書き込む権限をもつゴルーチンを所有者、チャネルを読み込む権限を持つゴルーチンを利用者と呼ぶことにします。

チャネルの所有者

チャネルを所有するゴルーチンは以下の4つの責務を負います。

  1. チャネルの初期化
  2. 書き込みを行う、もしくは他のゴルーチンへ所有権を渡す
  3. チャネルを閉じる
  4. 上記の3つをカプセル化する

これらを守って実装すると、以下の危険性が少なくなります。 - チャネルを初期化するようにしているので、nilチャネルへ書き込んでデッドロックする危険がなくなる - チャネルを初期化するようにしているので、nilチャネルを閉じることによって起こるパニックの危険もなくなる - チャネルをゴルーチン内で閉じるようにしているので、閉じたチャネルへ書き込むことによるパニックの危険がなくなる - チャネルに対する不適切な書き込みを防げる

このようにすることで、どのゴルーチンがチャネルを所有しているのかがコード上でわかるようになります

チャネルの利用者

チャネルの利用者は以下の2つについて責務を負います。

  • チャネルがいつ閉じられたか
    • これは読み込み時に2つめの戻り値(ok)を確認すれば良いです
  • ブロックする操作は慎重に扱う
    • これは実装に依存するため、定義が難しいです。読み込みはブロックが発生するということを常に意識することが大事です

サンプルコード

以下はサンプルコードで上記の話をコードに落とし込んだものです。 言葉だけだといまいちピンとこないかと思うので、是非理解の助けとしてもらえればと思います!

package main

import "fmt"

func main() {
    chanOwner := func() <-chan int {
        resultStream := make(chan int, 5) //1
        go func() {                       //2
            defer close(resultStream) //3
            for i := 0; i < 5; i++ {
                resultStream <- i
            }
        }()
        return resultStream //4
    }
    resultStream := chanOwner()
    for result := range resultStream { //5
        fmt.Printf("Received: %d\n", result)
    }

    fmt.Println("Done Receiving!")
}
  1. チャネルの初期化をします。6回チャネルに書き込むと事前にわかっているので、キャパシティ5のバッファ付きチャネルをつくり、ゴルーチンを効率的に処理するようにしています
  2. 匿名ゴルーチンを起動します。このゴルーチンはresultStreamチャネルを所有しており、チャネルへの書き込み権限を持ちます
  3. resultStreamチャネルを利用したあとにdeferで確実に閉じるようにしています。これはチャネル所有者の責務です
  4. 読み込み専用チャネルを返します。こうすることで、呼び出し元で不用意にチャネルへ書き込みされるのを防げます
  5. resultStreamをループしてチャネルを読み込みます

こうして見ると、resultStreamチャネルの所有者はどのゴルーチンで、どのようにカプセル化されているのかおわかりになったではないしょうか。 このように実装すればnilチャネルや、閉じたチャネルへ書き込んだりしてしまうことを防げます。

チャネルの所有者と利用者で責務をわけてコードを書けば、コードの可読性はぐんと上がります。 この原則を守っていてもデッドロックが発生したときは、おそらくチャネル所有者のスコープが広すぎたが、チャネルの所有者が不明瞭なことに原因がある可能性が高いと言えるでしょう。 なので、なるべくチャネル所有者のスコープは狭くした方がバグは防げるでしょう。

最後に

ここまで読んでいただき、ありがとうございます。 今回はGoroutineと共によく利用されるchannelの基本的な使い方についてまとめてみました。 次回はselect文について解説していきたいと思います。

参考