Block Rockin’ Codes

back with another one of those block rockin' codes

Go1.1 の Race Detector

intro

先々週、Go 1.1 がリリースされました。

いくつか新しい機能が入ったのですが、その中の Race Detector というのが面白そうだったので、
軽く調べてみました。

Race Detector

この機能は、簡単に言うと「レースコンディションが発生していないか」を調べる機能です。
といわれると、なんだかすごい機能ですね。


そもそもレースコンディションとは、マルチスレッドプログラミングなどで、単一のリソースを複数のスレッドで共有した際に、競合状態が発生して、予期しない結果を生んだりする状態です。
レースコンディションによるバグは、再現生が低かったりするので、一般的にデバッグが難しいとされています。


そうした状態が起こらないように、がっちりロックを取り合ったり、そもそもメモリを共有せずメッセージパッシングするなど、別のパラダイムで情報を共有する方法が取られます。


Go も、以下のように「メモリ共有より、メッセージ共有」という方針を推奨してます。

"Do not communicate by sharing memory; instead, share memory by communicating"

そこで使われるのが Goroutine と channel で、それについては Go の並行処理 - Block Rockin’ Codes でも書きました。


で、今回新しく入った機能は、そのレースコンディションが発生していないかどうかを、コンパイル時に Race Detector を仕込んでおくことで、実行時に調べることができるようです。
やっぱり、なんだかすごい機能ですね。

レースコンディションの例

Go の場合、複数の Goroutine から同一のメモリを変更するような場面がそれにあたります。

一番単純なのは、以下のような例でしょうか。

package main

import (
	"fmt"
	"time"
)

func main() {
	num := 10
	go func() {
		num += 1
		fmt.Println(num)
	}()
	num += 1
	fmt.Println(num)
	time.Sleep(time.Second)
}


main 自体も Goroutine なので、 num は main の Goroutine と、その中の匿名関数の Goroutine との間で共有されていて、両方が変更と出力を行なっています。


普通に実行すると、以下のとおりです。

$ go run test.go
11
12


まあ、そうでしょう。

-race オプション

では race detection してみます。
実行やビルドのコマンドに -race オプションを足すだけです。

$ go test -race パッケージ
$ go run -race ファイル
$ go build -race パッケージ
$ go install -race パッケージ


先ほどのを実行してみると。

$ go run -race test.go
11
==================
WARNING: DATA RACE
Write by goroutine 4:
  main.func·001()
      /private/tmp/race/test.go:11 +0x37
  gosched0()
      /usr/local/go/src/pkg/runtime/proc.c:1218 +0x9f

Previous write by goroutine 1:
  main.main()
      /private/tmp/race/test.go:14 +0xb5
  runtime.main()
      /usr/local/go/src/pkg/runtime/proc.c:182 +0x91

Goroutine 4 (running) created at:
  main.main()
      /private/tmp/race/test.go:13 +0xa5
  runtime.main()
      /usr/local/go/src/pkg/runtime/proc.c:182 +0x91

Goroutine 1 (running) created at:
  _rt0_amd64()
      /usr/local/go/src/pkg/runtime/asm_amd64.s:87 +0x106

==================
12
Found 1 data race(s)
exit status 66


標準出力には結果、エラー出力にこのトレースログがでます。(上は一緒にしてます。)

ざっくりみてみると、

  • 13 行目で生成された Goroutine 4 が 11 行目で
  • main メソッドである Goroutine 1 が 14 行目で

それぞれ同じ領域に書き込みしてるというように読み取れます。


解消方法は色々ありますが、とりあえず以下だとレースコンディション自体はなくなります。

func main() {
	num := 10
	go func(num int) {
		num += 1
		fmt.Println(num)
	}(num)
	num += 1
	fmt.Println(num)
	time.Sleep(time.Second)
}
$ go run -race test.go
11
11

ログは出ません。

どうやら、 Goroutine ごとのメモリアクセスの履歴をもとに、競合が起こってないかをしらべているようです。よくできてますねぇ。すごい。

オプション

GORACE という環境変数を通じてオプションを渡せます。

  • log_path (default stderr): ログファイルのパス(ファイル名は path.pid)。デフォルトは stderr
  • exitcode (default 66): race があった時終わらせるときのステータスコード
  • strip_path_prefix (default ""): 指定したパスをログから消せる
  • history_size (default 1): Goroutine のメモリアクセス履歴を 32K * 2**history_size で調節

こんな感じ

$ GORACE="log_path=/tmp/race history_size=4" go run -race test.go

注意点

現在は darwin/amd64, linux/amd64, and windows/amd64 のみの対応。
実行時のメモリ使用量が 5-10倍, 実行時間が 2-20倍に 増えるようです。

テストで確認しておいて、ビルド時は外すとかの方がよさそうですね。

まとめ

メッセージパッシングに寄せて Goroutine + Channel でやっていたとしても、レースコンディションが起こってしまうこともあるかもしれません。

そして、レースコンディションは、「起こってない」ことを調べるのは一般的には難しいと思うんですが、その検出を言語のコアに取り込んでいるというのが Go らしくていいと思います。

テストでは必ずつけて、みつけたら channel に書き換えるというような癖をつけると、幸せになれるのかもしれません。

Go の並行処理

intro

先日の Go のカンファレンス GoCon で、 Go の並行処理周りについて発表させて頂きました。


Go Conference 2013 spring - connpass


具体的には Goroutine や Channel の話ですが、これらの機能は結構面白くて、いじって遊んでるだけでもわくわくします。
Go の並行処理は、設計方針がわりと特殊だと思うのですが、設計がシンプルなので分かるとそこまで難しくはないです。
(使いこなすのは、経験が必要そうですが)


今回話すにあたって色々調べましたが、発表時間の都合上省いたものもあるし、質疑応答で聞かれて応えられなかったこともあるので、
ここでまとめて置こうと思います。

発表資料

今回の発表資料はこちらです。
このブログの内容は、これをベースにします。


http://jxck.node-ninja.com/slides/gocon-2013spring.html


ソースは、ここにまとめました。


https://github.com/Jxck/goroutine-sample

Go の Concurrency Model

並行処理プログラミングには、ざっくり分けて二つのアプローチがあります。

Shared-memory communication
ワーカ間でメモリ(リソース) を共有する。レースコンディションが起こらないようにロックをとることが多く、その実装は難しいことが多いとされる。
Message-passing communication
ワーカ間でメッセージパッシングを行う。Erlang などに実装された Actor モデルなどが代表的な実装。


そして、Go の並行処理モデルの方針は下記に宣言されています。

"Do not communicate by sharing memory; instead, share memory by communicating"


なので、上記で言うと後者なのですが、 Erlang の Actor モデルなどとは異なる点がいくつかあります。


Go での実装には、下記の二つが参考にされています。

CSP (Communicating Sequential Processes)
並行処理のための設計理論。実装としては Occam, Limbo などがある。
π caluculus
こちらも並行処理のための理論。Erlang のメッセージングが ! なのはこれが元になってるよう。日本語の情報ならここA Very Brief Introduction to the Pi-Calculus (in Japanese)


これらを実現するために、 Go には以下の機能が実装されています。

  • goroutine
  • channel


また、以下の二つもそれをより強力にします。

  • select
  • closure

Goroutine

Goroutine とは
  • Coroutine ではない。
  • Thread, Process でもない。
  • 複数の Thread 上に多重化されて実装されてる。
  • main() 自身や Scavenger(GC) などランタイムも goroutine を使ってる。


上記周りの話は、自分のあとの methane さんの発表の方が詳しかったので、そちらを参照して下さい。
https://gist.github.com/methane/5377227#file-goscheduler-md

go 文での起動

goroutine は go 文で関数を実行すると起動できます。
go 文はブロックしないので、 goroutine は非同期に起動されます。
下記の main() では、goroutine を二つ起動してから time.Sleep() で一時停止しているのは、これが無かったら f() が実行される前に main() が終わってしまうからです。


goroutine が他に実行されていようと main() が終わるとプロセスが終了します。

func f(msg string) {
	log.Println(msg)
}

func main() {
	go f("hello")
	go func() {
		log.Println("gopher")
	}()
	time.Sleep(time.Second)
}

https://github.com/Jxck/goroutine-sample/blob/master/goroutine.go

goroutine の終了条件

goroutine は、下記の条件で終了します。

  • 関数が終わる
  • return で抜ける
  • runtime.Goexit() を実行する
func main() {
	go func() {
		log.Println("end")
	}()
	go func() {
		log.Println("return")
		return
	}()
	go func() {
		log.Println("exit")
		runtime.Goexit()
	}()
	time.Sleep(time.Second)
}

https://github.com/Jxck/goroutine-sample/blob/master/goroutine-exit.go


runtime.NumGoroutine

現在起動している goroutine の数を知ることができます。

func main() {
	log.Println(runtime.NumGoroutine())
}

これを実行すると、自分の環境では 2 と出ました。


まず、 main() そのものが goroutine なので、 1 つはわかります。
しかし、もう 1 つは誰でしょう? 二つの関数を使ってますが、いずれも goroutine は使ってません。


ML で聞いてみたところ、 bradfitz さんが応えてくれました。こんな大物の返信がもらえるとか、、


結果から言うと、下記に変えると分かります。

func main() {
	log.Println(runtime.NumGoroutine())
	select{}
}

これを GOTRACEBACK=2 をつけて実行します。

GOTRACEBACK=2 go run numgoroutine.go

出力が長いのでのせませんが、見ると runtiem.main() 以外に runtime.MHeap_Scavenger() が動いてる事がわかります。


このように、 go 文で起動した goroutine や main() 以外にも、 Scavenger (GC) などランタイム環境でも goroutine が使われていることがわかります。


この情報は、 runtime.Stack からも取れます。

func main() {
	log.Println(runtime.NumGoroutine())
	buf := make([]byte, 1<<20)
	buf = buf[:runtime.Stack(buf, true)]
	log.Println(string(buf))
	select {}
}

https://github.com/Jxck/goroutine-sample/blob/master/who.go

Channel

Channel とは
  • Channel は goroutine 間でのメッセージパッシングをするためのもの
  • メッセージの型を指定できる
  • first class value であり、引数や戻り値にも使える
  • send/receive でブロックする
  • buffer で、一度に扱えるメッセージ量を指定できる
Channel を用いたメッセージング

Channel は参照型なので make() でインスタンスを生成します。
使い方は簡単で、送信が "channel<-value" で受信が "<-channel" です。


下記は、 goroutine と main() 間でのメッセージングです。
main() の最後では受信結果を出力しており、その受信で値が届くまでブロックするので、 time.Sleep() は必要無い点に注意して下さい。

func f(ch chan bool) {
	ch <- true
}

func main() {
	ch := make(chan bool)
	go f(ch)
	log.Println(<-ch) // ここでデータが来るまでブロックする
}

https://github.com/Jxck/goroutine-sample/blob/master/channel.go

同期

channel は、送受信が完了するまでブロックします。
このことを、 goroutine 間の同期に応用することができます。


下記は、 main() が channel を受信する事で goroutine の終了を待つ同期をしています。
受信した値自体は必要ないため、捨てています。 goroutine は channel を closure で参照しています。closure があると本当に便利ですね。

func main() {
	fin := make(chan bool)
	go func() {
		log.Println("worker working..")
		fin <- false
	}()
	<-fin
}

https://github.com/Jxck/goroutine-sample/blob/master/finchannel.go


上記は、 goroutine が 1 つですが、複数ある場合は数を管理しないといけなくなります。
それを行う場合は sync.WaitGroup というモジュールが使用できます。

func main() {
	var wg sync.WaitGroup
	for i:=0; i<3; i++ {
		wg.Add(1) // goroutine を生成するたびインクリメント
		go func(i int) {
			log.Println(i)
			wg.Done() // 終了時にデクリメント
		}(i)
	}
	wg.Wait() // ブロックし、全ての Done が終わったら次に進む
}

https://github.com/Jxck/goroutine-sample/blob/master/waitGroup.go


sync.WaitGroup は channel を使っているわけではないようです。
sync パッケージには、 lock をとったりするパッケージがるので、 channel を使わずそれでリソース共有/同期をすることもできますが、最初に述べたように Go では極力メッセージングでリソース共有/同期をしましょう。

Worker の起動

下記は、ワーカを 3 つ起動し、それぞれの処理結果を main() で受け取っています。
worker() を go 文で実行し、結果を渡してもらうための channel を渡すのではなく、 worker が内部で、結果を渡すための channel を生成しそれを返しているので、 main() はそこから取り出しています。

func worker(msg string) <-chan string {
	receiver := make(chan string)
	for i := 0; i < 3; i++ {
		go func(i int) {
			msg := fmt.Sprintf("%d %s done", i, msg)
			receiver <- msg
		}(i)
	}
	return receiver
}

func main() {
	receiver := worker("job")
	for i := 0; i < 3; i++ {
		log.Println(<-receiver)
	}
}

https://github.com/Jxck/goroutine-sample/blob/master/workers.go


worker の宣言に注目して下さい。

// func worker(msg string) chan string こうではない
func worker(msg string) <-chan string

worker が返す型は「読み取り専用の channel」です。(<- がついてる)
これにより、 main() がこの channel に誤ってデータを書き込むことを防ぎます。

複数の channel と select

先の例では、 worker は 3 つのメッセージを返すことが予め分かっていたので、 3 つだけ取り出していました。
しかし、 worker が予めわからない場合などもあります。
その場合は、用途の違う別の channel を返すことで、必要なメッセージをとることもできます。

Go では、関数が複数の値を返すことができるため、以下の例は worker が終わったことを返すための channel を worker が返すようにしています。


main() では、複数のチャネルからのメッセージを受信する必要がありますが、それぞれブロックしてしまいます。そんな場合は、 select という構文を使うと、複数の channel の受信を同時に行うことができます。

func worker(msg string) (<-chan string, <-chan bool) {
	var wg sync.WaitGroup
	receiver := make(chan string)
	fin := make(chan bool)
	go func() {
		for i := 0; i < 3; i++ {
			wg.Add(1)
			go func(i int) {
				msg := fmt.Sprintf("%d %s done", i, msg)
				receiver <- msg
				wg.Done()
			}(i)
		}
		wg.Wait()
		fin<-false // 終了を伝える
	}()
	return receiver, fin
}

func main() {
	receiver, fin := worker("job")
	for {
		select {
		case receive := <-receiver:
			log.Println(receive)
		case <-fin: // 終了したら終わる
			return
		}
	}
}

https://github.com/Jxck/goroutine-sample/blob/master/finchannel.go

Channel の close()

close() は組み込みの関数で、用の済んだ channel を閉じることができます。
そもそも channel の呼び出しは 2 つの値が受け取れます。

message, ok := <-channel

この 2 つめの ok は、 channel が閉じられているかを表す bool です。
ok は、取っても取らなくても良い仕様になっています。


channel を close() した場合、受信側には 空のメッセージと false が渡るので、これを使って channel が close() されたことを判定できます。

 
先ほどの例を close を使って書き直すと以下になります。

func worker(msg string) (<-chan string) {
	var wg sync.WaitGroup
	receiver := make(chan string)
	go func() {
		for i := 0; i < 3; i++ {
			wg.Add(1)
			go func(i int) {
				msg := fmt.Sprintf("%d %s done", i, msg)
				receiver <- msg
				wg.Done()
			}(i)
		}
		wg.Wait()
		close(receiver)
	}()
	return receiver
}

func main() {
	receiver := worker("job")
	for {
		receive, ok := <-receiver
		if !ok {
			log.Println("closed")
			return
		}
		log.Println(receive)
	}
}

https://github.com/Jxck/goroutine-sample/blob/master/close.go

timeout

重たい worker がいた場合、 worker からの終了通知を受け取らずに、一定時間経過したら終わりたい場合もあります。


そんな時は、 time.After という関数を使うことができます。


time.After の型は以下です。

func After(d Duration) <-chan Time


一定時間経過したらメッセージを送る channel を返すので、この channel を受け取って受信をしておけば、一定時間後に処理をするためのトリガーにできます。


これを用いて 1 秒後に timeout するように書き換えてみます。

func randomTime() time.Duration {
	return time.Duration(rand.Intn(1e3)) * time.Millisecond
}

func worker(msg string) <-chan string {
	receiver := make(chan string)
	for i := 0; i < 300; i++ {
		go func(i int) {
			time.Sleep(randomTime())
			msg := fmt.Sprintf("%d %s done", i, msg)
			receiver <- msg
		}(i)
	}
	return receiver
}

func main() {
	receiver := worker("job")
	for {
		select {
		case receive := <-receiver:
			log.Println(receive)
		case <-time.After(time.Second): // 一定時間経過したら受信
			log.Println("timeout")
			return // 受信時に終われば timeout 処理になる。
		}
	}
}

https://github.com/Jxck/goroutine-sample/blob/master/timeout.go

Buffer

Channel は、 make() 時に buffer を指定することができます。
この buffer の値は、一度に channel に書き込める message の上限値になります。
デフォルトは 0 です。指定すると、 MQ のように扱うことができるイメージです。


送信は buffer が一杯だった場合は送信でブロックします。
これを利用して、例えば worker が同時に起動する数を制限できます。


以下は、 100 個の処理を同時に 5 つだけの goroutine を起動して行うサンプルです。

func worker(msg string) <-chan string {
	limit := make(chan int, 5)
	receiver := make(chan string)
	go func() {
		for i := 0; i < 100; i++ {
			log.Println(runtime.NumGoroutine())
			limit <- 1
			go func(i int) {
				msg := fmt.Sprintf("%d %s done", i, msg)
				receiver <- msg
				<-limit
			}(i)
		}
	}()
	return receiver
}

func main() {
	receiver := worker("job")
	for i := 0; i < 100; i++ {
		log.Println(<-receiver)
	}
}

https://github.com/Jxck/goroutine-sample/blob/master/buffer.go


runtime.NumGoroutine() の結果は必ずしも 5 にならない点は前述の通り。


Pattern 編

よくあるパターンを、ここまでの内容を使って実装してみます。

coroutine

coroutine と言えば Lua でしょう。
スケジュールを記述することで、処理を途中で止めて、そこからリスタートすることができます。


単純な例を Lua で書くとこんな感じです。
f() の中の処理を途中で中断し、そこから再開している例です。

function f()
	coroutine.yield "one"
	coroutine.yield "two"
	coroutine.yield "three"
	return
end

local co = coroutine.wrap (f)

print (co ()) -- one
print (co ()) -- two
print (co ()) -- three

https://github.com/Jxck/goroutine-sample/blob/master/coroutine.lua



Go では channel がブロックすることを利用します。

func f(yield chan string) {
	yield <- "one"
	yield <- "two"
	yield <- "three"
}

func main() {
	co := make(chan string)
	go f(co)
	log.Println(<-co) // one
	log.Println(<-co) // two
	log.Println(<-co) // three
}

https://github.com/Jxck/goroutine-sample/blob/master/coroutine.go

generator

generator は、多くの場合配列のように扱えますが、扱う値が実行時に生成されている点が配列と違います。予め値を生成しないことにより、メモリ消費が少ないのが特徴です。


10 まで値を取り出せる generator は、 python だと以下のようになります。

def generator(n):
	i = 0
	while True:
		if i > n: break
		yield i
		i += 1

for i in generator(10):
	print i

https://github.com/Jxck/goroutine-sample/blob/master/generator.py


Go ではやはり、 channel のブロックを使います。

func generator(n int) chan int {
	ch := make(chan int)
	i := 0
	go func() {
		for {
			ch <- i
			i++
			if i > n {
				close(ch)
				break
			}
		}
	}()
	return ch
}

func main() {
	for x := range generator(10) {
		log.Println(x)
	}
}

https://github.com/Jxck/goroutine-sample/blob/master/generator.go


make() の部分で buffer を指定していないので、同時に 1 つしか生成されないことが保証できます。(ということは生成する数を指定することもできる)

その他

multi-core

現時点では、 goroutine がマルチコアを自動的に使いこなすような最適化はされないようです。


もし、 goroutine を複数のコアで実行したい場合は GOMAXPROC 環境変数か、 runtime.GOMAXPROCS() にコア数を指定します。


よって、ソースに以下のように書くことが多いです。

cpus := runtime.NumCPU()
runtime.GOMAXPROCS(cpus)
ベンチ

goroutine のメモリ使用量を調べます。
こちらにあったのをお借りして、少し変えてみました。
https://gist.github.com/jgrahamc/5253020

func main() {
	cpus := runtime.NumCPU()
	runtime.GOMAXPROCS(cpus)
	count := 1000 * 100

	var startMemory runtime.MemStats
	runtime.ReadMemStats(&startMemory)

	start := time.Now()
	fin := make(chan bool)
	for i := 0; i < count; i++ {
		go func() {
			<-fin
		}()
	}
	elapsed := time.Since(start)

	var endMemory runtime.MemStats
	runtime.ReadMemStats(&endMemory)
	close(fin)

	fmt.Printf(`
goroutine:	%d
cpu:				%d
time:				%f
memory all: %f MB
memory:			%f byte/goroutine
`,
		count, cpus, elapsed.Seconds(),
		float64(endMemory.Alloc-startMemory.Alloc)/float64(1024*1024),
		float64(endMemory.Alloc - startMemory.Alloc)/float64(count))
}

https://github.com/Jxck/goroutine-sample/blob/master/goroutine-bench.go


100,000 個の goroutine を起動して、その時間とメモリの使用量を見ています。

手元の Mac OSX Lione Core 2 Duo, Memory 4G で実行してみた結果が以下です。

goroutine:	100000
cpu:				2
time:				0.589367
memory all:	23.001915 MB
memory:			241.192560 byte/goroutine


かなり、小さい事がわかりますね。
というか不安なのでもう少し調べてみます(汗)

outro

goroutine と channel 自体の仕様はそんなに難しいものではありませんが、 2 つをを使って、かなり色々表現できることが分かって頂けたと思います。また、 select や closure といった機能が地味に協力なので、組み合わせるとさらに色々できるようになります。


と、ここまでが発表の範囲でしたがまだいくつかあるので、後で追記していきます。


go の売りの 1 つでもある、この組み込みの並行処理機能の使い方がなんとなくでも伝わればと思います。

JSON - を node の Stream で整形する

intro

ちょっと反応が遅れてしまいましたが。


404 Blog Not Found:JSON - をnodeで整形する


こちらの記事は Stream 厨として見逃す訳にはいきませんでした。

motivation

まあ、 JSON ですしね。

  • そのJavaScriptに今や標準搭載のJSON.stringify()は実はpretty printできるし

確かに stringify でできますね。(昔こちらにも書きました。)

  • どうせなら標準入力だけではなくURIやファイル名で直アクセスしたいし

同意です。


しかし、実装を見てみると、、
(以下、主要な部分の抜粋)

// stdout
stdin.on("data", voorhees);
// http
http.get(req, function (res) {
  res.on('data', function (data) {
    chunks.push(data)
  }).on('error', function (e) {
    console.log(e.message);
  }).on('end', function () {
    voorhees(chunks.join(''))
  });
});
// fs
fs.readFile(argv[2], function (err, data) {
  if (err) throw err;
  voorhees(data);
});


実は、こうした「データソースから受け取ったデータを加工して、次に渡す」といったものこそ、 Stream に向いています。
正確には、今回の用途は「データをまとめて扱っている」ので、 Stream にする必要はかならずしもないです。


しかし、 Stream にしておくと以下のようなメリットがあるかと思います。

  • すでに Stream として抽象化されているものとの接続製があがる。(substackdominic のライブラリ集が使える!)
  • うまくやれば、 JSON のストリーム(次々と途切れなく流れてくる JSON の配列とか) を、途中途中で Stringify しながら画面表示したりできる。
  • voorhees は CLI を提供するフロントの層と JSON を整形するロジックの層が1つになっています(それ自体をどうこう言う気はないです) が、こうしたレイヤの分離も Stream を使うとうまくいったりします。


そしてなにより、先の 3 つのリソース(stdin/out, file, http) は全て、標準ですでに Stream として抽象化されています。


乗るしか無い、このエコシステムに!

Transform Stream

今回の用途だと、 Transform Stream にすることになります。特徴だけ説明します。


今回は、チャンクごとに処理せず、まとめて最後に処理するので、 _transform() はデータを溜めるだけで、 _flush() で全部 stringify() 処理します。

JSPP.prototype._transform = function(chunk, output, cb) {
  chunk = chunk.toString();
  if (chunk) {
    this.data += chunk;
  }
  return cb(null);
};

JSPP.prototype._flush = function(output, cb) {
  if (this.data) {
    try {
      output(this._pp(this.data));
    } catch(e) {
      return cb(e);
    }
  }
  cb(null);
};


使う場合は、 pipe するだけです。

// 標準入出力の場合
process.stdin.pipe(jspp).pipe(process.stdout);


// http の場合
http.get(url, function (res) {
  res.pipe(jspp).pipe(process.stdout);
});


// ファイルの場合
fs.createReadStream(file).pipe(jspp).pipe(process.stdout);


こうして、標準モジュールや、他の Stream を実装したライブラリと、好き放題繋ぐ事ができます。

jspp-stream

ということで、以前作った ltsv-stream をちょちょっといじって、 jspp(json pritty print) stream ということで実装してみました。


https://github.com/Jxck/jspp-stream


Stream 実装(jspp-stream.js)だけでなく、これを使った voorhees 互換の jspp コマンドも同梱してあるので、 npm -g で入れれば使えます。
(こうして、ロジックの層とフロントの層が簡単に分けられるのも、 Stream を使うメリットの1つです)

$ npm install -g jspp-stream
$ cat test.json | jspp
$ jspp /path/to/file.json
$ jspp http://some.json.com


テストもエラー処理も適当ですが、まあこんな感じでできますよということで。

Go の interface 設計

history

13/3/31
Tag について追加

intro

Go を触ってて interface を用いた設計がまだまだよくわかってなかったので、一旦まとめることにしました。

Go には明示的な継承の機能は無く、 interface も例えば Java のそれとはかなり毛色が違うので、(Class ではなく) Struct の設計に結構癖があると感じます。

Go の interface は言語設計的にもかなり尖っていて、 Go という言語を強く特徴付けていると同時に、 Go 言語自体の開発者たちもこの機能をかなり重要視しています。


例えば、 Go の開発者の一人である Russ Cox 氏によれば

Go's interfaces―static, checked at compile time, dynamic when asked for―are,
for me, the most exciting part of Go from a language design point of view. If
I could export one feature of Go into other languages, it would be
interfaces.

http://research.swtch.com/interfaces

とのこと。


「Interface を制すものは Go を制す」


なのかもしれない、と思いまずこの機能を使い方の視点からまとめてみました。

(まあ、単にヤックの毛を狩り始めたら、随分遠くまで来てしまったという話。。)


ソース

今回の記事のもとになるソースは、こちらにあります。

https://gist.github.com/Jxck/5237600


Basic

まずは、基本的な Struct (Class 的なもの) とそのメソッド定義から。
(本当は下記の例は String() を実装するのが正解だけどそれは別で)

// 基本的な Struct
type Point struct {
	X int
	Y int
}

// Struct のメソッド
func (p Point) Coordinate() string {
	// p がレシーバ
	return fmt.Sprintf("(%d, %d)", p.X, p.Y)
}

func main1() {
	var a Point = Point{2, 3}
	fmt.Println(a.Coordinate()) // (2, 3)
}

既存の型も拡張できます。

// 既存の型を元にした独自の型を定義できる。
type trimmedString string

// そこにメソッドも追加できる。
func (t trimmedString) trim() trimmedString {
    return t[:3]
}

func main2() {
	var t trimmedString = "abcdefg"
	fmt.Println(t.trim())

	// 型変換
	var s string = string(t)

	// 型を変換したので、 trim() は無い
	// fmt.Println(s.trim())
	fmt.Println(s)
}


この辺は tour of go でもおなじみです。

interface

Go の interface の定義の内容は、単なるメソッドリストです。
リスト中のメソッドを全て実装していれば、その Interface を満たす(satisfy) とみなされます。


Java では実装(implements) と宣言されますが、そうした明示的な宣言はないのが Goの特徴です。

// Interface を宣言
type Accessor interface {
	GetText() string
	SetText(string)
}

// Accessor を満たす実装
// Interface の持つメソッド群を実装していれば、
// Interface を満たす(satisfy) といえる。
// 明示的な宣言は必要なく、実装と完全に分離している。
type Document struct {
	text string
}

func (d *Document) GetText() string {
	return d.text
}

func (d *Document) SetText(text string) {
	d.text = text
}

func main3() {
	// Document のインスタンスを直接変更しても
	// 値渡しになってしまうので
	// ポインタを使用
	var doc *Document = &Document{}
	doc.SetText("document")
	fmt.Println(doc.GetText())

	// Accessor Interface を実装しているので
	// Accessor 型に代入可能
	var acsr Accessor = &Document{}
	acsr.SetText("accessor")
	fmt.Println(acsr.GetText())
}

mixin

別の型を定義に含むと、含んだ型のメソッドが定義された状態になります。
これは継承というよりは mixin のイメージですね。
この場合、継承(引い継いだ、程度の意味での)されたメソッド内のレシーバの挙動に注意です。

type Page struct {
	Document // 匿名型を含むと、その型のメソッドが継承(というか mixin)される
	Page     int
}

func main4() {
	// Page は Document を継承しており
	// Accessor Interface を満たす。
	// この場合代入可能
	var acsr Accessor = &Page{}
	// この値は acsr.Document.text に設定されてる。
	// acsr の構造体がレシーバになっているわけではないということ
	acsr.SetText("page")
	fmt.Println(acsr.GetText())

	// Document と Page の間に代入可能な関係は無い
	// var page Page = Document{}
	// var doc Document = Page{}
}

Duck Typing

「アヒルのように鳴くなら、それはアヒル。」
動的な型付け言語では、「鳴く」ことができるかは実行時に試すか、予めメソッドの先頭などで調べる必要があります。


Go の場合は「鳴く」こと自体を interface で定義し、ランタイムではなくコンパイル時にチェックできます。
メソッドの引数に Interface 型を指定して、コンパイルが通った時点で、渡された値はアヒルのように鳴くことが保証されるわけです。

/*
	Duck Typing
	Accessor を満たしていれば、 Get, Set できるという例。
*/

func SetAndGet(acsr Accessor) {
	acsr.SetText("accessor")
	fmt.Println(acsr.GetText())
}

func main5() {
	// どちらも Accessor として振る舞える
	SetAndGet(&Page{})
	SetAndGet(&Document{})
}

Override

Override はそのまま。
細かいけど int -> string の変換は string(i) ではできず、 strconv.Itoa を使う。
(ただ、 Overload はできない、その辺はどうすればいいんだろう。。可変長引数かな?)

/*
	Override
*/
type ExtendedPage struct {
	Document
	Page int
}

// Document.GetText() のオーバーライド
func (ep *ExtendedPage) GetText() string {
	// int -> string は strconv.Itoa 使用
	return strconv.Itoa(ep.Page) + " : " + ep.Document.GetText()
}

func main6() {
	// Accessor を実装している
	var acsr Accessor = &ExtendedPage{
		Document{},
		2,
	}
	acsr.SetText("page")
	fmt.Println(acsr.GetText()) // 2 : page
}

Interface 型

Interface 型はメソッドを持たない Interface です。
つまり、メソッドを持たない struct を含めて、全ての struct は Interface 型を満たすと言えます。

例えば、下記のメソッドは全ての型の値を受け取ることができます。

func f(v interface {}) {
  // v
}

しかし、ここで重要なのは v の型は interface{} 型であるということ。
Go の runtime では、全ての値は必ず一つの型を持つので、型が不定といったことはなく、メソッドの引数などでは可能であれば型の変換が行われます。
上記 f() は全ての値を interface{} 型に変換します。
interface {} 型の struct が、任意の type であることは、型アサーションという機能を用いて行います。
アサーションシグニチャは以下。

e, ok := v.(type) // v が type を満たすかを調べる
// Interface 型

// Get() があるかを調べる
// er を付ける命名が慣習
type Getter interface {
	GetText() string
}

func dynamicIf(v interface{}) string {
	// v は Interface 型

	var result string
	g, ok := v.(Getter) // v が Get() を実装しているか調べる
	if ok {
		result = g.GetText()
	} else {
		result = "not implemented"
	}
	return result
}

func dynamicSwitch(v interface{}) string {
	// v は Interface 型

	var result string

	// v が実装している型でスイッチする
	switch checked := v.(type) {
	case Getter:
		result = checked.GetText()
	case string:
		result = "not implemented"
	}
	return result
}

func main7() {
	var ep *ExtendedPage = &ExtendedPage{
		Document{},
		3,
	}
	ep.SetText("page")

	// do は Interface 型を取り
	// ジェネリクス的なことができる
	fmt.Println(dynamicIf(ep))       // 3 : page
	fmt.Println(dynamicIf("string")) // not implemented

	// 型スイッチを使う場合
	fmt.Println(dynamicSwitch(ep))       // 3 : page
	fmt.Println(dynamicSwitch("string")) // not implemented
}

Generics

Go の wikiで紹介されている方法です。

interface{} をそのまま使わず、 Any 型を定義して置き換えているだけですが、これなら後で Any 型の定義をいじれるのでより良さそうです。

これにより、 Generics 的なことができます。(JavaGenerics とまではいきませんが)

// 全ての型を許容するインタフェースのようなものを作っておく
type Any interface{}

// ジェネリクス的な
type GetValuer interface {
	GetValue() Any
}

// Any 型で実装
type Value struct {
	v Any
}

// GetValuer を実装
func (v *Value) GetValue() Any {
	return v.v
}

func main8() {
	// インタフェースで受け取る
	var i GetValuer = &Value{10}
	var s GetValuer = &Value{"vvv"}

	// インタフェース型のコレクションに格納
	var values []GetValuer = []GetValuer{i, s}

	// それぞれ GetValue() が Any で呼べる
	for _, val := range values {
		fmt.Println(val.GetValue())
	}
}

inerface の実装

interface の値は二つのポインタから成る。

  • 元になる型の、メソッドテーブル
  • 元の値が持つ値

これがわかっていれば、下記が間違っていることがわかる。
[]string のメソッドテーブルと値は持てるが、その中の値を interface には変換できないから。

参考 http://golang.org/doc/faq#convert_slice_of_interface

func PrintAll(vals []interface{}) {
	for _, val := range vals {
		fmt.Println(val)
	}
}

func main9() {
	names := []string{"one", "two", "three"}

	// これは間違い
	// PrintAll(names)

	// 明示的に変換が必要
	vals := make([]interface{}, len(names))
	for i, v := range names {
		vals[i] = v
	}
	PrintAll(vals)
}

Interface の設計例 1 (interface{})

interface の設計例として、下記に 2 つの例があるので、それを借用します。
http://jordanorelli.tumblr.com/post/32665860244/how-to-use-interfaces-in-go


1 つ目は Twitter API で受け取った JSON 内の日付の文字列をパースする例です。
JSON のパース自体は json.Unmarshal() で可能です。
パース先の型として map[string]interface{} を用いればどんな型でもとりあえずパース可能です。

// 1, twitter API から Time のパース

/*
	Twitter の JSON を map にパースする。
	twitter の JSON には、時間が Ruby フォーマットの文字列で格納されているので、
	それを考慮して型を考える。
	"Thu May 31 00:00:01 +0000 2012"
*/

var JSONString = `{ "created_at": "Thu May 31 00:00:01 +0000 2012" } `

func main10() {
	// map として、 {string: interface{}} としてしまえば
	// value がなんであれパースは可能
	var parsedMap map[string]interface{}

	if err := json.Unmarshal([]byte(JSONString), &parsedMap); err != nil {
		panic(err)
	}

	fmt.Println(parsedMap) // map[created_at:Thu May 31 00:00:01 +0000 2012]
	for k, v := range parsedMap {
		fmt.Println(k, reflect.TypeOf(v)) // created_at string
	}
}

Interface の設計例 1 (time.Time)

本来 Go の time.Time 型であると望ましいのですが、 Ruby の文字列フォーマットがtime.Time の文字列とデフォルトフォーマットが違います。
そこで、 time.Time を元に新たな型を定義します。

また、 JSON の Unmarshaller のインタフェースは下記のようになっているので、UnmershalJSON() を実装すれば、 Unmarshaller は満たされます。

type Unmarshaler interface {
  UnmarshalJSON([]byte) error
}


なんとデフォルトで time.RubyDate が用意されているのでそれを使えば終わりです。
http://golang.org/pkg/time/#pkg-constants (これぞ Battery Included !)

type Timestamp time.Time

// Unmarshaller を実装
func (t *Timestamp) UnmarshalJSON(b []byte) error {
	v, err := time.Parse(time.RubyDate, string(b[1:len(b)-1]))
	if err != nil {
		return err
	}
	*t = Timestamp(v)
	return nil
}

func main11() {
	var val map[string]Timestamp // 定義した型を使う

	if err := json.Unmarshal([]byte(JSONString), &val); err != nil {
		panic(err)
	}

	// パースされていることを確認
	for k, v := range val {
		fmt.Println(k, time.Time(v), reflect.TypeOf(v))
		// created_at 2012-05-31 00:00:01 +0000 +0000 main.Timestamp
	}
}

Interface の設計例 2 (JSON Entity)

HTTP リクエストから JSON を取得し、オブジェクトにパースする例です。

単純にシグネチャを考えると以下のようになります。

GetEntity(*http.Request) (interface{}, error)

これは、戻り値の方に汎用性を持たせて、どのような型のデータも取り出せるようにしています。
しかし、これだと戻り値は毎回型変換しないといけないし、 Postel の法則に反します。
(「送信するものに関しては厳密に、受信するものに関しては寛容に」)

しかし、例えば取り出す型を User として下記のようにシグネチャを変更すると、型の数だけ GetXXXX が必要になります。

GetUser(*http.Request) (User, error)

そこでインタフェースを導入します。


下記の例は、 http でやるとちょっと面倒なので、 JSON を文字列まで取り出せたと仮定して、 JSON をパースする例に置き換えています。

(JSON から、方に応じて違う値を取り出す例ですが、メソッドの中が同じになってしまったのであまり良い例ではないけど、力尽きたのでそのまま。。)

// 各型が、自身のパース実装を持てばよいので、そのメソッドだけ定義しておく。
type Entity interface {
	UnmarshallJSON([]byte) error
}

func GetEntity(b []byte, e Entity) error {
	// 各実装に処理を移譲
	return e.UnmarshallJSON(b)
}

// 型を定義
// User に関する必要なデータだけ取りたい型的な
type UserData struct {
	Id        int
	Name      string
	Time_Zone string
	Lang      string
}

// *_count だけ適当に取りたい型的な
type CountData struct {
	Followers_count  int
	Friends_count    int
	Listed_count     int
	Favourites_count int
	Statuses_count   int
}

// Entity を実装
// ここでは、 json モジュールになげるだけで
// 同じ実装でできてしまったが、
// 本来 Entity ごとに違う実装になる。
func (d *UserData) UnmarshallJSON(b []byte) error {
	err := json.Unmarshal(b, d)
	if err != nil {
		return err
	}
	return nil
}

func (d *CountData) UnmarshallJSON(b []byte) error {
	err := json.Unmarshal(b, d)
	if err != nil {
		return err
	}
	return nil
}

func main12() {
	// 対象の JSON 文字列
	EntityString := `{
		"id":51442629,
		"name":"Jxck",
		"followers_count":1620,
		"friends_count":617,
		"listed_count":204,
		"favourites_count":2895,
		"time_zone":"Tokyo",
		"statuses_count":17387,
		"lang":"ja"
	}`
	userData := &UserData{}
	countData := &CountData{}
	GetEntity([]byte(EntityString), userData)
	GetEntity([]byte(EntityString), countData)
	fmt.Println(*userData)  // {51442629 Jxck Tokyo ja}
	fmt.Println(*countData) // {1620 617 204 2895 17387}
}

Tag

ブコメで指摘されて初めて知ったのですが、 Struct には Tag を付けることができます。
このタグは、 reflect モジュールを使ってしかアクセスすることがでず、 Struct に対するメタ情報のような使い方をするようです。


まず、タグは以下のように Struct フィールドの型の次に記述し、 reflect で取り出すことができます。

// タグ付きの Struct を定義
type TaggedStruct struct {
	field string `tag:"tag1"`
}

func main13() {
	// reflect でタグを取得
	var ts = TaggedStruct{}
	var t reflect.Type = reflect.TypeOf(ts)
	var f reflect.StructField = t.Field(0)
	var tag reflect.StructTag = f.Tag
	var val string = tag.Get("json")
	fmt.Println(tag, val) // json:"emp_name" emp_name
}


現在標準ライブラリで Tag が使われているのは encoding/json と encoding/xml だけのようです。
例えば json では、 JSON のキー名と Struct のフィールド名が同じであればそのまま Unmarshal 可能ですが、もし二つが違った場合は、変換先の Struct とのフィールドのマッピングを `json:"fieldname"` という形式で記述することで解決できます。

// JSON をマッピングするために
// キー名のタグ付をつけた Struct を定義
type Employee struct {
	Name  string `json:"emp_name"`
	Email string `json:"emp_email"`
	Dept  string `json:"dept"`
}

func main14() {
	// フィールド名が Struct の Filed 名と違う JSON も
	// json:"fieldname" の形でタグを付けてあるので
	// マッピングすることができる。
	var jsonString = []byte(`
	{
		"emp_name": "john",
		"emp_email" :"john@golang.com",
		"dept" :"HR"
	}`)

	var john Employee
	err := json.Unmarshal(jsonString, &john)
	if err != nil {
		fmt.Println("error:", err)
	}
	fmt.Printf("%+v\n", john) // {Name:john Email:john@golang.com Dept:HR}
}

Wrap Up

ということで、浅いところですが interface の使い方と基本的な使い方をまとめました。


今回参考にした資料には Go の interface がその下の C++ のレイヤでどういった構造になっているかなども載っているので、今度はその辺まで踏み込んでまとめたいと思います。


しかし、慣れないと Go での interface 設計は慣れないと難しいですが、これができないと今作ってるものがまともに組めないので、最も良いお手本であろう標準ライブラリを読みながら勉強していきたいところです。

Apache mod_spdy の X-Associated-Content で Server Push

intro

SPDY の Server Push については以前にも書きましたのでそちらをご参照下さい。

SPDY と WebSocket の基礎と SPDY の Push - Block Rockin’ Codes


上記記事ではサンプルの実装に node-spdy を用いましたが、更なる検証のために他の実装を調べました。


主な SPDY 実装で、 Server Push を実装しているサーバは、把握してる範囲では以下のような状況です。


今回は Server Push の検証のために、 Apache mod_spdy でこの機能を有効にする方法を調べました。

Apache mod_spdy

名前の通り、 Apache HTTP Server を SPDY 対応させるためのモジュールです。

mod-spdy - Apache SPDY module - Google Project Hosting


mod_spdy 自体のインストールはこちらを参照下さい。

Apache mod_spdy のインストール - Qiita [キータ]


そして、 mod_spdy で Server Push を行う方法は、基本的には本家の wiki である下記に書かれているんですが、具体的な方法の情報が少なかったので書いておきます。


https://code.google.com/p/mod-spdy/wiki/OptimizingForSpdy#Using_SPDY_server_push

X-Associated-Content

Server Push するリソースは X-Associated-Content というヘッダで指定します。
ヘッダのフォーマットは以下のようなものです。

X-Associated-Content: "https://www.example.com/styles/foo.css",
                      "/scripts/bar.js?q=4":2,
                      "https://www.example.com/images/baz.png": 5,
                      "https://www.example.com/generate_image.php?w=32&h=24"


mod_spdy は、この X-Associated-Content ヘッダを見つけると、そこに指定されたリソースをクライアントに push します。
フォーマットルールは以下です。

  • リソースは絶対パス指定、相対パスは非対応。ホストまでは省略可
  • 各々をダブルクオートで囲む
  • 複数ある場合はカンマ区切り
  • コロンの次に priority(0-7) を指定できる


注意点は以下です。

  • X-Associated-Content ヘッダは mod_spdy で処理されると削除される(クライアントには届かない)
  • priority が指定されなかった場合は、現実的かつ最小の値が自動で付与される(実装依存)
  • priority は値が小さい方が優先順位が高い


以上より、 mod_spdy が処理する前のどこかの処理で、レスポンスヘッダに X-Associated-Content ヘッダを付与してあげれば、mod_spdy が push を行ってくれます。
方法としては CGI などでプログラムで行うか、 mod_headers を用いることで実現できます。 

mod_headers

Apache のモジュールだけで完結し、デフォルトで入ってることから mod_headers を用いる方法が一番手軽です。
mod_headers はそのなの通りヘッダを制御するモジュールで、設定ファイルのほとんど全ての場所で、のヘッダを操作できます。


では、 /spdy/test/push.html に、 /jquery.js と /bootstarp.css を push する設定を作成してみます。

<Directory /spdy/test>
  Options Indexes
  allow from all
  <Files push.html>
    Header append x-associated-content "\"/jquery.js\", \"/bootstrap.css\""
  </Files>
</Directory>

ダブルクオートのエスケープに気をつければそのまま書くだけです。

成功したら、 Chrome などで jquery.js と bootstrap.css が Cache から読み込まれていることを確認して下さい。

Rails

とにかく mod_spdy に渡る前に X-Associated-Content ヘッダが付与できれば良いので、プログラムを介す場合はそのプログラムから API 経由でレスポンスをいじれば同様のことができます。
RailsApache で走らせている場合は以下のような感じ。(参考)

class SPDYController < ApplicationController
  def server_push
    response.headers["X-Associated-Content"] = '"/assets/jquery.js", "/assets/bootstrap.css"'
  end
end

CGI なども同様です。

outro

SPDY 単体でもそうですが、 Server Push も含めるとサイトの最適化に対する選択肢が増えます。


特に「ハイパフォーマンス Web サイト」に書かれているような手法のいくつかは、SPDY によって見直す余地が出てくる可能性があります。


現在その辺を少し検証しているので、まとめたらまた書きたいと思います。

LTSV の Stream Parser を Stream2 で書いてみた

Update

2013/02/12
JSON => JSON Object に(JSON string でないものは)修正

LTSV

LTSV が流行っていたんですが、完全に乗り遅れて Node も Go も実装は出てしまいました。
Node の方は sasaplus1 さんのものが こちら にあるんですが、パーサ関数のみで Stream ではなかったので、 Stream 実装を書いてみました。

ltsv-stream

Jxck/ltsv-stream · GitHub


npm でインストールできます。

npm install ltsv-stream

Stream2

Node での Stream の重要性は、このブログでも何度か書いてきたと思いますが、この Stream は Stream2 という新しい実装に変わりつつある (Stability: 2 - Unstable, v0.9 以降) ので、今回は Stream2 の勉強がてらそちらで書きました。

Stream2 のドキュメントは v0.9 以降の Node.js ドキュメント を見て下さい。


Stream2 の実装方法はこれまでの Stream と違い、用意されたクラスを継承して、親クラスの Abstract Method を小クラスで Override するという Template Method パターンに則って実装することになります。

Transform Stream

今回開発する Stream は LTSV 形式の文字列を読み込んで、各行を JSON Object にパースするようにします。
つまり、左から右にデータを変換しながら流すので Transform Stream (かつてこのブログでは Filter Stream といっていたもの) になります。


今回使ったライブラリは、 Transform Stream として以下のように使えます。
LTSV 形式のログが詰まったファイルが、流れるようにパースされて表示されている様が手に取るようにわかるでしょう。

var ltsv2json = require('ltsv-stream').ltsv2json
  , fs = require('fs');

var ltsv = new ltsv2json({stringify: true});
fs.createReadStream('ltsv-access.log').pipe(ltsv).pipe(process.stdout);


この例では表示のために stringify するオプションを使ってますが、
false の場合(デフォルトなので書かなくて良い)は JSON Object が emit されます。
(このオプションは ltsv-stream 独自のオプションで stream2 標準ではありません。)


本来このライブラリは、このあと何か LTSV 変換した JSON Object を使う Readable Stream を実装して、それを pipe() して使うような用途を想定しています。
stringify: true はネットワークに直接流すときや、デバッグ用に使って下さい。


Transform Stream の実装には二つのメソッドを Override します。
今回はこの二つが ltsv-stream でどう使われているかを解説します。

  • transform._transform(chunk, outputFn, callback)
  • transform._flush(outputFn, callback)
_transform

このメソッドが Transform の核となるメソッドです。
今回の場合は LTSV を一行ごとに JSON Object にする処理を書きます。


入力は文字列を想定していますが、一行づつ入力されることは想定していません。
例えば先の例のように fs.ReadableStream を pipe すると、入力はどこで途切れるかわからないので、今回は各行に分けてから LTSV のパース処理をする必要があります。


今回の場合は、簡単にするとこんな感じ。

LtsvStream.prototype._transform = function(chunk, output, cb) {
  if (chunk) {
    this.line += chunk;
    while (this.line.match(/\r?\n/)) {
      var record = RegExp.leftContext;
      this.line = RegExp.rightContext;
      record = this.parse(record); // ここで LTSV -> JSON Object
      output(record);
    }
  }
  return cb(null); // chunk の処理を終了
}


引数の chunk は、処理対象となる chunk です。今回の場合は LTSV の中途半端な文字列です。
もしかしたら LTSV 複数行分かもしれないし、一行に満たないかもしれません。
それを意識した上でパースして、生成された JSON Object を output() に渡します。
渡した分だけ次の Readable Stream に渡されます。


一つの chunk が処理し終わったら、 cb() を呼びます。
cb() にはその中で発生したエラーを渡すこともできます。
cb() を呼んだら次の chunk の処理に入り、以下繰り返しです。


_flush

_flush() は上位クラスで end イベントの前に呼ばれます。
Stream の終端で、最後にやらなければいけない作業がある場合、このメソッドを override することで実現できます。


今回の場合は、データを行ごとに処理するために chunk を一旦内部に保存しているので、 _transform で output() されなかったデータが残っている可能性があります。


今回は、この残ってるかもしれないデータを _flush() で処理します。
残ってるのは一行かそれに満たない LTSV レコードのはずなので、とりあえずまるっと parse() してみるだけです。
処理が終わったら(もしくは無かったら) cb() を呼び終わります。

ltsv2json.prototype._flush = function(output, cb) {
  if (this.line) {
    var record = this.line;
    try {
      record = this.parse(record);
    } catch (e) {
      return cb(e);
    }
    output(record);
  }
  cb(null);
};

Stream のテスト

今回の Transform Stream のテストはとりあえず以下のように書いてみました。
SPY (今思うと SPY とはちょっと違った。。まあいいや。) となる Writable Stream を作成し、それを pipe して中で渡されてくる値を確認し、 end イベントで終わらせる方針です。

SPY stream

チェックだけする Writable Stream です。

var Writable = require('stream').Writable;
var util = require('util');

function SPY(options) {
  Writable.call(this, options);
}
util.inherits(SPY, Writable);

// _write を上書きして、 chunk を調べる。
SPY.prototype._write = function(chunk, cb) {
  assert.deepEqual(chunk, expected);
  cb(null);
};
test

これを用いたテストは以下のようになります。
mocha の場合は非同期テストのための done() (このブログで紹介した next() と同等) を受け取れるので、 spy の finish イベントでそれを呼ぶことで、全ての chunk をテストできます。

var ltsv = new ltsv2json({stringify: false});
var spy = new SPY();
fs.createReadStream('test/test.log')
  .pipe(ltsv)
  .pipe(spy)
  .on('finish', function() {
    done(); // ここで終わる
  });

まとめ

ということで、 LTSV の波には見事乗り遅れましたが、ずっとペンディングしていた stream2 デビューを果たすことができました。


まだまだ Stream2 はドキュメントも少なく、実装も議論中なところがあるので手を出す敷居は高いかもしれません。
自分もまだまだ手探りなオプションや挙動が色々あります。
しかし、 Stream を制すものは Node を制すということで、自分ももう少し触って勘所を掴んでいきたいと思います。

GO の SPDY パッケージのアップグレード

追記

2013/02/13
パッチが本家に取り込まれました

https://code.google.com/p/go/source/detail?r=71409a1c89f0&repo=net

レビューでもの凄くお世話になった mikio 先生には頭が上がりません。
本当にありがとうございました。

intro

あけましておめでとうございます。
このお正月は、ずっと Go を触りながら、 Go の準標準モジュール? である SPDY パッケージのアップグレードをやっていました。


もともとは、 HTTP2 のサーバを書いてみようと思い、色々あって Go を選んだら、spdy パッケージがあるけどバージョンが古かったというのが始まりです。


SPDY の知識はある程度ありますが、 Go 自体はほとんどはじめてなので、こっから Go にも詳しくなれればというモチベーションです。

Go の SPDY パッケージ

もともと、 spdy や websocket などのパッケージは Go の標準パッケージだったんですが、Go の version 1 が出るタイミングでサブリポジトリに移されたようです。
(でも、管理自体は Golang Project がやっているみたい。)

http://golang.org/doc/go1.html#subrepo

なので、 import するときは下記のように指定します。

import "code.google.com/p/go.net/spdy"


このパッケージが提供するのは、 SPDY の各フレームに対応した型とその型への write/read のインタフェースの実装です。


で、この spdy パッケージは spdy/2 準拠になっています。これを spdy/3 にするというのが今回のスコープ。

成果

spdy/2 から spdy/3 への変化は色々ありますが、このパッケージに関わる大きな変化は以下です。

  • 圧縮に使う zlib の辞書データ
  • NOOP frame が消えた
  • CREDENTIAL frame が増えた
  • 各 frame の定義がちょこちょこ変わった


ということで、これらを実装したんですが CREDENTIAL frame 以外は実装が終わりました。


(CREDENTIAL frame はドキュメントの記述が他と整合して無く、内容も割と半端で、よくわからないところがあるし、しかも次のバージョンでは消えるかもしれない、ということでちょっと後回しになってる)


あとは、テストを直しながら明らかに SPDY の仕様変更に弱すぎるところがあるので、そこはペンディングにしたという点を除いては、一通り終わったかなと。
(ほとんどが、 Go のリーディングに費やされました。)

ソースはとりあえず下記に上げています。

https://github.com/Jxck/go-spdy

Contribute

最終的には本家に取り込んでもらえればと思うので、そのための手順を踏む必要があります。
手順は以下。

http://golang.org/doc/contribute.html


とりあえず ML でやってることの旨を伝えたら、Welcome とのことでした。
なので、あとはこの流儀に沿って本家に投げていこうと思います。


Git じゃなくて Mercurial 必須なんで、そっからですが、ちゃんとやれば Go のコミッタ達にレビューをしてもらえるということなので、十分得る物があるだろうと。

Go を触ってみて

今回の作業歴=Go 歴な訳ですが、個人的な感想を箇条書きで。

go fmt

go fmt が最初からあるのは良い。そして、そんなに違和感のあるフォーマットでもなかったし。
こういう流派で無駄な争いやらがおこるのってホント無駄なんで、公式が(長ったらしい説明でなく)ツールで提供してしまうので良いと思う。

ちなみに vimプラグインとかも提供されてて、コード書く環境自体に迷いは無く始められた。

go test

test がパッケージもコマンドもも最初から組み込まれているので、今一番生きのいい testing FW を探すというこの上なく無駄な時間が省けるのもいいですね。


test パッケージくらい最初からある言語は多いけど、ベンチやらコマンドやらも込みで、テスト関数やテストファイルの命名も決まっていてあとは書くだけっていうのがいいと思います。


ただ、 test にはちょっと癖があって、いわゆる assert.Equals な検証ではなく、 Go のコード自体に頼っておかしかったら落とす(Fail) という点が割と面白い。
比較に使うのは、 reflect と型アサーションが多いみたい。

  // 型アサーション
  v, ok := x.(T)
  if !ok {
    t.Fatal("Incorrect type:", T)
  }
  // 同値検証
  if !reflect.DeepEqual(expected, actual) {
    t.Fatal("got: ", actual, "\nwant: ", expected)
  }


あとテストを指定する -test.run という直感的じゃないオプションという点(というか go のオプションパーサのせいで、全体的にコマンドが UNIX ぽくない)と落ちたテストの関数名とかの出し方が -v だってことを覚えておけばまあいいかと。


細かいけど、型があるために Fixture 的なものを作る時にごまかしが効かないというか、 Ruby や JS ではテストしたいところに合わせた半端なオブジェクトをでっち上げたりできたけど、そういうことが出来ないなと思った。
別に、出来ないなと思っただけで GO が悪いとかそういうことは無い。単なる感想。
型があるために減るテストもあるし、Mock みたいな仕組みもあるらしいので、その辺を上手く使って行きたい。

go run

これがあるのが、最大の強みかなと思う。それが無ければちょっと便利な C とか C++ になるのかもしれないけど、ちょっと走らせるという点のコストの低さが非常に嬉しい。


同じコマンド体系で go build もできるので、すぐにバイナリもできるし。この辺がモダンだなぁと思う。
(複雑になってきたら、やっぱり make を書くのかな?その辺は知らない)


ただ、ちょっと試したい時にも、 import や変数の不使用チェックが必ず行われるのがちょっときつい。
カジュアルにコメントアウトしただけでは動かないことがあるので、 go run の時だけでもオプションが欲しいところだけど、
そういうことはしないで、 _ (アンダースコア) を使うようにお達しが出ている。

http://golang.org/doc/go_faq.html#unused_variables_and_imports

むしろ、run でオプションにしておいて build 時にはエラーにしちゃう方が、消し忘れなくていいと思うんだけどどうなんだろう。

GOPATH

最初よくわからずにハマった。全てはこれをきちっと設定してあげる事が重要。ここが規約になっていることが、 make みたいなツールが無くても go build/go install で色々解決できる助けになっているよう。(build のプロセスはまだブラックボックスでいる)


ちなみに、ここはプロジェクトをまたぐと設定しなおさないといけないのが面倒。
それをはかどらせるツールはある(Goのレポジトリ管理をするgenvというものを作りました - YAMAGUCHI::weblog) が、今回はプロジェクトが一個だし以下だけで済んだので入れていない。そのうち色々やるようになったら入れよう。

export GOPATH=$HOME/path-to-project/spdy-go
export PATH=$PATH:$GOPATH/bin
名前空間

パッケージを分けて、大文字で始まる関数がパブリックという覚えやすいルールなんだけど、慣れないと迷う(迷子になる)。


パッケージが複数のファイルから構成されていて、それを構成するファイルの命名規則は無いので、ある関数がどこで定義されているかは、 grep しないとわからない。ファイル自体がモジュールになるのはそれはそれで利点だったなとも思った。


今回のパッケージは、大きく 3 つにわかれていて、型(ベースとなるクラス的な)、 Read、 Write とあるけど、この分け方から外れる帯域定数とかは、grep しないとわからない。まあ、 Java とかも似た感じだけど eclipse があるから困らなかったのか。型と IDE の相性がここで出てくるかも。
あとは、慣れだと思う。

その他

思い出したら後で書く。



ということで、しばらくはこのパッケージのコントリビュートに取り組みたいと思います。