Block Rockin’ Codes

back with another one of those block rockin' codes

Advanced Go Concurrency 3 つのパターン

intro

ちょっと時間が経ってしまいましたが、 Go研 vol.03 では、 Google I/O 2013 で行われてた Go のセッションの 1 つである下記をテーマに研究しました。


Advanced Go Concurrency Patterns


資料は以下です。


https://github.com/goken/goken/blob/master/goken03/goken03.md


また、ここから順に実装しながら解説をしますが、その完成品はこちらにあります。
(commit 履歴も、本記事にある程度沿っています。)


https://gist.github.com/Jxck/5831269


スライドにそってやったのですが、セッションの内容は結構重ためだったので、 2 時間の Go 研だとちょっと消化不良ぎみだったのが反省点です。


そこで、このセッションの要である、並行処理に関する 3 つのパターンについて紹介します。

メインテーマ

このセッションにおいて一番重要だったのは、 Select の使い方です。
セッションでは、 RSS Reader をテーマとしていますが、その内容をデフォルメして、以下のような古典的 pooling 処理を考えてみます。


1. loop を回して、定期的に外部サーバに GET を投げる
2. 取得した結果を、 channel を経由して呼び出し側に渡す
3. 呼び出し側から Close を呼ばれたら終了処理をする


Loop の部分は、ナイーブな実装は下記のようになるでしょう。

// 定期的な取得をするループ
func (p *Pooling) Loop() {
	for {
		// Close() が呼ばれいるかを調べる
		if p.closed {
			close(p.result)
			return
		}
		// Get を呼ぶだけ
		body, err := Get(p.url)
		if err != nil {
			// エラーがあったら、 3 秒間停止して再開
			p.err = err
			time.Sleep(3 * time.Second)
			fmt.Println("failed")
			continue
		}
		// 結果を channel で渡す
		p.result <- body
		time.Sleep(1 * time.Second)
	}
}


しかし、この実装にはいくつかの問題があります。

問題点

この loop の実装では以下の問題があります。

1. p.closed, p.err が race condition (複数の Goroutine が lock なしでアクセス)
2. Close() を実行しても Sleep() が長いとすぐには loop が終わらない
3. Close() 後に result の読み出しを止められると、 loop が永遠に終わらない
4. Get() が同期処理でブロックする


これらを解決するためには、基本的な方針として Select を用いたイベント駆動を導入します。 for-loop 内で select を使用する基本的な構造は以下になります。

func (p *Pooling) Loop() {
	// (1) mutable な変数の定義
	for {
		// (2) case のための channel を定義
		select {
		case <-c1: // イベントの補足
			// read/write
		case c2 <- x: // 書き込み
			// read/write
		case y := <-c3: // 値の受け取り
			// read/write
		}
	}
}

この構造は、 Go の並行処理プログラミングでは基本になります。
とくに (1), (2) と書いた領域の使い分けが大事だと学びました。

request/response パターン

Close() を channel を用いた形で実装しなおします。

トリガーにするための適当な channel を作ります。
ここでは closing という bool の channelにしています。

type Pooling struct {
	url     string
	result  chan string
	closing chan bool
	err     error
}

Close() はそこに何か送ればいいです。
といっても実際に何か送りたいデータがある訳ではないので close() してしまいます。
(close() すると ゼロ値の false が送られる)

// loop を終了させる
func (p *Pooling) Close() error {
	close(p.closing)
	return p.err
}

loop 側は case で補足したら終了処理をします。

func (p *Pooling) Loop() {
	for {
		select {
		case <-p.closing:
			close(p.result)
			return

これで p.closed へのアクセスは無くなりました。
しかし、これだと error がとれません。これは loop 側からメッセージで返して欲しいですね。

channel の channel

そこで、トリガとして作った closing を、 channel の channel にします。

type Pooling struct {
	url     string
	result  chan string
	closing chan chan error
}

"error を送る channel" を送ることができる channel です。


Close() は、error を受けとるための channel を作って closing に送ります。

そして、 closing 経由でエラーが返ってくるのを待ちます。読み出してブロックするので、実際に終了処理が終わるまでは Close() は終わりません。また読み出した値をそのまま返せばシグネチャはそのままです。

// loop を終了させる
func (p *Pooling) Close() error {
	done := make(chan error)
	p.closing <- done
	return <-done
}

loop の方では、 Get() でエラーが発生したらそれを保持しておき、 Close() の(p.closing から値を受け取った)タイミングでそのエラーを送信します。
p.closing から受け取っている done は error 型の channel ななので、ここに送信するだけです。
(実際これだと最後の Get() で発生したエラーしか送れない気がするけど、今回の元ネタの方もそうなっていて、説明の都合上この方が都合がいいので細かい事は気にしない方向で)

// 定期的な取得をするループ
func (p *Pooling) Loop() {
	// Get が失敗したらこれに入れる
	var err error
	for {
		select {
		case done := <-p.closing:
			close(p.result)
			done <- err
			return
		default:
			// Get を呼ぶだけ
			body, err := Get(p.url)

err は Loop() のローカルとして宣言しており、 Goroutine をまたいでいません。
別の Goroutine への送信は channel 経由のメッセージングを使っているので race condition はありません。
(ちなみに、 Get の結果の代入で body は新しい変数ですが、 err は再宣言になっているように思いました。実際これはエラーになりません。詳細はこちら。 http://qiita.com/Jxck_/items/2616abafea89ee97c477)

こうして、 p.closed, p.err で起こっていた race condition は解消されました。

interval channel

Close を channel 経由で実行できるようにしたため、他のも channel でイベントとして定義するようにすれば、処理を多重化することができて、この問題も解決します。


ここまで time.Sleep を用いて行ってきたインターバル処理については、 time.Tick() を用いるこでイベントにできます。 time.Tick(1 * time.Second) は想像通り、一秒毎に値を送ってくる channel を返します。

for {
	interval := time.Tick(1 * time.Second) // インターバル
	select {
	case done := <-p.closing:
		// snip
	case <-interval:
		// Get を呼ぶだけ
		body, err := Get(p.url) // err の方は単なる代入

標準パッケージの色々な関数は、こうした仕様を想定して channel を返すものが結構あります。
特に time.After(), time.Tick は特定のタイミングになったことを channel で伝えるようになっているため、この使い方にフィットします。

バッファリング

現在の実装では、 loop の中で p.result に取得した値を書き込んでいるためここでブロックしてしまいます。
呼出側がこれを読み込まない限りは、次の GET は発生しませんし、 Close() することもできません。


そこで、 GET で取得した結果はバッファに貯めて、クライアントへの通知とは別にしてみたいと思います。ここでバッファには単純なスライスを使います。

// 定期的な取得をするループ
func (p *Pooling) Loop() {
	var buffer []string
	for {
		// snip
		case <-interval:
			// Get を呼ぶだけ
			body, err := Get(p.url) // err の方は単なる代入
			// buffer に結果を追加
			buffer = append(buffer, body)
		case p.result <- buffer[0]:
			buffer = buffer[1:]
		}
	}
}

buffer からの取り出しは channel への書き込みになっています。
こうすると、クライアントが読み出すタイミングで順番に送ることができます。


しかし、この単純な実装だとバッファが空のタイミングで読み出されると、 index out of range で panic になります。
panic を rescue することも出きると思いますが、ここでは nil channel パターンを用いて解決してみます。

nil channel パターン

nil channel とは単純に channel の変数が nil になっている状態です。
この状態で select の case で read/write すると、ブロックされます。 case に channel を置く場合は read/write できたタイミングでヒットするため、ブロックしたままヒットしないということはその case が実質無視されるようなイメージになります(panic にはならないのがポイントです)
つまり、 select の前に channel をセットアップする段階で、実行したくない case があったらそこを nil にすればよく、実行したい場合は適切な channel で初期化しておけばよいです。


今回の場合は、スライスの中身で分岐したいので以下のようになります。

for {
	var first string
	var result chan string
	if len(buffer) > 0 {
		first = buffer[0]
		result = p.result // ここを通らないと nil
	}
	select {
	case <-interval:
		body, err := Get(p.url) // err の方は単なる代入
		// buffer に結果を追加
		buffer = append(buffer, body)
	case result <- first: // result が nil だと実行されない
		buffer = buffer[1:]
	}
}

もし buffer が空だと、 result が nil なので result <- first の部分がブロックし、panic にはなりません。

buffer 肥大対応

buffer スライスを導入したために、クライアントの読み出しが遅かった場合は buffer に値がたまりつづけてメモリを食い尽くすリスクが出ます。

そこで、 buffer の最大値を決めておき、それを越えた場合は GET しないように変更します。
これも nil channel パターンをつかうことで解決できます。

const maxBuffer = 10
for {
	var first string
	var result chan string
	if len(buffer) > 0 {
		first = buffer[0]
		result = p.result // ここを通らないと nil
	}
	var interval <-chan time.Time
	if len(buffer) < maxBuffer {
		interval = time.After(1 * time.Second) // インターバル
	}
	select {
	case <-interval:
		// Get を呼ぶだけ
		body, err := Get(p.url) // err の方は単なる代入
		// snip
		buffer = append(buffer, body)
	case result <- first: // result が nil だと実行されない
		buffer = buffer[1:]
	}
}

buffer のサイズが maxBuffer を越えない場合は、 interval が nil のまま設定されないため、 Get() を呼び出す部分の case が実行されません。

Get の非同期化

Get() は同期ネットワーク I/O を伴います。
取得に時間がかかるとそこでブロックしてしまうため、これを非同期にすることでブロックしないようにすることが望ましいです。

そこで、 Get を goroutine 内で実行し channel 経由で結果を返すようにします。
返した結果は別の select で受信します。受信自体は一度に 1 つにするため channel には buffer を 1 と指定し、ここまでと同様 nil channel パターンを使います。


まず Get の結果を詰め込む構造体を定義します。

type getResponse struct {
	body string
	err  error
}

次に、それを渡す chennel です。この channel が使われていない時だけ interval を回し、 Get を goroutine で実行します。
取得結果の処理は別の case になります。

// 定期的な取得をするループ
func (p *Pooling) Loop() {
	var response chan getResponse
	for {
		var interval <-chan time.Time
		if response == nil && len(buffer) < maxBuffer {
			interval = time.After(1 * time.Second) // インターバル
		}
		select {
		case <-interval:
			response = make(chan getResponse, 1)
			go func() {
				body, err := Get(p.url) // err の方は単なる代入
				response <- getResponse{body, err}
			}()
		case res := <-response:
			response = nil // 次のループのために nil に戻す
			// buffer に結果を追加
			buffer = append(buffer, res.body)
		case result <- first: // result が nil だと実行されない
			buffer = buffer[1:]
		}
	}
}

こうして、最初にあげた問題はすべて解決しました。
nil channel によってループの中での実行タイミングを制御していることがわかります。

まとめ

Google I/O 2012 の Go Concurrency Patterns by rob pike では、 goroutine や channel の強力さなどが重点的に説明されていましたが、今回の発表では Select-Loop の強力さにフォーカスしており、非常に実践的な内容でした。


今回できたようなパターンを使うと、処理を非同期にし、メッセージングによってレースコンディションを無くした並行処理が実装ができるようになります。


Select で channel を read/write するのは、結局は イベントループとイベントドリブンなプログラミングパラダイムを自分で実装しているようなイメージで、 Node.js なんかでやっているのと基本的には同じです。
今後は Node.js でのイベントドリブンのノウハウ(Stream とか)も踏まえて、 Goroutine をより上手く扱えるように研究を重ねたいと思います。