Block Rockin’ Codes

back with another one of those block rockin' codes

Go の Generator を channel と closure で速度比較(または Go でのアトミック処理イディオム)

追記

この記事は元々 Go のイディオムとして、いわゆるジェネレータの実装がどうできるのかを軽い気持ちで書いただけで、速いから「Closure を使いましょう」などと一言も言ってなかったのですが、一方が channel であったため原子性についての言及がいくつかありました。

自分としては、ローカルのちょっとしたツール(shell の代わりくらい) の中で使ってただけなので、並行性には言及するつもりがそもそも無かったのですが、自分もそうした前提を書いていなかったのにも原因があります。

例えばこの記事が「グローバルシーケンス」の実装例として参考にされ Web アプリにでもコピペされて、バグの原因になったりでもしたらマズイので大幅に追記します。 (正直ロックを使った排他制御はあまり得意では無いですが。。)

Intro

無限に連番を生成するロジックをジェネレータとして組むときに、 Go の場合は二つの方法が考えられます。 Closure を使う方法と Channel を読みだす方法です。

Closure (goroutine unsafe)

まずは Closure を用いた単純な実装をしてみます。

func Closure() func() int {
    i := 0
    return func() int {
        i = i + 1
        return i
    }
}

今回のベンチマークではこの実装が最速です(後述)。 ただし、この実装は複数の Goroutine から呼び出される場合、インクリメントと取得の部分がクリティカルセクションになっています。 スレッドモデルの場合 "スレッドセーフ" といいますが、 Go ではスレッドではないので "Goroutine セーフ" と呼ぶことにします。この例は Goroutine セーフではない実装です。

Closure (mutex locked)

クリティカルセクションを守る方法としては、ロックを取る方法が多くの言語で使われてきました。 Go では、 sync.Mutex を用いてそれが実現できます。

func MutexClosure() func() int {
    var mutex = new(sync.Mutex)
    i := 0
    return func() int {
        mutex.Lock()
        defer mutex.Unlock()
        i = i + 1
        return i
    }
}

sync.Mutex を用いると Lock() と Unlock() を用いて排他制御が実行できます。 操作の前に Lock() を取得し、defer を用いて関数終了時に Unlock() を実行しています。

一般的に、ロックを用いた排他制御を実施する場合、必ずデッドロックに注意する必要があります。 上記の実装では使っていませんが、 Lock()、 Unlock() などが別の Goroutine で実行されていたり、その間に Channel による同期などを別途行っているような場合は特に注意が必要です。

おまけ Closure (atomic increment)

より低レベルの操作については、 sync/atomic パッケージも提供されています。 例えば CAS やアトミックなインクリメントなどが提供されています。

今回の場合、ここまでは int で実装していますが、 int の API はないために int32 を使うことを許容できるなら、今回のクリティカルセクションのロジックが単なるインクリメントであることから、 AddInt32 を用いることができるかも。と思いましたが、これは return がロックされていないのでダメですね。 これを使うなら、ユーザランドでグローバルにある値を直接インクリメントすればいいのかな。

func AtomicClosure() func() int32 {
    var n int32 = 0
    var i int32 = 1
    return func() int32 {
        return atomic.AddInt32(&n, i)
    }
}

また、このパッケージのドキュメント冒頭には以下の説明があります。

These functions require great care to be used correctly.
Except for special, low-level applications,
synchronization is better done with channels or the facilities of the sync package.
Share memory by communicating; don't communicate by sharing memory.

要するに、普通にアプリやツールを書くような用途での使用には推奨されておらず、(明示はされていませんが)より高度な実装で必要なケースに向けて提供されています。

Channel

sync パッケージを全く使用しなくてもアトミックな処理を実装することができます。 Go には channel という言語に組み込まれた MQ のような機能があり、 この channel を使用して、メッセージングにより同期をとる方法です。

func Channel() <-chan int {
    ch := make(chan int)
    go func() {
        i := 1
        for {
            ch <- i
            i = i + 1
        }
    }()
    return ch
}

この実装では外側の Channel() は読み取り専用の channel を返しており、これを持つ側は channel の読み出しを行うことで数値を取得できます。 この i は Channel() 実行時に生成される goroutine の中にしか存在せず、その goroutine と唯一繋がった channel に対しても書込することはできないため、外からは一切操作できません。

また、 channel には make 時に buffer を設定していないため、当時に一つの値しかそこを通ることができません。 つまり、channel に値があればそれが読み出されるまで書き込みはブロックし、 channle に値がなければ書き込まれるまでは読み出しはブロックします。

大量の goroutine がこの channel を保持していて、同時に読み出しでブロックしていても、書き込まれた値は一番最初に読み出しブロックに入った goroutine にしか読み出されません。

こうして、ロックを一切使わずにアトミックな処理が実現できるのです。

ベンチ

ベンチマークを取ってみます。

func BenchmarkClosure(b *testing.B) {
    cl := Closure()
    b.ResetTimer()

    for i := 0; i < b.N; i++ {
        cl()
    }
}

func BenchmarkMutexClosure(b *testing.B) {
    cl := MutexClosure()
    b.ResetTimer()

    for i := 0; i < b.N; i++ {
        cl()
    }
}

func BenchmarkChannel(b *testing.B) {
    ch := Channel()
    b.ResetTimer()

    for i := 0; i < b.N; i++ {
        <-ch
    }
}

実行結果

Mac Book Air (OSX 10.7.5) 1.6 GHz Intel Core i5 Memory 4GB

$ go test -bench .
PASS
BenchmarkClosure        100000000    11.5 ns/op
BenchmarkMutexClosure   50000000     35.3 ns/op
BenchmarkChannel        5000000      251 ns/op
ok  /path/to/bench  6.747s

ベンチマークの実行回数は、ベンチマークソースの中の b.N の値の部分になり、 関数の実行時間に応じて go test コマンドが適宜調整してくれます。 いずれも十分回数実行されているとみなし、 ns/op に注目して一実行あたりの平均実行時間を比較します。 (数回実行したところ、だいたい同じでした)

implimentation time (ns/op) ratio
closure 11.5 1
mutex 35.3 3.07
channel 251 21.8

最後の列が、 closure 実装を 1 とした場合の比率です。 (書き直す前は channel は 50 倍だったのですが、 channel の速度は変わらないのに closure の速度が何故が落ちたので、 20 倍に縮まりました。 元々の記事は、この結果が予想していたよりも大きかったなぁという趣旨でした。)

考察

ちょっとしたツールなどで、 1 つの Goroutine の中でしか使わないような実装であればどれでも構いませんが、 複数の Goroutine が共有する、いわゆるグローバルシーケンサを実装するような場合には、今回の Closure 実装は使えません。

同じことは、シーケンサでなくともグローバルカウンタや、もっと広く言えば複数の goroutine で共有するロジックでは共通です。 しかし、途中で解説したように sync/atomic は、理由がない限り積極的に使うべきでは無いでしょう。

mutex と channel の実装については、今回 7 倍の速度差が出ていますが、 mutex にはロックを伴う排他制御につきものな種々の難しさが、他の言語同様に伴います。

sync/atomic のドキュメントでも出ていた以下はその選択の指針になります。

Share memory by communicating;
don\'t communicate by sharing memory.

これは、意訳すると「メモリの共有(sync) ではなく、メッセージング(channle) でデータの共有をしましょう」という意味で、 Go はこうしたメッセージングによるイディオムを強く推奨しています。

一般的にロックでの排他制御は難易度が高く、デバッグもしづらいという経験則があり、メッセージングを用いた実装がシンプルで堅牢かつ柔軟なプログラミングが可能であるという理由からです。

同様な主張を元にした、 Go の並行処理については以下の様にドキュメント群でも散々言及されています。

一方で channel も、読み出しが競合してデッドロックするなど、慣れないと上手く書けない場合はあるでしょう。 そうした場合は、この辺が参考になると思います。

過去にこのブログでも言及しています。

結論

  • Goroutine をまたがないで、とにかく速度が欲しい場合: Closure
  • 複数の Goroutine から使い、ロックのプログラミングに慣れている場合: Mutex
  • 複数の Goroutine から使い、 Go の思想に則りたい、プログラムをシンプルにしたいetc: Channel
  • 筆者のおすすめ: Channel
  • 前提を書かずに記事を書かない

おまけ

実は Slice や Map の操作もアトミックではありません。 なので、今回行ったようなことを用いて、 Slice や Map の操作を自分でアトミックにする必要があります。 それを含むロジック全体をアトミックにすることもできますが、単体の追加/削除などをアトミックにするライブラリをいくつか作っているので、そのうちそれを紹介します。

ソースコード

使用したコードは以下にあります。

https://gist.github.com/Jxck/9184764