Block Rockin’ Codes

back with another one of those block rockin' codes

Golang Error Handling lesson by Rob Pike

Intro

この記事は Go Advent Calendar 2014 の 15 日目の記事です。

例えばネットワークのフレーム処理的なものを書いている場合、以下のようなコードがよくでてきます。

There are many codes like this, while writing a Network Frame Parser program.

var type uint8
err = binary.Read(r, binary.BigEndian, &type)
if err != nil {
    return err
}

var length uint32
err = binary.Read(r, binary.BigEndian, &length)
if err != nil {
    return err
}

...

関数の中では、各要素の長さ毎に読み込んで、読み込みに失敗したらエラーを呼び出し元に返す。

each function reads length of value from socket and return error if failed.


書き込む時は、ほぼ逆のコードをおぼ同じように書きます。

same things happens in writing to socket code.


さすがにこれが何個も続くと DRY ではないし、エラーの処理を忘れても気づきにくいという問題がある。

same code appears again and again, it's not DRY. and also you probably forget handle error which you can't notice easily.

Go のエラーの問題点

Go では多値を返して、その最後がエラーになるという形式が、一般的でありかつ型として定義されている。

It's a basic way in go, returning multiple values from function, and error is last value of then.


ただし、問題は戻り値は「処理しなかったとしてもコンパイルが通る」という仕様になっている。

but even if you don't care about return values, it's not compile error.


つまり上の例は、以下のように書いてもコンパイルが通る。

it means that compiler allows code like this.

// ignore return values
binary.Read(r, binary.BigEndian, &type)
binary.Read(r, binary.BigEndian, &length)

これが結構問題で、取り忘れたことがコンパイラだけでは分からない。

there is no notification when you forgot to process returned value even if it inclueds error.


知りたければ、別途ツールを使って検査するしかない。

if you want to fined uncaught error, you need another linter tool.


しかし、例えば戻り値の取得を必須にするとそれもまた問題で、例えば fmt.Println が多値を返すのでプリントデバッグが大変なことになる。

should compiler force programer to handle returned value ? I don't think so, for example fmt.Println returns error too. so its make difficult doing PRINT DEBUG.

_, _ := fmt.Println("print debug")

_, _ := fmt.Println("here")

_, _ := fmt.Println("")

How to solve?

特に Read や Write での処理が増えて行った場合、コードをすっきりさせつつ、 Error を確実に処理する方法について、自分の中では色々と試行錯誤していた。

I tried some practice for make lots of Read/Write proces simple, and make sure process all Errors.


で、先日 http://gocon.connpass.com/event/9748/ のために来日して下さった、 Rob Pike 先生に、この件を聞いてみた。

And I had a chance to ask Mr.Rob Pike about this problem at after party of http://gocon.connpass.com/event/9748/ in Japan last month.


そして Rob 先生は、俺のキーボード(悲しいことに vim しか入ってなかった。。)で、実際に書きながら説明してくれました!!なんという幸運。

and then, Mr.Rob taught me with writting a code on my Mac(unfortunately, I installed only vim...), what's a presious happenig !!

先生が説明しつつ書いてくれたコードがこちら。

Mr.Rob show me the code like this.

type errWriter struct {
    w io.Writer
    err error
}

func (e *errWriter) Write(p []byte) {
    if e.err != nil {
        return
    }
    _, e.err = e.w.Write(p)
}

func (e *errWriter) Err() error {
    return e.err
}

func do() {
    ew := &errWriter{
        w: ...
    }
    ew.Write(buf)
    ew.Write(buf)
    ...
    ...
    if ew.Err() != nil {
        return ew.Err()
    }
    return nil
}

Writer の Wrapper になっていて、エラーが発生した時は内部でそれを保持しつつ、以降の処理は全てパスされる。

Write a wrapper struct of Writer. while processing, hold a error if happened and pass all Write() below.


最後にエラーの有無を確認することで、エラー処理を一カ所にまとめる。

end of the function, check and process the error in struct. this gathers error handling in one place.


とりあえず Writer と Reader について作っておけば、 Write や Read の処理が多くなるほどメリットが大きくなる。

prepare a struct for Writer and Reader has a merit when you write a lot of Write()/Read() process.


素朴だけど Go らしいコードですね。

so simple but powerful as golang way :)

Movie

すっごく見づらいけど、ぶっちゃけ後で人に見せる映像取るよりも、その場でロブ先生の話聞く方が大事だしそれで一杯一杯だったので、まあ雰囲気だけみてください。

Sorry for hard to watch, but it's more important to see and hear Mr Robs Exmplain haha. be patient :)

Mr. Rob Rike taught me about practice of error handling in Go at GoCon 2014 from Jxck on Vimeo.

Special Thanks

thanks Mr.Rob Pike !

from your student Jxck :)

Go の並行処理

intro

先日の Go のカンファレンス GoCon で、 Go の並行処理周りについて発表させて頂きました。


Go Conference 2013 spring - connpass


具体的には Goroutine や Channel の話ですが、これらの機能は結構面白くて、いじって遊んでるだけでもわくわくします。
Go の並行処理は、設計方針がわりと特殊だと思うのですが、設計がシンプルなので分かるとそこまで難しくはないです。
(使いこなすのは、経験が必要そうですが)


今回話すにあたって色々調べましたが、発表時間の都合上省いたものもあるし、質疑応答で聞かれて応えられなかったこともあるので、
ここでまとめて置こうと思います。

発表資料

今回の発表資料はこちらです。
このブログの内容は、これをベースにします。


http://jxck.node-ninja.com/slides/gocon-2013spring.html


ソースは、ここにまとめました。


https://github.com/Jxck/goroutine-sample

Go の Concurrency Model

並行処理プログラミングには、ざっくり分けて二つのアプローチがあります。

Shared-memory communication
ワーカ間でメモリ(リソース) を共有する。レースコンディションが起こらないようにロックをとることが多く、その実装は難しいことが多いとされる。
Message-passing communication
ワーカ間でメッセージパッシングを行う。Erlang などに実装された Actor モデルなどが代表的な実装。


そして、Go の並行処理モデルの方針は下記に宣言されています。

"Do not communicate by sharing memory; instead, share memory by communicating"


なので、上記で言うと後者なのですが、 Erlang の Actor モデルなどとは異なる点がいくつかあります。


Go での実装には、下記の二つが参考にされています。

CSP (Communicating Sequential Processes)
並行処理のための設計理論。実装としては Occam, Limbo などがある。
π caluculus
こちらも並行処理のための理論。Erlang のメッセージングが ! なのはこれが元になってるよう。日本語の情報ならここA Very Brief Introduction to the Pi-Calculus (in Japanese)


これらを実現するために、 Go には以下の機能が実装されています。

  • goroutine
  • channel


また、以下の二つもそれをより強力にします。

  • select
  • closure

Goroutine

Goroutine とは
  • Coroutine ではない。
  • Thread, Process でもない。
  • 複数の Thread 上に多重化されて実装されてる。
  • main() 自身や Scavenger(GC) などランタイムも goroutine を使ってる。


上記周りの話は、自分のあとの methane さんの発表の方が詳しかったので、そちらを参照して下さい。
https://gist.github.com/methane/5377227#file-goscheduler-md

go 文での起動

goroutine は go 文で関数を実行すると起動できます。
go 文はブロックしないので、 goroutine は非同期に起動されます。
下記の main() では、goroutine を二つ起動してから time.Sleep() で一時停止しているのは、これが無かったら f() が実行される前に main() が終わってしまうからです。


goroutine が他に実行されていようと main() が終わるとプロセスが終了します。

func f(msg string) {
	log.Println(msg)
}

func main() {
	go f("hello")
	go func() {
		log.Println("gopher")
	}()
	time.Sleep(time.Second)
}

https://github.com/Jxck/goroutine-sample/blob/master/goroutine.go

goroutine の終了条件

goroutine は、下記の条件で終了します。

  • 関数が終わる
  • return で抜ける
  • runtime.Goexit() を実行する
func main() {
	go func() {
		log.Println("end")
	}()
	go func() {
		log.Println("return")
		return
	}()
	go func() {
		log.Println("exit")
		runtime.Goexit()
	}()
	time.Sleep(time.Second)
}

https://github.com/Jxck/goroutine-sample/blob/master/goroutine-exit.go


runtime.NumGoroutine

現在起動している goroutine の数を知ることができます。

func main() {
	log.Println(runtime.NumGoroutine())
}

これを実行すると、自分の環境では 2 と出ました。


まず、 main() そのものが goroutine なので、 1 つはわかります。
しかし、もう 1 つは誰でしょう? 二つの関数を使ってますが、いずれも goroutine は使ってません。


ML で聞いてみたところ、 bradfitz さんが応えてくれました。こんな大物の返信がもらえるとか、、


結果から言うと、下記に変えると分かります。

func main() {
	log.Println(runtime.NumGoroutine())
	select{}
}

これを GOTRACEBACK=2 をつけて実行します。

GOTRACEBACK=2 go run numgoroutine.go

出力が長いのでのせませんが、見ると runtiem.main() 以外に runtime.MHeap_Scavenger() が動いてる事がわかります。


このように、 go 文で起動した goroutine や main() 以外にも、 Scavenger (GC) などランタイム環境でも goroutine が使われていることがわかります。


この情報は、 runtime.Stack からも取れます。

func main() {
	log.Println(runtime.NumGoroutine())
	buf := make([]byte, 1<<20)
	buf = buf[:runtime.Stack(buf, true)]
	log.Println(string(buf))
	select {}
}

https://github.com/Jxck/goroutine-sample/blob/master/who.go

Channel

Channel とは
  • Channel は goroutine 間でのメッセージパッシングをするためのもの
  • メッセージの型を指定できる
  • first class value であり、引数や戻り値にも使える
  • send/receive でブロックする
  • buffer で、一度に扱えるメッセージ量を指定できる
Channel を用いたメッセージング

Channel は参照型なので make() でインスタンスを生成します。
使い方は簡単で、送信が "channel<-value" で受信が "<-channel" です。


下記は、 goroutine と main() 間でのメッセージングです。
main() の最後では受信結果を出力しており、その受信で値が届くまでブロックするので、 time.Sleep() は必要無い点に注意して下さい。

func f(ch chan bool) {
	ch <- true
}

func main() {
	ch := make(chan bool)
	go f(ch)
	log.Println(<-ch) // ここでデータが来るまでブロックする
}

https://github.com/Jxck/goroutine-sample/blob/master/channel.go

同期

channel は、送受信が完了するまでブロックします。
このことを、 goroutine 間の同期に応用することができます。


下記は、 main() が channel を受信する事で goroutine の終了を待つ同期をしています。
受信した値自体は必要ないため、捨てています。 goroutine は channel を closure で参照しています。closure があると本当に便利ですね。

func main() {
	fin := make(chan bool)
	go func() {
		log.Println("worker working..")
		fin <- false
	}()
	<-fin
}

https://github.com/Jxck/goroutine-sample/blob/master/finchannel.go


上記は、 goroutine が 1 つですが、複数ある場合は数を管理しないといけなくなります。
それを行う場合は sync.WaitGroup というモジュールが使用できます。

func main() {
	var wg sync.WaitGroup
	for i:=0; i<3; i++ {
		wg.Add(1) // goroutine を生成するたびインクリメント
		go func(i int) {
			log.Println(i)
			wg.Done() // 終了時にデクリメント
		}(i)
	}
	wg.Wait() // ブロックし、全ての Done が終わったら次に進む
}

https://github.com/Jxck/goroutine-sample/blob/master/waitGroup.go


sync.WaitGroup は channel を使っているわけではないようです。
sync パッケージには、 lock をとったりするパッケージがるので、 channel を使わずそれでリソース共有/同期をすることもできますが、最初に述べたように Go では極力メッセージングでリソース共有/同期をしましょう。

Worker の起動

下記は、ワーカを 3 つ起動し、それぞれの処理結果を main() で受け取っています。
worker() を go 文で実行し、結果を渡してもらうための channel を渡すのではなく、 worker が内部で、結果を渡すための channel を生成しそれを返しているので、 main() はそこから取り出しています。

func worker(msg string) <-chan string {
	receiver := make(chan string)
	for i := 0; i < 3; i++ {
		go func(i int) {
			msg := fmt.Sprintf("%d %s done", i, msg)
			receiver <- msg
		}(i)
	}
	return receiver
}

func main() {
	receiver := worker("job")
	for i := 0; i < 3; i++ {
		log.Println(<-receiver)
	}
}

https://github.com/Jxck/goroutine-sample/blob/master/workers.go


worker の宣言に注目して下さい。

// func worker(msg string) chan string こうではない
func worker(msg string) <-chan string

worker が返す型は「読み取り専用の channel」です。(<- がついてる)
これにより、 main() がこの channel に誤ってデータを書き込むことを防ぎます。

複数の channel と select

先の例では、 worker は 3 つのメッセージを返すことが予め分かっていたので、 3 つだけ取り出していました。
しかし、 worker が予めわからない場合などもあります。
その場合は、用途の違う別の channel を返すことで、必要なメッセージをとることもできます。

Go では、関数が複数の値を返すことができるため、以下の例は worker が終わったことを返すための channel を worker が返すようにしています。


main() では、複数のチャネルからのメッセージを受信する必要がありますが、それぞれブロックしてしまいます。そんな場合は、 select という構文を使うと、複数の channel の受信を同時に行うことができます。

func worker(msg string) (<-chan string, <-chan bool) {
	var wg sync.WaitGroup
	receiver := make(chan string)
	fin := make(chan bool)
	go func() {
		for i := 0; i < 3; i++ {
			wg.Add(1)
			go func(i int) {
				msg := fmt.Sprintf("%d %s done", i, msg)
				receiver <- msg
				wg.Done()
			}(i)
		}
		wg.Wait()
		fin<-false // 終了を伝える
	}()
	return receiver, fin
}

func main() {
	receiver, fin := worker("job")
	for {
		select {
		case receive := <-receiver:
			log.Println(receive)
		case <-fin: // 終了したら終わる
			return
		}
	}
}

https://github.com/Jxck/goroutine-sample/blob/master/finchannel.go

Channel の close()

close() は組み込みの関数で、用の済んだ channel を閉じることができます。
そもそも channel の呼び出しは 2 つの値が受け取れます。

message, ok := <-channel

この 2 つめの ok は、 channel が閉じられているかを表す bool です。
ok は、取っても取らなくても良い仕様になっています。


channel を close() した場合、受信側には 空のメッセージと false が渡るので、これを使って channel が close() されたことを判定できます。

 
先ほどの例を close を使って書き直すと以下になります。

func worker(msg string) (<-chan string) {
	var wg sync.WaitGroup
	receiver := make(chan string)
	go func() {
		for i := 0; i < 3; i++ {
			wg.Add(1)
			go func(i int) {
				msg := fmt.Sprintf("%d %s done", i, msg)
				receiver <- msg
				wg.Done()
			}(i)
		}
		wg.Wait()
		close(receiver)
	}()
	return receiver
}

func main() {
	receiver := worker("job")
	for {
		receive, ok := <-receiver
		if !ok {
			log.Println("closed")
			return
		}
		log.Println(receive)
	}
}

https://github.com/Jxck/goroutine-sample/blob/master/close.go

timeout

重たい worker がいた場合、 worker からの終了通知を受け取らずに、一定時間経過したら終わりたい場合もあります。


そんな時は、 time.After という関数を使うことができます。


time.After の型は以下です。

func After(d Duration) <-chan Time


一定時間経過したらメッセージを送る channel を返すので、この channel を受け取って受信をしておけば、一定時間後に処理をするためのトリガーにできます。


これを用いて 1 秒後に timeout するように書き換えてみます。

func randomTime() time.Duration {
	return time.Duration(rand.Intn(1e3)) * time.Millisecond
}

func worker(msg string) <-chan string {
	receiver := make(chan string)
	for i := 0; i < 300; i++ {
		go func(i int) {
			time.Sleep(randomTime())
			msg := fmt.Sprintf("%d %s done", i, msg)
			receiver <- msg
		}(i)
	}
	return receiver
}

func main() {
	receiver := worker("job")
	for {
		select {
		case receive := <-receiver:
			log.Println(receive)
		case <-time.After(time.Second): // 一定時間経過したら受信
			log.Println("timeout")
			return // 受信時に終われば timeout 処理になる。
		}
	}
}

https://github.com/Jxck/goroutine-sample/blob/master/timeout.go

Buffer

Channel は、 make() 時に buffer を指定することができます。
この buffer の値は、一度に channel に書き込める message の上限値になります。
デフォルトは 0 です。指定すると、 MQ のように扱うことができるイメージです。


送信は buffer が一杯だった場合は送信でブロックします。
これを利用して、例えば worker が同時に起動する数を制限できます。


以下は、 100 個の処理を同時に 5 つだけの goroutine を起動して行うサンプルです。

func worker(msg string) <-chan string {
	limit := make(chan int, 5)
	receiver := make(chan string)
	go func() {
		for i := 0; i < 100; i++ {
			log.Println(runtime.NumGoroutine())
			limit <- 1
			go func(i int) {
				msg := fmt.Sprintf("%d %s done", i, msg)
				receiver <- msg
				<-limit
			}(i)
		}
	}()
	return receiver
}

func main() {
	receiver := worker("job")
	for i := 0; i < 100; i++ {
		log.Println(<-receiver)
	}
}

https://github.com/Jxck/goroutine-sample/blob/master/buffer.go


runtime.NumGoroutine() の結果は必ずしも 5 にならない点は前述の通り。


Pattern 編

よくあるパターンを、ここまでの内容を使って実装してみます。

coroutine

coroutine と言えば Lua でしょう。
スケジュールを記述することで、処理を途中で止めて、そこからリスタートすることができます。


単純な例を Lua で書くとこんな感じです。
f() の中の処理を途中で中断し、そこから再開している例です。

function f()
	coroutine.yield "one"
	coroutine.yield "two"
	coroutine.yield "three"
	return
end

local co = coroutine.wrap (f)

print (co ()) -- one
print (co ()) -- two
print (co ()) -- three

https://github.com/Jxck/goroutine-sample/blob/master/coroutine.lua



Go では channel がブロックすることを利用します。

func f(yield chan string) {
	yield <- "one"
	yield <- "two"
	yield <- "three"
}

func main() {
	co := make(chan string)
	go f(co)
	log.Println(<-co) // one
	log.Println(<-co) // two
	log.Println(<-co) // three
}

https://github.com/Jxck/goroutine-sample/blob/master/coroutine.go

generator

generator は、多くの場合配列のように扱えますが、扱う値が実行時に生成されている点が配列と違います。予め値を生成しないことにより、メモリ消費が少ないのが特徴です。


10 まで値を取り出せる generator は、 python だと以下のようになります。

def generator(n):
	i = 0
	while True:
		if i > n: break
		yield i
		i += 1

for i in generator(10):
	print i

https://github.com/Jxck/goroutine-sample/blob/master/generator.py


Go ではやはり、 channel のブロックを使います。

func generator(n int) chan int {
	ch := make(chan int)
	i := 0
	go func() {
		for {
			ch <- i
			i++
			if i > n {
				close(ch)
				break
			}
		}
	}()
	return ch
}

func main() {
	for x := range generator(10) {
		log.Println(x)
	}
}

https://github.com/Jxck/goroutine-sample/blob/master/generator.go


make() の部分で buffer を指定していないので、同時に 1 つしか生成されないことが保証できます。(ということは生成する数を指定することもできる)

その他

multi-core

現時点では、 goroutine がマルチコアを自動的に使いこなすような最適化はされないようです。


もし、 goroutine を複数のコアで実行したい場合は GOMAXPROC 環境変数か、 runtime.GOMAXPROCS() にコア数を指定します。


よって、ソースに以下のように書くことが多いです。

cpus := runtime.NumCPU()
runtime.GOMAXPROCS(cpus)
ベンチ

goroutine のメモリ使用量を調べます。
こちらにあったのをお借りして、少し変えてみました。
https://gist.github.com/jgrahamc/5253020

func main() {
	cpus := runtime.NumCPU()
	runtime.GOMAXPROCS(cpus)
	count := 1000 * 100

	var startMemory runtime.MemStats
	runtime.ReadMemStats(&startMemory)

	start := time.Now()
	fin := make(chan bool)
	for i := 0; i < count; i++ {
		go func() {
			<-fin
		}()
	}
	elapsed := time.Since(start)

	var endMemory runtime.MemStats
	runtime.ReadMemStats(&endMemory)
	close(fin)

	fmt.Printf(`
goroutine:	%d
cpu:				%d
time:				%f
memory all: %f MB
memory:			%f byte/goroutine
`,
		count, cpus, elapsed.Seconds(),
		float64(endMemory.Alloc-startMemory.Alloc)/float64(1024*1024),
		float64(endMemory.Alloc - startMemory.Alloc)/float64(count))
}

https://github.com/Jxck/goroutine-sample/blob/master/goroutine-bench.go


100,000 個の goroutine を起動して、その時間とメモリの使用量を見ています。

手元の Mac OSX Lione Core 2 Duo, Memory 4G で実行してみた結果が以下です。

goroutine:	100000
cpu:				2
time:				0.589367
memory all:	23.001915 MB
memory:			241.192560 byte/goroutine


かなり、小さい事がわかりますね。
というか不安なのでもう少し調べてみます(汗)

outro

goroutine と channel 自体の仕様はそんなに難しいものではありませんが、 2 つをを使って、かなり色々表現できることが分かって頂けたと思います。また、 select や closure といった機能が地味に協力なので、組み合わせるとさらに色々できるようになります。


と、ここまでが発表の範囲でしたがまだいくつかあるので、後で追記していきます。


go の売りの 1 つでもある、この組み込みの並行処理機能の使い方がなんとなくでも伝わればと思います。