Block Rockin’ Codes

back with another one of those block rockin' codes

Advanced Go Concurrency 3 つのパターン

intro

ちょっと時間が経ってしまいましたが、 Go研 vol.03 では、 Google I/O 2013 で行われてた Go のセッションの 1 つである下記をテーマに研究しました。


Advanced Go Concurrency Patterns


資料は以下です。


https://github.com/goken/goken/blob/master/goken03/goken03.md


また、ここから順に実装しながら解説をしますが、その完成品はこちらにあります。
(commit 履歴も、本記事にある程度沿っています。)


https://gist.github.com/Jxck/5831269


スライドにそってやったのですが、セッションの内容は結構重ためだったので、 2 時間の Go 研だとちょっと消化不良ぎみだったのが反省点です。


そこで、このセッションの要である、並行処理に関する 3 つのパターンについて紹介します。

メインテーマ

このセッションにおいて一番重要だったのは、 Select の使い方です。
セッションでは、 RSS Reader をテーマとしていますが、その内容をデフォルメして、以下のような古典的 pooling 処理を考えてみます。


1. loop を回して、定期的に外部サーバに GET を投げる
2. 取得した結果を、 channel を経由して呼び出し側に渡す
3. 呼び出し側から Close を呼ばれたら終了処理をする


Loop の部分は、ナイーブな実装は下記のようになるでしょう。

// 定期的な取得をするループ
func (p *Pooling) Loop() {
	for {
		// Close() が呼ばれいるかを調べる
		if p.closed {
			close(p.result)
			return
		}
		// Get を呼ぶだけ
		body, err := Get(p.url)
		if err != nil {
			// エラーがあったら、 3 秒間停止して再開
			p.err = err
			time.Sleep(3 * time.Second)
			fmt.Println("failed")
			continue
		}
		// 結果を channel で渡す
		p.result <- body
		time.Sleep(1 * time.Second)
	}
}


しかし、この実装にはいくつかの問題があります。

問題点

この loop の実装では以下の問題があります。

1. p.closed, p.err が race condition (複数の Goroutine が lock なしでアクセス)
2. Close() を実行しても Sleep() が長いとすぐには loop が終わらない
3. Close() 後に result の読み出しを止められると、 loop が永遠に終わらない
4. Get() が同期処理でブロックする


これらを解決するためには、基本的な方針として Select を用いたイベント駆動を導入します。 for-loop 内で select を使用する基本的な構造は以下になります。

func (p *Pooling) Loop() {
	// (1) mutable な変数の定義
	for {
		// (2) case のための channel を定義
		select {
		case <-c1: // イベントの補足
			// read/write
		case c2 <- x: // 書き込み
			// read/write
		case y := <-c3: // 値の受け取り
			// read/write
		}
	}
}

この構造は、 Go の並行処理プログラミングでは基本になります。
とくに (1), (2) と書いた領域の使い分けが大事だと学びました。

request/response パターン

Close() を channel を用いた形で実装しなおします。

トリガーにするための適当な channel を作ります。
ここでは closing という bool の channelにしています。

type Pooling struct {
	url     string
	result  chan string
	closing chan bool
	err     error
}

Close() はそこに何か送ればいいです。
といっても実際に何か送りたいデータがある訳ではないので close() してしまいます。
(close() すると ゼロ値の false が送られる)

// loop を終了させる
func (p *Pooling) Close() error {
	close(p.closing)
	return p.err
}

loop 側は case で補足したら終了処理をします。

func (p *Pooling) Loop() {
	for {
		select {
		case <-p.closing:
			close(p.result)
			return

これで p.closed へのアクセスは無くなりました。
しかし、これだと error がとれません。これは loop 側からメッセージで返して欲しいですね。

channel の channel

そこで、トリガとして作った closing を、 channel の channel にします。

type Pooling struct {
	url     string
	result  chan string
	closing chan chan error
}

"error を送る channel" を送ることができる channel です。


Close() は、error を受けとるための channel を作って closing に送ります。

そして、 closing 経由でエラーが返ってくるのを待ちます。読み出してブロックするので、実際に終了処理が終わるまでは Close() は終わりません。また読み出した値をそのまま返せばシグネチャはそのままです。

// loop を終了させる
func (p *Pooling) Close() error {
	done := make(chan error)
	p.closing <- done
	return <-done
}

loop の方では、 Get() でエラーが発生したらそれを保持しておき、 Close() の(p.closing から値を受け取った)タイミングでそのエラーを送信します。
p.closing から受け取っている done は error 型の channel ななので、ここに送信するだけです。
(実際これだと最後の Get() で発生したエラーしか送れない気がするけど、今回の元ネタの方もそうなっていて、説明の都合上この方が都合がいいので細かい事は気にしない方向で)

// 定期的な取得をするループ
func (p *Pooling) Loop() {
	// Get が失敗したらこれに入れる
	var err error
	for {
		select {
		case done := <-p.closing:
			close(p.result)
			done <- err
			return
		default:
			// Get を呼ぶだけ
			body, err := Get(p.url)

err は Loop() のローカルとして宣言しており、 Goroutine をまたいでいません。
別の Goroutine への送信は channel 経由のメッセージングを使っているので race condition はありません。
(ちなみに、 Get の結果の代入で body は新しい変数ですが、 err は再宣言になっているように思いました。実際これはエラーになりません。詳細はこちら。 http://qiita.com/Jxck_/items/2616abafea89ee97c477)

こうして、 p.closed, p.err で起こっていた race condition は解消されました。

interval channel

Close を channel 経由で実行できるようにしたため、他のも channel でイベントとして定義するようにすれば、処理を多重化することができて、この問題も解決します。


ここまで time.Sleep を用いて行ってきたインターバル処理については、 time.Tick() を用いるこでイベントにできます。 time.Tick(1 * time.Second) は想像通り、一秒毎に値を送ってくる channel を返します。

for {
	interval := time.Tick(1 * time.Second) // インターバル
	select {
	case done := <-p.closing:
		// snip
	case <-interval:
		// Get を呼ぶだけ
		body, err := Get(p.url) // err の方は単なる代入

標準パッケージの色々な関数は、こうした仕様を想定して channel を返すものが結構あります。
特に time.After(), time.Tick は特定のタイミングになったことを channel で伝えるようになっているため、この使い方にフィットします。

バッファリング

現在の実装では、 loop の中で p.result に取得した値を書き込んでいるためここでブロックしてしまいます。
呼出側がこれを読み込まない限りは、次の GET は発生しませんし、 Close() することもできません。


そこで、 GET で取得した結果はバッファに貯めて、クライアントへの通知とは別にしてみたいと思います。ここでバッファには単純なスライスを使います。

// 定期的な取得をするループ
func (p *Pooling) Loop() {
	var buffer []string
	for {
		// snip
		case <-interval:
			// Get を呼ぶだけ
			body, err := Get(p.url) // err の方は単なる代入
			// buffer に結果を追加
			buffer = append(buffer, body)
		case p.result <- buffer[0]:
			buffer = buffer[1:]
		}
	}
}

buffer からの取り出しは channel への書き込みになっています。
こうすると、クライアントが読み出すタイミングで順番に送ることができます。


しかし、この単純な実装だとバッファが空のタイミングで読み出されると、 index out of range で panic になります。
panic を rescue することも出きると思いますが、ここでは nil channel パターンを用いて解決してみます。

nil channel パターン

nil channel とは単純に channel の変数が nil になっている状態です。
この状態で select の case で read/write すると、ブロックされます。 case に channel を置く場合は read/write できたタイミングでヒットするため、ブロックしたままヒットしないということはその case が実質無視されるようなイメージになります(panic にはならないのがポイントです)
つまり、 select の前に channel をセットアップする段階で、実行したくない case があったらそこを nil にすればよく、実行したい場合は適切な channel で初期化しておけばよいです。


今回の場合は、スライスの中身で分岐したいので以下のようになります。

for {
	var first string
	var result chan string
	if len(buffer) > 0 {
		first = buffer[0]
		result = p.result // ここを通らないと nil
	}
	select {
	case <-interval:
		body, err := Get(p.url) // err の方は単なる代入
		// buffer に結果を追加
		buffer = append(buffer, body)
	case result <- first: // result が nil だと実行されない
		buffer = buffer[1:]
	}
}

もし buffer が空だと、 result が nil なので result <- first の部分がブロックし、panic にはなりません。

buffer 肥大対応

buffer スライスを導入したために、クライアントの読み出しが遅かった場合は buffer に値がたまりつづけてメモリを食い尽くすリスクが出ます。

そこで、 buffer の最大値を決めておき、それを越えた場合は GET しないように変更します。
これも nil channel パターンをつかうことで解決できます。

const maxBuffer = 10
for {
	var first string
	var result chan string
	if len(buffer) > 0 {
		first = buffer[0]
		result = p.result // ここを通らないと nil
	}
	var interval <-chan time.Time
	if len(buffer) < maxBuffer {
		interval = time.After(1 * time.Second) // インターバル
	}
	select {
	case <-interval:
		// Get を呼ぶだけ
		body, err := Get(p.url) // err の方は単なる代入
		// snip
		buffer = append(buffer, body)
	case result <- first: // result が nil だと実行されない
		buffer = buffer[1:]
	}
}

buffer のサイズが maxBuffer を越えない場合は、 interval が nil のまま設定されないため、 Get() を呼び出す部分の case が実行されません。

Get の非同期化

Get() は同期ネットワーク I/O を伴います。
取得に時間がかかるとそこでブロックしてしまうため、これを非同期にすることでブロックしないようにすることが望ましいです。

そこで、 Get を goroutine 内で実行し channel 経由で結果を返すようにします。
返した結果は別の select で受信します。受信自体は一度に 1 つにするため channel には buffer を 1 と指定し、ここまでと同様 nil channel パターンを使います。


まず Get の結果を詰め込む構造体を定義します。

type getResponse struct {
	body string
	err  error
}

次に、それを渡す chennel です。この channel が使われていない時だけ interval を回し、 Get を goroutine で実行します。
取得結果の処理は別の case になります。

// 定期的な取得をするループ
func (p *Pooling) Loop() {
	var response chan getResponse
	for {
		var interval <-chan time.Time
		if response == nil && len(buffer) < maxBuffer {
			interval = time.After(1 * time.Second) // インターバル
		}
		select {
		case <-interval:
			response = make(chan getResponse, 1)
			go func() {
				body, err := Get(p.url) // err の方は単なる代入
				response <- getResponse{body, err}
			}()
		case res := <-response:
			response = nil // 次のループのために nil に戻す
			// buffer に結果を追加
			buffer = append(buffer, res.body)
		case result <- first: // result が nil だと実行されない
			buffer = buffer[1:]
		}
	}
}

こうして、最初にあげた問題はすべて解決しました。
nil channel によってループの中での実行タイミングを制御していることがわかります。

まとめ

Google I/O 2012 の Go Concurrency Patterns by rob pike では、 goroutine や channel の強力さなどが重点的に説明されていましたが、今回の発表では Select-Loop の強力さにフォーカスしており、非常に実践的な内容でした。


今回できたようなパターンを使うと、処理を非同期にし、メッセージングによってレースコンディションを無くした並行処理が実装ができるようになります。


Select で channel を read/write するのは、結局は イベントループとイベントドリブンなプログラミングパラダイムを自分で実装しているようなイメージで、 Node.js なんかでやっているのと基本的には同じです。
今後は Node.js でのイベントドリブンのノウハウ(Stream とか)も踏まえて、 Goroutine をより上手く扱えるように研究を重ねたいと思います。

Go1.1 の Race Detector

intro

先々週、Go 1.1 がリリースされました。

いくつか新しい機能が入ったのですが、その中の Race Detector というのが面白そうだったので、
軽く調べてみました。

Race Detector

この機能は、簡単に言うと「レースコンディションが発生していないか」を調べる機能です。
といわれると、なんだかすごい機能ですね。


そもそもレースコンディションとは、マルチスレッドプログラミングなどで、単一のリソースを複数のスレッドで共有した際に、競合状態が発生して、予期しない結果を生んだりする状態です。
レースコンディションによるバグは、再現生が低かったりするので、一般的にデバッグが難しいとされています。


そうした状態が起こらないように、がっちりロックを取り合ったり、そもそもメモリを共有せずメッセージパッシングするなど、別のパラダイムで情報を共有する方法が取られます。


Go も、以下のように「メモリ共有より、メッセージ共有」という方針を推奨してます。

"Do not communicate by sharing memory; instead, share memory by communicating"

そこで使われるのが Goroutine と channel で、それについては Go の並行処理 - Block Rockin’ Codes でも書きました。


で、今回新しく入った機能は、そのレースコンディションが発生していないかどうかを、コンパイル時に Race Detector を仕込んでおくことで、実行時に調べることができるようです。
やっぱり、なんだかすごい機能ですね。

レースコンディションの例

Go の場合、複数の Goroutine から同一のメモリを変更するような場面がそれにあたります。

一番単純なのは、以下のような例でしょうか。

package main

import (
	"fmt"
	"time"
)

func main() {
	num := 10
	go func() {
		num += 1
		fmt.Println(num)
	}()
	num += 1
	fmt.Println(num)
	time.Sleep(time.Second)
}


main 自体も Goroutine なので、 num は main の Goroutine と、その中の匿名関数の Goroutine との間で共有されていて、両方が変更と出力を行なっています。


普通に実行すると、以下のとおりです。

$ go run test.go
11
12


まあ、そうでしょう。

-race オプション

では race detection してみます。
実行やビルドのコマンドに -race オプションを足すだけです。

$ go test -race パッケージ
$ go run -race ファイル
$ go build -race パッケージ
$ go install -race パッケージ


先ほどのを実行してみると。

$ go run -race test.go
11
==================
WARNING: DATA RACE
Write by goroutine 4:
  main.func&#183;001()
      /private/tmp/race/test.go:11 +0x37
  gosched0()
      /usr/local/go/src/pkg/runtime/proc.c:1218 +0x9f

Previous write by goroutine 1:
  main.main()
      /private/tmp/race/test.go:14 +0xb5
  runtime.main()
      /usr/local/go/src/pkg/runtime/proc.c:182 +0x91

Goroutine 4 (running) created at:
  main.main()
      /private/tmp/race/test.go:13 +0xa5
  runtime.main()
      /usr/local/go/src/pkg/runtime/proc.c:182 +0x91

Goroutine 1 (running) created at:
  _rt0_amd64()
      /usr/local/go/src/pkg/runtime/asm_amd64.s:87 +0x106

==================
12
Found 1 data race(s)
exit status 66


標準出力には結果、エラー出力にこのトレースログがでます。(上は一緒にしてます。)

ざっくりみてみると、

  • 13 行目で生成された Goroutine 4 が 11 行目で
  • main メソッドである Goroutine 1 が 14 行目で

それぞれ同じ領域に書き込みしてるというように読み取れます。


解消方法は色々ありますが、とりあえず以下だとレースコンディション自体はなくなります。

func main() {
	num := 10
	go func(num int) {
		num += 1
		fmt.Println(num)
	}(num)
	num += 1
	fmt.Println(num)
	time.Sleep(time.Second)
}
$ go run -race test.go
11
11

ログは出ません。

どうやら、 Goroutine ごとのメモリアクセスの履歴をもとに、競合が起こってないかをしらべているようです。よくできてますねぇ。すごい。

オプション

GORACE という環境変数を通じてオプションを渡せます。

  • log_path (default stderr): ログファイルのパス(ファイル名は path.pid)。デフォルトは stderr
  • exitcode (default 66): race があった時終わらせるときのステータスコード
  • strip_path_prefix (default ""): 指定したパスをログから消せる
  • history_size (default 1): Goroutine のメモリアクセス履歴を 32K * 2**history_size で調節

こんな感じ

$ GORACE="log_path=/tmp/race history_size=4" go run -race test.go

注意点

現在は darwin/amd64, linux/amd64, and windows/amd64 のみの対応。
実行時のメモリ使用量が 5-10倍, 実行時間が 2-20倍に 増えるようです。

テストで確認しておいて、ビルド時は外すとかの方がよさそうですね。

まとめ

メッセージパッシングに寄せて Goroutine + Channel でやっていたとしても、レースコンディションが起こってしまうこともあるかもしれません。

そして、レースコンディションは、「起こってない」ことを調べるのは一般的には難しいと思うんですが、その検出を言語のコアに取り込んでいるというのが Go らしくていいと思います。

テストでは必ずつけて、みつけたら channel に書き換えるというような癖をつけると、幸せになれるのかもしれません。

Go の並行処理

intro

先日の Go のカンファレンス GoCon で、 Go の並行処理周りについて発表させて頂きました。


Go Conference 2013 spring - connpass


具体的には Goroutine や Channel の話ですが、これらの機能は結構面白くて、いじって遊んでるだけでもわくわくします。
Go の並行処理は、設計方針がわりと特殊だと思うのですが、設計がシンプルなので分かるとそこまで難しくはないです。
(使いこなすのは、経験が必要そうですが)


今回話すにあたって色々調べましたが、発表時間の都合上省いたものもあるし、質疑応答で聞かれて応えられなかったこともあるので、
ここでまとめて置こうと思います。

発表資料

今回の発表資料はこちらです。
このブログの内容は、これをベースにします。


http://jxck.node-ninja.com/slides/gocon-2013spring.html


ソースは、ここにまとめました。


https://github.com/Jxck/goroutine-sample

Go の Concurrency Model

並行処理プログラミングには、ざっくり分けて二つのアプローチがあります。

Shared-memory communication
ワーカ間でメモリ(リソース) を共有する。レースコンディションが起こらないようにロックをとることが多く、その実装は難しいことが多いとされる。
Message-passing communication
ワーカ間でメッセージパッシングを行う。Erlang などに実装された Actor モデルなどが代表的な実装。


そして、Go の並行処理モデルの方針は下記に宣言されています。

"Do not communicate by sharing memory; instead, share memory by communicating"


なので、上記で言うと後者なのですが、 Erlang の Actor モデルなどとは異なる点がいくつかあります。


Go での実装には、下記の二つが参考にされています。

CSP (Communicating Sequential Processes)
並行処理のための設計理論。実装としては Occam, Limbo などがある。
π caluculus
こちらも並行処理のための理論。Erlang のメッセージングが ! なのはこれが元になってるよう。日本語の情報ならここA Very Brief Introduction to the Pi-Calculus (in Japanese)


これらを実現するために、 Go には以下の機能が実装されています。

  • goroutine
  • channel


また、以下の二つもそれをより強力にします。

  • select
  • closure

Goroutine

Goroutine とは
  • Coroutine ではない。
  • Thread, Process でもない。
  • 複数の Thread 上に多重化されて実装されてる。
  • main() 自身や Scavenger(GC) などランタイムも goroutine を使ってる。


上記周りの話は、自分のあとの methane さんの発表の方が詳しかったので、そちらを参照して下さい。
https://gist.github.com/methane/5377227#file-goscheduler-md

go 文での起動

goroutine は go 文で関数を実行すると起動できます。
go 文はブロックしないので、 goroutine は非同期に起動されます。
下記の main() では、goroutine を二つ起動してから time.Sleep() で一時停止しているのは、これが無かったら f() が実行される前に main() が終わってしまうからです。


goroutine が他に実行されていようと main() が終わるとプロセスが終了します。

func f(msg string) {
	log.Println(msg)
}

func main() {
	go f("hello")
	go func() {
		log.Println("gopher")
	}()
	time.Sleep(time.Second)
}

https://github.com/Jxck/goroutine-sample/blob/master/goroutine.go

goroutine の終了条件

goroutine は、下記の条件で終了します。

  • 関数が終わる
  • return で抜ける
  • runtime.Goexit() を実行する
func main() {
	go func() {
		log.Println("end")
	}()
	go func() {
		log.Println("return")
		return
	}()
	go func() {
		log.Println("exit")
		runtime.Goexit()
	}()
	time.Sleep(time.Second)
}

https://github.com/Jxck/goroutine-sample/blob/master/goroutine-exit.go


runtime.NumGoroutine

現在起動している goroutine の数を知ることができます。

func main() {
	log.Println(runtime.NumGoroutine())
}

これを実行すると、自分の環境では 2 と出ました。


まず、 main() そのものが goroutine なので、 1 つはわかります。
しかし、もう 1 つは誰でしょう? 二つの関数を使ってますが、いずれも goroutine は使ってません。


ML で聞いてみたところ、 bradfitz さんが応えてくれました。こんな大物の返信がもらえるとか、、


結果から言うと、下記に変えると分かります。

func main() {
	log.Println(runtime.NumGoroutine())
	select{}
}

これを GOTRACEBACK=2 をつけて実行します。

GOTRACEBACK=2 go run numgoroutine.go

出力が長いのでのせませんが、見ると runtiem.main() 以外に runtime.MHeap_Scavenger() が動いてる事がわかります。


このように、 go 文で起動した goroutine や main() 以外にも、 Scavenger (GC) などランタイム環境でも goroutine が使われていることがわかります。


この情報は、 runtime.Stack からも取れます。

func main() {
	log.Println(runtime.NumGoroutine())
	buf := make([]byte, 1<<20)
	buf = buf[:runtime.Stack(buf, true)]
	log.Println(string(buf))
	select {}
}

https://github.com/Jxck/goroutine-sample/blob/master/who.go

Channel

Channel とは
  • Channel は goroutine 間でのメッセージパッシングをするためのもの
  • メッセージの型を指定できる
  • first class value であり、引数や戻り値にも使える
  • send/receive でブロックする
  • buffer で、一度に扱えるメッセージ量を指定できる
Channel を用いたメッセージング

Channel は参照型なので make() でインスタンスを生成します。
使い方は簡単で、送信が "channel<-value" で受信が "<-channel" です。


下記は、 goroutine と main() 間でのメッセージングです。
main() の最後では受信結果を出力しており、その受信で値が届くまでブロックするので、 time.Sleep() は必要無い点に注意して下さい。

func f(ch chan bool) {
	ch <- true
}

func main() {
	ch := make(chan bool)
	go f(ch)
	log.Println(<-ch) // ここでデータが来るまでブロックする
}

https://github.com/Jxck/goroutine-sample/blob/master/channel.go

同期

channel は、送受信が完了するまでブロックします。
このことを、 goroutine 間の同期に応用することができます。


下記は、 main() が channel を受信する事で goroutine の終了を待つ同期をしています。
受信した値自体は必要ないため、捨てています。 goroutine は channel を closure で参照しています。closure があると本当に便利ですね。

func main() {
	fin := make(chan bool)
	go func() {
		log.Println("worker working..")
		fin <- false
	}()
	<-fin
}

https://github.com/Jxck/goroutine-sample/blob/master/finchannel.go


上記は、 goroutine が 1 つですが、複数ある場合は数を管理しないといけなくなります。
それを行う場合は sync.WaitGroup というモジュールが使用できます。

func main() {
	var wg sync.WaitGroup
	for i:=0; i<3; i++ {
		wg.Add(1) // goroutine を生成するたびインクリメント
		go func(i int) {
			log.Println(i)
			wg.Done() // 終了時にデクリメント
		}(i)
	}
	wg.Wait() // ブロックし、全ての Done が終わったら次に進む
}

https://github.com/Jxck/goroutine-sample/blob/master/waitGroup.go


sync.WaitGroup は channel を使っているわけではないようです。
sync パッケージには、 lock をとったりするパッケージがるので、 channel を使わずそれでリソース共有/同期をすることもできますが、最初に述べたように Go では極力メッセージングでリソース共有/同期をしましょう。

Worker の起動

下記は、ワーカを 3 つ起動し、それぞれの処理結果を main() で受け取っています。
worker() を go 文で実行し、結果を渡してもらうための channel を渡すのではなく、 worker が内部で、結果を渡すための channel を生成しそれを返しているので、 main() はそこから取り出しています。

func worker(msg string) <-chan string {
	receiver := make(chan string)
	for i := 0; i < 3; i++ {
		go func(i int) {
			msg := fmt.Sprintf("%d %s done", i, msg)
			receiver <- msg
		}(i)
	}
	return receiver
}

func main() {
	receiver := worker("job")
	for i := 0; i < 3; i++ {
		log.Println(<-receiver)
	}
}

https://github.com/Jxck/goroutine-sample/blob/master/workers.go


worker の宣言に注目して下さい。

// func worker(msg string) chan string こうではない
func worker(msg string) <-chan string

worker が返す型は「読み取り専用の channel」です。(<- がついてる)
これにより、 main() がこの channel に誤ってデータを書き込むことを防ぎます。

複数の channel と select

先の例では、 worker は 3 つのメッセージを返すことが予め分かっていたので、 3 つだけ取り出していました。
しかし、 worker が予めわからない場合などもあります。
その場合は、用途の違う別の channel を返すことで、必要なメッセージをとることもできます。

Go では、関数が複数の値を返すことができるため、以下の例は worker が終わったことを返すための channel を worker が返すようにしています。


main() では、複数のチャネルからのメッセージを受信する必要がありますが、それぞれブロックしてしまいます。そんな場合は、 select という構文を使うと、複数の channel の受信を同時に行うことができます。

func worker(msg string) (<-chan string, <-chan bool) {
	var wg sync.WaitGroup
	receiver := make(chan string)
	fin := make(chan bool)
	go func() {
		for i := 0; i < 3; i++ {
			wg.Add(1)
			go func(i int) {
				msg := fmt.Sprintf("%d %s done", i, msg)
				receiver <- msg
				wg.Done()
			}(i)
		}
		wg.Wait()
		fin<-false // 終了を伝える
	}()
	return receiver, fin
}

func main() {
	receiver, fin := worker("job")
	for {
		select {
		case receive := <-receiver:
			log.Println(receive)
		case <-fin: // 終了したら終わる
			return
		}
	}
}

https://github.com/Jxck/goroutine-sample/blob/master/finchannel.go

Channel の close()

close() は組み込みの関数で、用の済んだ channel を閉じることができます。
そもそも channel の呼び出しは 2 つの値が受け取れます。

message, ok := <-channel

この 2 つめの ok は、 channel が閉じられているかを表す bool です。
ok は、取っても取らなくても良い仕様になっています。


channel を close() した場合、受信側には 空のメッセージと false が渡るので、これを使って channel が close() されたことを判定できます。

 
先ほどの例を close を使って書き直すと以下になります。

func worker(msg string) (<-chan string) {
	var wg sync.WaitGroup
	receiver := make(chan string)
	go func() {
		for i := 0; i < 3; i++ {
			wg.Add(1)
			go func(i int) {
				msg := fmt.Sprintf("%d %s done", i, msg)
				receiver <- msg
				wg.Done()
			}(i)
		}
		wg.Wait()
		close(receiver)
	}()
	return receiver
}

func main() {
	receiver := worker("job")
	for {
		receive, ok := <-receiver
		if !ok {
			log.Println("closed")
			return
		}
		log.Println(receive)
	}
}

https://github.com/Jxck/goroutine-sample/blob/master/close.go

timeout

重たい worker がいた場合、 worker からの終了通知を受け取らずに、一定時間経過したら終わりたい場合もあります。


そんな時は、 time.After という関数を使うことができます。


time.After の型は以下です。

func After(d Duration) <-chan Time


一定時間経過したらメッセージを送る channel を返すので、この channel を受け取って受信をしておけば、一定時間後に処理をするためのトリガーにできます。


これを用いて 1 秒後に timeout するように書き換えてみます。

func randomTime() time.Duration {
	return time.Duration(rand.Intn(1e3)) * time.Millisecond
}

func worker(msg string) <-chan string {
	receiver := make(chan string)
	for i := 0; i < 300; i++ {
		go func(i int) {
			time.Sleep(randomTime())
			msg := fmt.Sprintf("%d %s done", i, msg)
			receiver <- msg
		}(i)
	}
	return receiver
}

func main() {
	receiver := worker("job")
	for {
		select {
		case receive := <-receiver:
			log.Println(receive)
		case <-time.After(time.Second): // 一定時間経過したら受信
			log.Println("timeout")
			return // 受信時に終われば timeout 処理になる。
		}
	}
}

https://github.com/Jxck/goroutine-sample/blob/master/timeout.go

Buffer

Channel は、 make() 時に buffer を指定することができます。
この buffer の値は、一度に channel に書き込める message の上限値になります。
デフォルトは 0 です。指定すると、 MQ のように扱うことができるイメージです。


送信は buffer が一杯だった場合は送信でブロックします。
これを利用して、例えば worker が同時に起動する数を制限できます。


以下は、 100 個の処理を同時に 5 つだけの goroutine を起動して行うサンプルです。

func worker(msg string) <-chan string {
	limit := make(chan int, 5)
	receiver := make(chan string)
	go func() {
		for i := 0; i < 100; i++ {
			log.Println(runtime.NumGoroutine())
			limit <- 1
			go func(i int) {
				msg := fmt.Sprintf("%d %s done", i, msg)
				receiver <- msg
				<-limit
			}(i)
		}
	}()
	return receiver
}

func main() {
	receiver := worker("job")
	for i := 0; i < 100; i++ {
		log.Println(<-receiver)
	}
}

https://github.com/Jxck/goroutine-sample/blob/master/buffer.go


runtime.NumGoroutine() の結果は必ずしも 5 にならない点は前述の通り。


Pattern 編

よくあるパターンを、ここまでの内容を使って実装してみます。

coroutine

coroutine と言えば Lua でしょう。
スケジュールを記述することで、処理を途中で止めて、そこからリスタートすることができます。


単純な例を Lua で書くとこんな感じです。
f() の中の処理を途中で中断し、そこから再開している例です。

function f()
	coroutine.yield "one"
	coroutine.yield "two"
	coroutine.yield "three"
	return
end

local co = coroutine.wrap (f)

print (co ()) -- one
print (co ()) -- two
print (co ()) -- three

https://github.com/Jxck/goroutine-sample/blob/master/coroutine.lua



Go では channel がブロックすることを利用します。

func f(yield chan string) {
	yield <- "one"
	yield <- "two"
	yield <- "three"
}

func main() {
	co := make(chan string)
	go f(co)
	log.Println(<-co) // one
	log.Println(<-co) // two
	log.Println(<-co) // three
}

https://github.com/Jxck/goroutine-sample/blob/master/coroutine.go

generator

generator は、多くの場合配列のように扱えますが、扱う値が実行時に生成されている点が配列と違います。予め値を生成しないことにより、メモリ消費が少ないのが特徴です。


10 まで値を取り出せる generator は、 python だと以下のようになります。

def generator(n):
	i = 0
	while True:
		if i > n: break
		yield i
		i += 1

for i in generator(10):
	print i

https://github.com/Jxck/goroutine-sample/blob/master/generator.py


Go ではやはり、 channel のブロックを使います。

func generator(n int) chan int {
	ch := make(chan int)
	i := 0
	go func() {
		for {
			ch <- i
			i++
			if i > n {
				close(ch)
				break
			}
		}
	}()
	return ch
}

func main() {
	for x := range generator(10) {
		log.Println(x)
	}
}

https://github.com/Jxck/goroutine-sample/blob/master/generator.go


make() の部分で buffer を指定していないので、同時に 1 つしか生成されないことが保証できます。(ということは生成する数を指定することもできる)

その他

multi-core

現時点では、 goroutine がマルチコアを自動的に使いこなすような最適化はされないようです。


もし、 goroutine を複数のコアで実行したい場合は GOMAXPROC 環境変数か、 runtime.GOMAXPROCS() にコア数を指定します。


よって、ソースに以下のように書くことが多いです。

cpus := runtime.NumCPU()
runtime.GOMAXPROCS(cpus)
ベンチ

goroutine のメモリ使用量を調べます。
こちらにあったのをお借りして、少し変えてみました。
https://gist.github.com/jgrahamc/5253020

func main() {
	cpus := runtime.NumCPU()
	runtime.GOMAXPROCS(cpus)
	count := 1000 * 100

	var startMemory runtime.MemStats
	runtime.ReadMemStats(&startMemory)

	start := time.Now()
	fin := make(chan bool)
	for i := 0; i < count; i++ {
		go func() {
			<-fin
		}()
	}
	elapsed := time.Since(start)

	var endMemory runtime.MemStats
	runtime.ReadMemStats(&endMemory)
	close(fin)

	fmt.Printf(`
goroutine:	%d
cpu:				%d
time:				%f
memory all: %f MB
memory:			%f byte/goroutine
`,
		count, cpus, elapsed.Seconds(),
		float64(endMemory.Alloc-startMemory.Alloc)/float64(1024*1024),
		float64(endMemory.Alloc - startMemory.Alloc)/float64(count))
}

https://github.com/Jxck/goroutine-sample/blob/master/goroutine-bench.go


100,000 個の goroutine を起動して、その時間とメモリの使用量を見ています。

手元の Mac OSX Lione Core 2 Duo, Memory 4G で実行してみた結果が以下です。

goroutine:	100000
cpu:				2
time:				0.589367
memory all:	23.001915 MB
memory:			241.192560 byte/goroutine


かなり、小さい事がわかりますね。
というか不安なのでもう少し調べてみます(汗)

outro

goroutine と channel 自体の仕様はそんなに難しいものではありませんが、 2 つをを使って、かなり色々表現できることが分かって頂けたと思います。また、 select や closure といった機能が地味に協力なので、組み合わせるとさらに色々できるようになります。


と、ここまでが発表の範囲でしたがまだいくつかあるので、後で追記していきます。


go の売りの 1 つでもある、この組み込みの並行処理機能の使い方がなんとなくでも伝わればと思います。