Block Rockin’ Codes

back with another one of those block rockin' codes

Go の並行処理

intro

先日の Go のカンファレンス GoCon で、 Go の並行処理周りについて発表させて頂きました。


Go Conference 2013 spring - connpass


具体的には Goroutine や Channel の話ですが、これらの機能は結構面白くて、いじって遊んでるだけでもわくわくします。
Go の並行処理は、設計方針がわりと特殊だと思うのですが、設計がシンプルなので分かるとそこまで難しくはないです。
(使いこなすのは、経験が必要そうですが)


今回話すにあたって色々調べましたが、発表時間の都合上省いたものもあるし、質疑応答で聞かれて応えられなかったこともあるので、
ここでまとめて置こうと思います。

発表資料

今回の発表資料はこちらです。
このブログの内容は、これをベースにします。


http://jxck.node-ninja.com/slides/gocon-2013spring.html


ソースは、ここにまとめました。


https://github.com/Jxck/goroutine-sample

Go の Concurrency Model

並行処理プログラミングには、ざっくり分けて二つのアプローチがあります。

Shared-memory communication
ワーカ間でメモリ(リソース) を共有する。レースコンディションが起こらないようにロックをとることが多く、その実装は難しいことが多いとされる。
Message-passing communication
ワーカ間でメッセージパッシングを行う。Erlang などに実装された Actor モデルなどが代表的な実装。


そして、Go の並行処理モデルの方針は下記に宣言されています。

"Do not communicate by sharing memory; instead, share memory by communicating"


なので、上記で言うと後者なのですが、 Erlang の Actor モデルなどとは異なる点がいくつかあります。


Go での実装には、下記の二つが参考にされています。

CSP (Communicating Sequential Processes)
並行処理のための設計理論。実装としては Occam, Limbo などがある。
π caluculus
こちらも並行処理のための理論。Erlang のメッセージングが ! なのはこれが元になってるよう。日本語の情報ならここA Very Brief Introduction to the Pi-Calculus (in Japanese)


これらを実現するために、 Go には以下の機能が実装されています。

  • goroutine
  • channel


また、以下の二つもそれをより強力にします。

  • select
  • closure

Goroutine

Goroutine とは
  • Coroutine ではない。
  • Thread, Process でもない。
  • 複数の Thread 上に多重化されて実装されてる。
  • main() 自身や Scavenger(GC) などランタイムも goroutine を使ってる。


上記周りの話は、自分のあとの methane さんの発表の方が詳しかったので、そちらを参照して下さい。
https://gist.github.com/methane/5377227#file-goscheduler-md

go 文での起動

goroutine は go 文で関数を実行すると起動できます。
go 文はブロックしないので、 goroutine は非同期に起動されます。
下記の main() では、goroutine を二つ起動してから time.Sleep() で一時停止しているのは、これが無かったら f() が実行される前に main() が終わってしまうからです。


goroutine が他に実行されていようと main() が終わるとプロセスが終了します。

func f(msg string) {
	log.Println(msg)
}

func main() {
	go f("hello")
	go func() {
		log.Println("gopher")
	}()
	time.Sleep(time.Second)
}

https://github.com/Jxck/goroutine-sample/blob/master/goroutine.go

goroutine の終了条件

goroutine は、下記の条件で終了します。

  • 関数が終わる
  • return で抜ける
  • runtime.Goexit() を実行する
func main() {
	go func() {
		log.Println("end")
	}()
	go func() {
		log.Println("return")
		return
	}()
	go func() {
		log.Println("exit")
		runtime.Goexit()
	}()
	time.Sleep(time.Second)
}

https://github.com/Jxck/goroutine-sample/blob/master/goroutine-exit.go


runtime.NumGoroutine

現在起動している goroutine の数を知ることができます。

func main() {
	log.Println(runtime.NumGoroutine())
}

これを実行すると、自分の環境では 2 と出ました。


まず、 main() そのものが goroutine なので、 1 つはわかります。
しかし、もう 1 つは誰でしょう? 二つの関数を使ってますが、いずれも goroutine は使ってません。


ML で聞いてみたところ、 bradfitz さんが応えてくれました。こんな大物の返信がもらえるとか、、


結果から言うと、下記に変えると分かります。

func main() {
	log.Println(runtime.NumGoroutine())
	select{}
}

これを GOTRACEBACK=2 をつけて実行します。

GOTRACEBACK=2 go run numgoroutine.go

出力が長いのでのせませんが、見ると runtiem.main() 以外に runtime.MHeap_Scavenger() が動いてる事がわかります。


このように、 go 文で起動した goroutine や main() 以外にも、 Scavenger (GC) などランタイム環境でも goroutine が使われていることがわかります。


この情報は、 runtime.Stack からも取れます。

func main() {
	log.Println(runtime.NumGoroutine())
	buf := make([]byte, 1<<20)
	buf = buf[:runtime.Stack(buf, true)]
	log.Println(string(buf))
	select {}
}

https://github.com/Jxck/goroutine-sample/blob/master/who.go

Channel

Channel とは
  • Channel は goroutine 間でのメッセージパッシングをするためのもの
  • メッセージの型を指定できる
  • first class value であり、引数や戻り値にも使える
  • send/receive でブロックする
  • buffer で、一度に扱えるメッセージ量を指定できる
Channel を用いたメッセージング

Channel は参照型なので make() でインスタンスを生成します。
使い方は簡単で、送信が "channel<-value" で受信が "<-channel" です。


下記は、 goroutine と main() 間でのメッセージングです。
main() の最後では受信結果を出力しており、その受信で値が届くまでブロックするので、 time.Sleep() は必要無い点に注意して下さい。

func f(ch chan bool) {
	ch <- true
}

func main() {
	ch := make(chan bool)
	go f(ch)
	log.Println(<-ch) // ここでデータが来るまでブロックする
}

https://github.com/Jxck/goroutine-sample/blob/master/channel.go

同期

channel は、送受信が完了するまでブロックします。
このことを、 goroutine 間の同期に応用することができます。


下記は、 main() が channel を受信する事で goroutine の終了を待つ同期をしています。
受信した値自体は必要ないため、捨てています。 goroutine は channel を closure で参照しています。closure があると本当に便利ですね。

func main() {
	fin := make(chan bool)
	go func() {
		log.Println("worker working..")
		fin <- false
	}()
	<-fin
}

https://github.com/Jxck/goroutine-sample/blob/master/finchannel.go


上記は、 goroutine が 1 つですが、複数ある場合は数を管理しないといけなくなります。
それを行う場合は sync.WaitGroup というモジュールが使用できます。

func main() {
	var wg sync.WaitGroup
	for i:=0; i<3; i++ {
		wg.Add(1) // goroutine を生成するたびインクリメント
		go func(i int) {
			log.Println(i)
			wg.Done() // 終了時にデクリメント
		}(i)
	}
	wg.Wait() // ブロックし、全ての Done が終わったら次に進む
}

https://github.com/Jxck/goroutine-sample/blob/master/waitGroup.go


sync.WaitGroup は channel を使っているわけではないようです。
sync パッケージには、 lock をとったりするパッケージがるので、 channel を使わずそれでリソース共有/同期をすることもできますが、最初に述べたように Go では極力メッセージングでリソース共有/同期をしましょう。

Worker の起動

下記は、ワーカを 3 つ起動し、それぞれの処理結果を main() で受け取っています。
worker() を go 文で実行し、結果を渡してもらうための channel を渡すのではなく、 worker が内部で、結果を渡すための channel を生成しそれを返しているので、 main() はそこから取り出しています。

func worker(msg string) <-chan string {
	receiver := make(chan string)
	for i := 0; i < 3; i++ {
		go func(i int) {
			msg := fmt.Sprintf("%d %s done", i, msg)
			receiver <- msg
		}(i)
	}
	return receiver
}

func main() {
	receiver := worker("job")
	for i := 0; i < 3; i++ {
		log.Println(<-receiver)
	}
}

https://github.com/Jxck/goroutine-sample/blob/master/workers.go


worker の宣言に注目して下さい。

// func worker(msg string) chan string こうではない
func worker(msg string) <-chan string

worker が返す型は「読み取り専用の channel」です。(<- がついてる)
これにより、 main() がこの channel に誤ってデータを書き込むことを防ぎます。

複数の channel と select

先の例では、 worker は 3 つのメッセージを返すことが予め分かっていたので、 3 つだけ取り出していました。
しかし、 worker が予めわからない場合などもあります。
その場合は、用途の違う別の channel を返すことで、必要なメッセージをとることもできます。

Go では、関数が複数の値を返すことができるため、以下の例は worker が終わったことを返すための channel を worker が返すようにしています。


main() では、複数のチャネルからのメッセージを受信する必要がありますが、それぞれブロックしてしまいます。そんな場合は、 select という構文を使うと、複数の channel の受信を同時に行うことができます。

func worker(msg string) (<-chan string, <-chan bool) {
	var wg sync.WaitGroup
	receiver := make(chan string)
	fin := make(chan bool)
	go func() {
		for i := 0; i < 3; i++ {
			wg.Add(1)
			go func(i int) {
				msg := fmt.Sprintf("%d %s done", i, msg)
				receiver <- msg
				wg.Done()
			}(i)
		}
		wg.Wait()
		fin<-false // 終了を伝える
	}()
	return receiver, fin
}

func main() {
	receiver, fin := worker("job")
	for {
		select {
		case receive := <-receiver:
			log.Println(receive)
		case <-fin: // 終了したら終わる
			return
		}
	}
}

https://github.com/Jxck/goroutine-sample/blob/master/finchannel.go

Channel の close()

close() は組み込みの関数で、用の済んだ channel を閉じることができます。
そもそも channel の呼び出しは 2 つの値が受け取れます。

message, ok := <-channel

この 2 つめの ok は、 channel が閉じられているかを表す bool です。
ok は、取っても取らなくても良い仕様になっています。


channel を close() した場合、受信側には 空のメッセージと false が渡るので、これを使って channel が close() されたことを判定できます。

 
先ほどの例を close を使って書き直すと以下になります。

func worker(msg string) (<-chan string) {
	var wg sync.WaitGroup
	receiver := make(chan string)
	go func() {
		for i := 0; i < 3; i++ {
			wg.Add(1)
			go func(i int) {
				msg := fmt.Sprintf("%d %s done", i, msg)
				receiver <- msg
				wg.Done()
			}(i)
		}
		wg.Wait()
		close(receiver)
	}()
	return receiver
}

func main() {
	receiver := worker("job")
	for {
		receive, ok := <-receiver
		if !ok {
			log.Println("closed")
			return
		}
		log.Println(receive)
	}
}

https://github.com/Jxck/goroutine-sample/blob/master/close.go

timeout

重たい worker がいた場合、 worker からの終了通知を受け取らずに、一定時間経過したら終わりたい場合もあります。


そんな時は、 time.After という関数を使うことができます。


time.After の型は以下です。

func After(d Duration) <-chan Time


一定時間経過したらメッセージを送る channel を返すので、この channel を受け取って受信をしておけば、一定時間後に処理をするためのトリガーにできます。


これを用いて 1 秒後に timeout するように書き換えてみます。

func randomTime() time.Duration {
	return time.Duration(rand.Intn(1e3)) * time.Millisecond
}

func worker(msg string) <-chan string {
	receiver := make(chan string)
	for i := 0; i < 300; i++ {
		go func(i int) {
			time.Sleep(randomTime())
			msg := fmt.Sprintf("%d %s done", i, msg)
			receiver <- msg
		}(i)
	}
	return receiver
}

func main() {
	receiver := worker("job")
	for {
		select {
		case receive := <-receiver:
			log.Println(receive)
		case <-time.After(time.Second): // 一定時間経過したら受信
			log.Println("timeout")
			return // 受信時に終われば timeout 処理になる。
		}
	}
}

https://github.com/Jxck/goroutine-sample/blob/master/timeout.go

Buffer

Channel は、 make() 時に buffer を指定することができます。
この buffer の値は、一度に channel に書き込める message の上限値になります。
デフォルトは 0 です。指定すると、 MQ のように扱うことができるイメージです。


送信は buffer が一杯だった場合は送信でブロックします。
これを利用して、例えば worker が同時に起動する数を制限できます。


以下は、 100 個の処理を同時に 5 つだけの goroutine を起動して行うサンプルです。

func worker(msg string) <-chan string {
	limit := make(chan int, 5)
	receiver := make(chan string)
	go func() {
		for i := 0; i < 100; i++ {
			log.Println(runtime.NumGoroutine())
			limit <- 1
			go func(i int) {
				msg := fmt.Sprintf("%d %s done", i, msg)
				receiver <- msg
				<-limit
			}(i)
		}
	}()
	return receiver
}

func main() {
	receiver := worker("job")
	for i := 0; i < 100; i++ {
		log.Println(<-receiver)
	}
}

https://github.com/Jxck/goroutine-sample/blob/master/buffer.go


runtime.NumGoroutine() の結果は必ずしも 5 にならない点は前述の通り。


Pattern 編

よくあるパターンを、ここまでの内容を使って実装してみます。

coroutine

coroutine と言えば Lua でしょう。
スケジュールを記述することで、処理を途中で止めて、そこからリスタートすることができます。


単純な例を Lua で書くとこんな感じです。
f() の中の処理を途中で中断し、そこから再開している例です。

function f()
	coroutine.yield "one"
	coroutine.yield "two"
	coroutine.yield "three"
	return
end

local co = coroutine.wrap (f)

print (co ()) -- one
print (co ()) -- two
print (co ()) -- three

https://github.com/Jxck/goroutine-sample/blob/master/coroutine.lua



Go では channel がブロックすることを利用します。

func f(yield chan string) {
	yield <- "one"
	yield <- "two"
	yield <- "three"
}

func main() {
	co := make(chan string)
	go f(co)
	log.Println(<-co) // one
	log.Println(<-co) // two
	log.Println(<-co) // three
}

https://github.com/Jxck/goroutine-sample/blob/master/coroutine.go

generator

generator は、多くの場合配列のように扱えますが、扱う値が実行時に生成されている点が配列と違います。予め値を生成しないことにより、メモリ消費が少ないのが特徴です。


10 まで値を取り出せる generator は、 python だと以下のようになります。

def generator(n):
	i = 0
	while True:
		if i > n: break
		yield i
		i += 1

for i in generator(10):
	print i

https://github.com/Jxck/goroutine-sample/blob/master/generator.py


Go ではやはり、 channel のブロックを使います。

func generator(n int) chan int {
	ch := make(chan int)
	i := 0
	go func() {
		for {
			ch <- i
			i++
			if i > n {
				close(ch)
				break
			}
		}
	}()
	return ch
}

func main() {
	for x := range generator(10) {
		log.Println(x)
	}
}

https://github.com/Jxck/goroutine-sample/blob/master/generator.go


make() の部分で buffer を指定していないので、同時に 1 つしか生成されないことが保証できます。(ということは生成する数を指定することもできる)

その他

multi-core

現時点では、 goroutine がマルチコアを自動的に使いこなすような最適化はされないようです。


もし、 goroutine を複数のコアで実行したい場合は GOMAXPROC 環境変数か、 runtime.GOMAXPROCS() にコア数を指定します。


よって、ソースに以下のように書くことが多いです。

cpus := runtime.NumCPU()
runtime.GOMAXPROCS(cpus)
ベンチ

goroutine のメモリ使用量を調べます。
こちらにあったのをお借りして、少し変えてみました。
https://gist.github.com/jgrahamc/5253020

func main() {
	cpus := runtime.NumCPU()
	runtime.GOMAXPROCS(cpus)
	count := 1000 * 100

	var startMemory runtime.MemStats
	runtime.ReadMemStats(&startMemory)

	start := time.Now()
	fin := make(chan bool)
	for i := 0; i < count; i++ {
		go func() {
			<-fin
		}()
	}
	elapsed := time.Since(start)

	var endMemory runtime.MemStats
	runtime.ReadMemStats(&endMemory)
	close(fin)

	fmt.Printf(`
goroutine:	%d
cpu:				%d
time:				%f
memory all: %f MB
memory:			%f byte/goroutine
`,
		count, cpus, elapsed.Seconds(),
		float64(endMemory.Alloc-startMemory.Alloc)/float64(1024*1024),
		float64(endMemory.Alloc - startMemory.Alloc)/float64(count))
}

https://github.com/Jxck/goroutine-sample/blob/master/goroutine-bench.go


100,000 個の goroutine を起動して、その時間とメモリの使用量を見ています。

手元の Mac OSX Lione Core 2 Duo, Memory 4G で実行してみた結果が以下です。

goroutine:	100000
cpu:				2
time:				0.589367
memory all:	23.001915 MB
memory:			241.192560 byte/goroutine


かなり、小さい事がわかりますね。
というか不安なのでもう少し調べてみます(汗)

outro

goroutine と channel 自体の仕様はそんなに難しいものではありませんが、 2 つをを使って、かなり色々表現できることが分かって頂けたと思います。また、 select や closure といった機能が地味に協力なので、組み合わせるとさらに色々できるようになります。


と、ここまでが発表の範囲でしたがまだいくつかあるので、後で追記していきます。


go の売りの 1 つでもある、この組み込みの並行処理機能の使い方がなんとなくでも伝わればと思います。