Advanced Go Concurrency 3 つのパターン
intro
ちょっと時間が経ってしまいましたが、 Go研 vol.03 では、 Google I/O 2013 で行われてた Go のセッションの 1 つである下記をテーマに研究しました。
Advanced Go Concurrency Patterns
資料は以下です。
https://github.com/goken/goken/blob/master/goken03/goken03.md
また、ここから順に実装しながら解説をしますが、その完成品はこちらにあります。
(commit 履歴も、本記事にある程度沿っています。)
https://gist.github.com/Jxck/5831269
スライドにそってやったのですが、セッションの内容は結構重ためだったので、 2 時間の Go 研だとちょっと消化不良ぎみだったのが反省点です。
そこで、このセッションの要である、並行処理に関する 3 つのパターンについて紹介します。
メインテーマ
このセッションにおいて一番重要だったのは、 Select の使い方です。
セッションでは、 RSS Reader をテーマとしていますが、その内容をデフォルメして、以下のような古典的 pooling 処理を考えてみます。
1. loop を回して、定期的に外部サーバに GET を投げる
2. 取得した結果を、 channel を経由して呼び出し側に渡す
3. 呼び出し側から Close を呼ばれたら終了処理をする
Loop の部分は、ナイーブな実装は下記のようになるでしょう。
// 定期的な取得をするループ func (p *Pooling) Loop() { for { // Close() が呼ばれいるかを調べる if p.closed { close(p.result) return } // Get を呼ぶだけ body, err := Get(p.url) if err != nil { // エラーがあったら、 3 秒間停止して再開 p.err = err time.Sleep(3 * time.Second) fmt.Println("failed") continue } // 結果を channel で渡す p.result <- body time.Sleep(1 * time.Second) } }
しかし、この実装にはいくつかの問題があります。
問題点
この loop の実装では以下の問題があります。
1. p.closed, p.err が race condition (複数の Goroutine が lock なしでアクセス)
2. Close() を実行しても Sleep() が長いとすぐには loop が終わらない
3. Close() 後に result の読み出しを止められると、 loop が永遠に終わらない
4. Get() が同期処理でブロックする
これらを解決するためには、基本的な方針として Select を用いたイベント駆動を導入します。 for-loop 内で select を使用する基本的な構造は以下になります。
func (p *Pooling) Loop() { // (1) mutable な変数の定義 for { // (2) case のための channel を定義 select { case <-c1: // イベントの補足 // read/write case c2 <- x: // 書き込み // read/write case y := <-c3: // 値の受け取り // read/write } } }
この構造は、 Go の並行処理プログラミングでは基本になります。
とくに (1), (2) と書いた領域の使い分けが大事だと学びました。
request/response パターン
Close() を channel を用いた形で実装しなおします。
トリガーにするための適当な channel を作ります。
ここでは closing という bool の channelにしています。
type Pooling struct { url string result chan string closing chan bool err error }
Close() はそこに何か送ればいいです。
といっても実際に何か送りたいデータがある訳ではないので close() してしまいます。
(close() すると ゼロ値の false が送られる)
// loop を終了させる func (p *Pooling) Close() error { close(p.closing) return p.err }
loop 側は case で補足したら終了処理をします。
func (p *Pooling) Loop() { for { select { case <-p.closing: close(p.result) return
これで p.closed へのアクセスは無くなりました。
しかし、これだと error がとれません。これは loop 側からメッセージで返して欲しいですね。
channel の channel
そこで、トリガとして作った closing を、 channel の channel にします。
type Pooling struct { url string result chan string closing chan chan error }
"error を送る channel" を送ることができる channel です。
Close() は、error を受けとるための channel を作って closing に送ります。
そして、 closing 経由でエラーが返ってくるのを待ちます。読み出してブロックするので、実際に終了処理が終わるまでは Close() は終わりません。また読み出した値をそのまま返せばシグネチャはそのままです。
// loop を終了させる func (p *Pooling) Close() error { done := make(chan error) p.closing <- done return <-done }
loop の方では、 Get() でエラーが発生したらそれを保持しておき、 Close() の(p.closing から値を受け取った)タイミングでそのエラーを送信します。
p.closing から受け取っている done は error 型の channel ななので、ここに送信するだけです。
(実際これだと最後の Get() で発生したエラーしか送れない気がするけど、今回の元ネタの方もそうなっていて、説明の都合上この方が都合がいいので細かい事は気にしない方向で)
// 定期的な取得をするループ func (p *Pooling) Loop() { // Get が失敗したらこれに入れる var err error for { select { case done := <-p.closing: close(p.result) done <- err return default: // Get を呼ぶだけ body, err := Get(p.url)
err は Loop() のローカルとして宣言しており、 Goroutine をまたいでいません。
別の Goroutine への送信は channel 経由のメッセージングを使っているので race condition はありません。
(ちなみに、 Get の結果の代入で body は新しい変数ですが、 err は再宣言になっているように思いました。実際これはエラーになりません。詳細はこちら。 http://qiita.com/Jxck_/items/2616abafea89ee97c477)
こうして、 p.closed, p.err で起こっていた race condition は解消されました。
interval channel
Close を channel 経由で実行できるようにしたため、他のも channel でイベントとして定義するようにすれば、処理を多重化することができて、この問題も解決します。
ここまで time.Sleep を用いて行ってきたインターバル処理については、 time.Tick() を用いるこでイベントにできます。 time.Tick(1 * time.Second) は想像通り、一秒毎に値を送ってくる channel を返します。
for { interval := time.Tick(1 * time.Second) // インターバル select { case done := <-p.closing: // snip case <-interval: // Get を呼ぶだけ body, err := Get(p.url) // err の方は単なる代入
標準パッケージの色々な関数は、こうした仕様を想定して channel を返すものが結構あります。
特に time.After(), time.Tick は特定のタイミングになったことを channel で伝えるようになっているため、この使い方にフィットします。
バッファリング
現在の実装では、 loop の中で p.result に取得した値を書き込んでいるためここでブロックしてしまいます。
呼出側がこれを読み込まない限りは、次の GET は発生しませんし、 Close() することもできません。
そこで、 GET で取得した結果はバッファに貯めて、クライアントへの通知とは別にしてみたいと思います。ここでバッファには単純なスライスを使います。
// 定期的な取得をするループ func (p *Pooling) Loop() { var buffer []string for { // snip case <-interval: // Get を呼ぶだけ body, err := Get(p.url) // err の方は単なる代入 // buffer に結果を追加 buffer = append(buffer, body) case p.result <- buffer[0]: buffer = buffer[1:] } } }
buffer からの取り出しは channel への書き込みになっています。
こうすると、クライアントが読み出すタイミングで順番に送ることができます。
しかし、この単純な実装だとバッファが空のタイミングで読み出されると、 index out of range で panic になります。
panic を rescue することも出きると思いますが、ここでは nil channel パターンを用いて解決してみます。
nil channel パターン
nil channel とは単純に channel の変数が nil になっている状態です。
この状態で select の case で read/write すると、ブロックされます。 case に channel を置く場合は read/write できたタイミングでヒットするため、ブロックしたままヒットしないということはその case が実質無視されるようなイメージになります(panic にはならないのがポイントです)
つまり、 select の前に channel をセットアップする段階で、実行したくない case があったらそこを nil にすればよく、実行したい場合は適切な channel で初期化しておけばよいです。
今回の場合は、スライスの中身で分岐したいので以下のようになります。
for { var first string var result chan string if len(buffer) > 0 { first = buffer[0] result = p.result // ここを通らないと nil } select { case <-interval: body, err := Get(p.url) // err の方は単なる代入 // buffer に結果を追加 buffer = append(buffer, body) case result <- first: // result が nil だと実行されない buffer = buffer[1:] } }
もし buffer が空だと、 result が nil なので result <- first の部分がブロックし、panic にはなりません。
buffer 肥大対応
buffer スライスを導入したために、クライアントの読み出しが遅かった場合は buffer に値がたまりつづけてメモリを食い尽くすリスクが出ます。
そこで、 buffer の最大値を決めておき、それを越えた場合は GET しないように変更します。
これも nil channel パターンをつかうことで解決できます。
const maxBuffer = 10 for { var first string var result chan string if len(buffer) > 0 { first = buffer[0] result = p.result // ここを通らないと nil } var interval <-chan time.Time if len(buffer) < maxBuffer { interval = time.After(1 * time.Second) // インターバル } select { case <-interval: // Get を呼ぶだけ body, err := Get(p.url) // err の方は単なる代入 // snip buffer = append(buffer, body) case result <- first: // result が nil だと実行されない buffer = buffer[1:] } }
buffer のサイズが maxBuffer を越えない場合は、 interval が nil のまま設定されないため、 Get() を呼び出す部分の case が実行されません。
Get の非同期化
Get() は同期ネットワーク I/O を伴います。
取得に時間がかかるとそこでブロックしてしまうため、これを非同期にすることでブロックしないようにすることが望ましいです。
そこで、 Get を goroutine 内で実行し channel 経由で結果を返すようにします。
返した結果は別の select で受信します。受信自体は一度に 1 つにするため channel には buffer を 1 と指定し、ここまでと同様 nil channel パターンを使います。
まず Get の結果を詰め込む構造体を定義します。
type getResponse struct { body string err error }
次に、それを渡す chennel です。この channel が使われていない時だけ interval を回し、 Get を goroutine で実行します。
取得結果の処理は別の case になります。
// 定期的な取得をするループ func (p *Pooling) Loop() { var response chan getResponse for { var interval <-chan time.Time if response == nil && len(buffer) < maxBuffer { interval = time.After(1 * time.Second) // インターバル } select { case <-interval: response = make(chan getResponse, 1) go func() { body, err := Get(p.url) // err の方は単なる代入 response <- getResponse{body, err} }() case res := <-response: response = nil // 次のループのために nil に戻す // buffer に結果を追加 buffer = append(buffer, res.body) case result <- first: // result が nil だと実行されない buffer = buffer[1:] } } }
こうして、最初にあげた問題はすべて解決しました。
nil channel によってループの中での実行タイミングを制御していることがわかります。
まとめ
Google I/O 2012 の Go Concurrency Patterns by rob pike では、 goroutine や channel の強力さなどが重点的に説明されていましたが、今回の発表では Select-Loop の強力さにフォーカスしており、非常に実践的な内容でした。
今回できたようなパターンを使うと、処理を非同期にし、メッセージングによってレースコンディションを無くした並行処理が実装ができるようになります。
Select で channel を read/write するのは、結局は イベントループとイベントドリブンなプログラミングパラダイムを自分で実装しているようなイメージで、 Node.js なんかでやっているのと基本的には同じです。
今後は Node.js でのイベントドリブンのノウハウ(Stream とか)も踏まえて、 Goroutine をより上手く扱えるように研究を重ねたいと思います。
Go1.1 の Race Detector
Race Detector
この機能は、簡単に言うと「レースコンディションが発生していないか」を調べる機能です。
といわれると、なんだかすごい機能ですね。
そもそもレースコンディションとは、マルチスレッドプログラミングなどで、単一のリソースを複数のスレッドで共有した際に、競合状態が発生して、予期しない結果を生んだりする状態です。
レースコンディションによるバグは、再現生が低かったりするので、一般的にデバッグが難しいとされています。
そうした状態が起こらないように、がっちりロックを取り合ったり、そもそもメモリを共有せずメッセージパッシングするなど、別のパラダイムで情報を共有する方法が取られます。
Go も、以下のように「メモリ共有より、メッセージ共有」という方針を推奨してます。
"Do not communicate by sharing memory; instead, share memory by communicating"
そこで使われるのが Goroutine と channel で、それについては Go の並行処理 - Block Rockin’ Codes でも書きました。
で、今回新しく入った機能は、そのレースコンディションが発生していないかどうかを、コンパイル時に Race Detector を仕込んでおくことで、実行時に調べることができるようです。
やっぱり、なんだかすごい機能ですね。
レースコンディションの例
Go の場合、複数の Goroutine から同一のメモリを変更するような場面がそれにあたります。
一番単純なのは、以下のような例でしょうか。
package main import ( "fmt" "time" ) func main() { num := 10 go func() { num += 1 fmt.Println(num) }() num += 1 fmt.Println(num) time.Sleep(time.Second) }
main 自体も Goroutine なので、 num は main の Goroutine と、その中の匿名関数の Goroutine との間で共有されていて、両方が変更と出力を行なっています。
普通に実行すると、以下のとおりです。
$ go run test.go 11 12
まあ、そうでしょう。
-race オプション
では race detection してみます。
実行やビルドのコマンドに -race オプションを足すだけです。
$ go test -race パッケージ $ go run -race ファイル $ go build -race パッケージ $ go install -race パッケージ
先ほどのを実行してみると。
$ go run -race test.go 11 ================== WARNING: DATA RACE Write by goroutine 4: main.func·001() /private/tmp/race/test.go:11 +0x37 gosched0() /usr/local/go/src/pkg/runtime/proc.c:1218 +0x9f Previous write by goroutine 1: main.main() /private/tmp/race/test.go:14 +0xb5 runtime.main() /usr/local/go/src/pkg/runtime/proc.c:182 +0x91 Goroutine 4 (running) created at: main.main() /private/tmp/race/test.go:13 +0xa5 runtime.main() /usr/local/go/src/pkg/runtime/proc.c:182 +0x91 Goroutine 1 (running) created at: _rt0_amd64() /usr/local/go/src/pkg/runtime/asm_amd64.s:87 +0x106 ================== 12 Found 1 data race(s) exit status 66
標準出力には結果、エラー出力にこのトレースログがでます。(上は一緒にしてます。)
ざっくりみてみると、
- 13 行目で生成された Goroutine 4 が 11 行目で
- main メソッドである Goroutine 1 が 14 行目で
それぞれ同じ領域に書き込みしてるというように読み取れます。
解消方法は色々ありますが、とりあえず以下だとレースコンディション自体はなくなります。
func main() { num := 10 go func(num int) { num += 1 fmt.Println(num) }(num) num += 1 fmt.Println(num) time.Sleep(time.Second) }
$ go run -race test.go 11 11
ログは出ません。
どうやら、 Goroutine ごとのメモリアクセスの履歴をもとに、競合が起こってないかをしらべているようです。よくできてますねぇ。すごい。
オプション
GORACE という環境変数を通じてオプションを渡せます。
- log_path (default stderr): ログファイルのパス(ファイル名は path.pid)。デフォルトは stderr
- exitcode (default 66): race があった時終わらせるときのステータスコード
- strip_path_prefix (default ""): 指定したパスをログから消せる
- history_size (default 1): Goroutine のメモリアクセス履歴を 32K * 2**history_size で調節
こんな感じ
$ GORACE="log_path=/tmp/race history_size=4" go run -race test.go
注意点
現在は darwin/amd64, linux/amd64, and windows/amd64 のみの対応。
実行時のメモリ使用量が 5-10倍, 実行時間が 2-20倍に 増えるようです。
テストで確認しておいて、ビルド時は外すとかの方がよさそうですね。
まとめ
メッセージパッシングに寄せて Goroutine + Channel でやっていたとしても、レースコンディションが起こってしまうこともあるかもしれません。
そして、レースコンディションは、「起こってない」ことを調べるのは一般的には難しいと思うんですが、その検出を言語のコアに取り込んでいるというのが Go らしくていいと思います。
テストでは必ずつけて、みつけたら channel に書き換えるというような癖をつけると、幸せになれるのかもしれません。
Go の並行処理
intro
先日の Go のカンファレンス GoCon で、 Go の並行処理周りについて発表させて頂きました。
Go Conference 2013 spring - connpass
具体的には Goroutine や Channel の話ですが、これらの機能は結構面白くて、いじって遊んでるだけでもわくわくします。
Go の並行処理は、設計方針がわりと特殊だと思うのですが、設計がシンプルなので分かるとそこまで難しくはないです。
(使いこなすのは、経験が必要そうですが)
今回話すにあたって色々調べましたが、発表時間の都合上省いたものもあるし、質疑応答で聞かれて応えられなかったこともあるので、
ここでまとめて置こうと思います。
発表資料
今回の発表資料はこちらです。
このブログの内容は、これをベースにします。
http://jxck.node-ninja.com/slides/gocon-2013spring.html
ソースは、ここにまとめました。
Go の Concurrency Model
並行処理プログラミングには、ざっくり分けて二つのアプローチがあります。
- Shared-memory communication
- ワーカ間でメモリ(リソース) を共有する。レースコンディションが起こらないようにロックをとることが多く、その実装は難しいことが多いとされる。
- Message-passing communication
- ワーカ間でメッセージパッシングを行う。Erlang などに実装された Actor モデルなどが代表的な実装。
そして、Go の並行処理モデルの方針は下記に宣言されています。
"Do not communicate by sharing memory; instead, share memory by communicating"
なので、上記で言うと後者なのですが、 Erlang の Actor モデルなどとは異なる点がいくつかあります。
Go での実装には、下記の二つが参考にされています。
- CSP (Communicating Sequential Processes)
- 並行処理のための設計理論。実装としては Occam, Limbo などがある。
- π caluculus
- こちらも並行処理のための理論。Erlang のメッセージングが ! なのはこれが元になってるよう。日本語の情報ならここA Very Brief Introduction to the Pi-Calculus (in Japanese)。
これらを実現するために、 Go には以下の機能が実装されています。
- goroutine
- channel
また、以下の二つもそれをより強力にします。
- select
- closure
Goroutine
Goroutine とは
- Coroutine ではない。
- Thread, Process でもない。
- 複数の Thread 上に多重化されて実装されてる。
- main() 自身や Scavenger(GC) などランタイムも goroutine を使ってる。
上記周りの話は、自分のあとの methane さんの発表の方が詳しかったので、そちらを参照して下さい。
https://gist.github.com/methane/5377227#file-goscheduler-md
go 文での起動
goroutine は go 文で関数を実行すると起動できます。
go 文はブロックしないので、 goroutine は非同期に起動されます。
下記の main() では、goroutine を二つ起動してから time.Sleep() で一時停止しているのは、これが無かったら f() が実行される前に main() が終わってしまうからです。
goroutine が他に実行されていようと main() が終わるとプロセスが終了します。
func f(msg string) { log.Println(msg) } func main() { go f("hello") go func() { log.Println("gopher") }() time.Sleep(time.Second) }
https://github.com/Jxck/goroutine-sample/blob/master/goroutine.go
goroutine の終了条件
goroutine は、下記の条件で終了します。
- 関数が終わる
- return で抜ける
- runtime.Goexit() を実行する
func main() { go func() { log.Println("end") }() go func() { log.Println("return") return }() go func() { log.Println("exit") runtime.Goexit() }() time.Sleep(time.Second) }
https://github.com/Jxck/goroutine-sample/blob/master/goroutine-exit.go
runtime.NumGoroutine
現在起動している goroutine の数を知ることができます。
func main() {
log.Println(runtime.NumGoroutine())
}
これを実行すると、自分の環境では 2 と出ました。
まず、 main() そのものが goroutine なので、 1 つはわかります。
しかし、もう 1 つは誰でしょう? 二つの関数を使ってますが、いずれも goroutine は使ってません。
ML で聞いてみたところ、 bradfitz さんが応えてくれました。こんな大物の返信がもらえるとか、、
結果から言うと、下記に変えると分かります。
func main() { log.Println(runtime.NumGoroutine()) select{} }
これを GOTRACEBACK=2 をつけて実行します。
GOTRACEBACK=2 go run numgoroutine.go
出力が長いのでのせませんが、見ると runtiem.main() 以外に runtime.MHeap_Scavenger() が動いてる事がわかります。
このように、 go 文で起動した goroutine や main() 以外にも、 Scavenger (GC) などランタイム環境でも goroutine が使われていることがわかります。
この情報は、 runtime.Stack からも取れます。
func main() { log.Println(runtime.NumGoroutine()) buf := make([]byte, 1<<20) buf = buf[:runtime.Stack(buf, true)] log.Println(string(buf)) select {} }
Channel
Channel とは
- Channel は goroutine 間でのメッセージパッシングをするためのもの
- メッセージの型を指定できる
- first class value であり、引数や戻り値にも使える
- send/receive でブロックする
- buffer で、一度に扱えるメッセージ量を指定できる
Channel を用いたメッセージング
Channel は参照型なので make() でインスタンスを生成します。
使い方は簡単で、送信が "channel<-value" で受信が "<-channel" です。
下記は、 goroutine と main() 間でのメッセージングです。
main() の最後では受信結果を出力しており、その受信で値が届くまでブロックするので、 time.Sleep() は必要無い点に注意して下さい。
func f(ch chan bool) { ch <- true } func main() { ch := make(chan bool) go f(ch) log.Println(<-ch) // ここでデータが来るまでブロックする }
https://github.com/Jxck/goroutine-sample/blob/master/channel.go
同期
channel は、送受信が完了するまでブロックします。
このことを、 goroutine 間の同期に応用することができます。
下記は、 main() が channel を受信する事で goroutine の終了を待つ同期をしています。
受信した値自体は必要ないため、捨てています。 goroutine は channel を closure で参照しています。closure があると本当に便利ですね。
func main() { fin := make(chan bool) go func() { log.Println("worker working..") fin <- false }() <-fin }
https://github.com/Jxck/goroutine-sample/blob/master/finchannel.go
上記は、 goroutine が 1 つですが、複数ある場合は数を管理しないといけなくなります。
それを行う場合は sync.WaitGroup というモジュールが使用できます。
func main() { var wg sync.WaitGroup for i:=0; i<3; i++ { wg.Add(1) // goroutine を生成するたびインクリメント go func(i int) { log.Println(i) wg.Done() // 終了時にデクリメント }(i) } wg.Wait() // ブロックし、全ての Done が終わったら次に進む }
https://github.com/Jxck/goroutine-sample/blob/master/waitGroup.go
sync.WaitGroup は channel を使っているわけではないようです。
sync パッケージには、 lock をとったりするパッケージがるので、 channel を使わずそれでリソース共有/同期をすることもできますが、最初に述べたように Go では極力メッセージングでリソース共有/同期をしましょう。
Worker の起動
下記は、ワーカを 3 つ起動し、それぞれの処理結果を main() で受け取っています。
worker() を go 文で実行し、結果を渡してもらうための channel を渡すのではなく、 worker が内部で、結果を渡すための channel を生成しそれを返しているので、 main() はそこから取り出しています。
func worker(msg string) <-chan string { receiver := make(chan string) for i := 0; i < 3; i++ { go func(i int) { msg := fmt.Sprintf("%d %s done", i, msg) receiver <- msg }(i) } return receiver } func main() { receiver := worker("job") for i := 0; i < 3; i++ { log.Println(<-receiver) } }
https://github.com/Jxck/goroutine-sample/blob/master/workers.go
worker の宣言に注目して下さい。
// func worker(msg string) chan string こうではない func worker(msg string) <-chan string
worker が返す型は「読み取り専用の channel」です。(<- がついてる)
これにより、 main() がこの channel に誤ってデータを書き込むことを防ぎます。
複数の channel と select
先の例では、 worker は 3 つのメッセージを返すことが予め分かっていたので、 3 つだけ取り出していました。
しかし、 worker が予めわからない場合などもあります。
その場合は、用途の違う別の channel を返すことで、必要なメッセージをとることもできます。
Go では、関数が複数の値を返すことができるため、以下の例は worker が終わったことを返すための channel を worker が返すようにしています。
main() では、複数のチャネルからのメッセージを受信する必要がありますが、それぞれブロックしてしまいます。そんな場合は、 select という構文を使うと、複数の channel の受信を同時に行うことができます。
func worker(msg string) (<-chan string, <-chan bool) { var wg sync.WaitGroup receiver := make(chan string) fin := make(chan bool) go func() { for i := 0; i < 3; i++ { wg.Add(1) go func(i int) { msg := fmt.Sprintf("%d %s done", i, msg) receiver <- msg wg.Done() }(i) } wg.Wait() fin<-false // 終了を伝える }() return receiver, fin } func main() { receiver, fin := worker("job") for { select { case receive := <-receiver: log.Println(receive) case <-fin: // 終了したら終わる return } } }
https://github.com/Jxck/goroutine-sample/blob/master/finchannel.go
Channel の close()
close() は組み込みの関数で、用の済んだ channel を閉じることができます。
そもそも channel の呼び出しは 2 つの値が受け取れます。
message, ok := <-channel
この 2 つめの ok は、 channel が閉じられているかを表す bool です。
ok は、取っても取らなくても良い仕様になっています。
channel を close() した場合、受信側には 空のメッセージと false が渡るので、これを使って channel が close() されたことを判定できます。
先ほどの例を close を使って書き直すと以下になります。
func worker(msg string) (<-chan string) { var wg sync.WaitGroup receiver := make(chan string) go func() { for i := 0; i < 3; i++ { wg.Add(1) go func(i int) { msg := fmt.Sprintf("%d %s done", i, msg) receiver <- msg wg.Done() }(i) } wg.Wait() close(receiver) }() return receiver } func main() { receiver := worker("job") for { receive, ok := <-receiver if !ok { log.Println("closed") return } log.Println(receive) } }
https://github.com/Jxck/goroutine-sample/blob/master/close.go
timeout
重たい worker がいた場合、 worker からの終了通知を受け取らずに、一定時間経過したら終わりたい場合もあります。
そんな時は、 time.After という関数を使うことができます。
time.After の型は以下です。
func After(d Duration) <-chan Time
一定時間経過したらメッセージを送る channel を返すので、この channel を受け取って受信をしておけば、一定時間後に処理をするためのトリガーにできます。
これを用いて 1 秒後に timeout するように書き換えてみます。
func randomTime() time.Duration { return time.Duration(rand.Intn(1e3)) * time.Millisecond } func worker(msg string) <-chan string { receiver := make(chan string) for i := 0; i < 300; i++ { go func(i int) { time.Sleep(randomTime()) msg := fmt.Sprintf("%d %s done", i, msg) receiver <- msg }(i) } return receiver } func main() { receiver := worker("job") for { select { case receive := <-receiver: log.Println(receive) case <-time.After(time.Second): // 一定時間経過したら受信 log.Println("timeout") return // 受信時に終われば timeout 処理になる。 } } }
https://github.com/Jxck/goroutine-sample/blob/master/timeout.go
Buffer
Channel は、 make() 時に buffer を指定することができます。
この buffer の値は、一度に channel に書き込める message の上限値になります。
デフォルトは 0 です。指定すると、 MQ のように扱うことができるイメージです。
送信は buffer が一杯だった場合は送信でブロックします。
これを利用して、例えば worker が同時に起動する数を制限できます。
以下は、 100 個の処理を同時に 5 つだけの goroutine を起動して行うサンプルです。
func worker(msg string) <-chan string { limit := make(chan int, 5) receiver := make(chan string) go func() { for i := 0; i < 100; i++ { log.Println(runtime.NumGoroutine()) limit <- 1 go func(i int) { msg := fmt.Sprintf("%d %s done", i, msg) receiver <- msg <-limit }(i) } }() return receiver } func main() { receiver := worker("job") for i := 0; i < 100; i++ { log.Println(<-receiver) } }
https://github.com/Jxck/goroutine-sample/blob/master/buffer.go
runtime.NumGoroutine() の結果は必ずしも 5 にならない点は前述の通り。
Pattern 編
よくあるパターンを、ここまでの内容を使って実装してみます。
coroutine
coroutine と言えば Lua でしょう。
スケジュールを記述することで、処理を途中で止めて、そこからリスタートすることができます。
単純な例を Lua で書くとこんな感じです。
f() の中の処理を途中で中断し、そこから再開している例です。
function f() coroutine.yield "one" coroutine.yield "two" coroutine.yield "three" return end local co = coroutine.wrap (f) print (co ()) -- one print (co ()) -- two print (co ()) -- three
https://github.com/Jxck/goroutine-sample/blob/master/coroutine.lua
Go では channel がブロックすることを利用します。
func f(yield chan string) { yield <- "one" yield <- "two" yield <- "three" } func main() { co := make(chan string) go f(co) log.Println(<-co) // one log.Println(<-co) // two log.Println(<-co) // three }
https://github.com/Jxck/goroutine-sample/blob/master/coroutine.go
generator
generator は、多くの場合配列のように扱えますが、扱う値が実行時に生成されている点が配列と違います。予め値を生成しないことにより、メモリ消費が少ないのが特徴です。
10 まで値を取り出せる generator は、 python だと以下のようになります。
def generator(n): i = 0 while True: if i > n: break yield i i += 1 for i in generator(10): print i
https://github.com/Jxck/goroutine-sample/blob/master/generator.py
Go ではやはり、 channel のブロックを使います。
func generator(n int) chan int { ch := make(chan int) i := 0 go func() { for { ch <- i i++ if i > n { close(ch) break } } }() return ch } func main() { for x := range generator(10) { log.Println(x) } }
https://github.com/Jxck/goroutine-sample/blob/master/generator.go
make() の部分で buffer を指定していないので、同時に 1 つしか生成されないことが保証できます。(ということは生成する数を指定することもできる)
その他
multi-core
現時点では、 goroutine がマルチコアを自動的に使いこなすような最適化はされないようです。
もし、 goroutine を複数のコアで実行したい場合は GOMAXPROC 環境変数か、 runtime.GOMAXPROCS() にコア数を指定します。
よって、ソースに以下のように書くことが多いです。
cpus := runtime.NumCPU() runtime.GOMAXPROCS(cpus)
ベンチ
goroutine のメモリ使用量を調べます。
こちらにあったのをお借りして、少し変えてみました。
https://gist.github.com/jgrahamc/5253020
func main() { cpus := runtime.NumCPU() runtime.GOMAXPROCS(cpus) count := 1000 * 100 var startMemory runtime.MemStats runtime.ReadMemStats(&startMemory) start := time.Now() fin := make(chan bool) for i := 0; i < count; i++ { go func() { <-fin }() } elapsed := time.Since(start) var endMemory runtime.MemStats runtime.ReadMemStats(&endMemory) close(fin) fmt.Printf(` goroutine: %d cpu: %d time: %f memory all: %f MB memory: %f byte/goroutine `, count, cpus, elapsed.Seconds(), float64(endMemory.Alloc-startMemory.Alloc)/float64(1024*1024), float64(endMemory.Alloc - startMemory.Alloc)/float64(count)) }
https://github.com/Jxck/goroutine-sample/blob/master/goroutine-bench.go
100,000 個の goroutine を起動して、その時間とメモリの使用量を見ています。
手元の Mac OSX Lione Core 2 Duo, Memory 4G で実行してみた結果が以下です。
goroutine: 100000 cpu: 2 time: 0.589367 memory all: 23.001915 MB memory: 241.192560 byte/goroutine
かなり、小さい事がわかりますね。
というか不安なのでもう少し調べてみます(汗)
outro
goroutine と channel 自体の仕様はそんなに難しいものではありませんが、 2 つをを使って、かなり色々表現できることが分かって頂けたと思います。また、 select や closure といった機能が地味に協力なので、組み合わせるとさらに色々できるようになります。
と、ここまでが発表の範囲でしたがまだいくつかあるので、後で追記していきます。
go の売りの 1 つでもある、この組み込みの並行処理機能の使い方がなんとなくでも伝わればと思います。
参考
- http://talks.golang.org/2012/concurrency.slide
- http://www.slideshare.net/jgrahamc/go-oncurrency
- https://dl.google.com/googleio/2010/tech-talks-go-programming.pdf
- https://sites.google.com/site/gopatterns/
- http://www.softwareresearch.net/fileadmin/src/docs/teaching/SS10/Sem/Paper__aigner_baumgartner.pdf