Block Rockin’ Codes

back with another one of those block rockin' codes

Node.js の Stream API で「データの流れ」を扱う方法

追記

11/12/6
少し誤字脱字を修正、加筆
11/12/7
koichik さんにコメントで頂いたリンクと、その内容について追記
11/12/7
edvakf さんに頂いた指摘を修正

本文

この記事は、JavaScript Advent Calendar 2011 (Node.js/WebSocketsコース) の 4 日目の記事です。


Node.js には Stream という API があります。
Stream はとても重要な技術で、

「Stream を制するものは、 Node.js を制す」

と言っても過言ではありません。

実際、 Stream は Node.js が得意とする I/O の部分を使いこなすために、
押さえておくべき技術なので、今回はこの Stream について紹介したいと思います。

参考
Jxck's OutPut - Node.js の Stream

I/O のおさらい+α

まず I/O について簡単におさらいします。
例えば、ファイルを読む場合、

同期 I/O なプラットフォームでは、大抵以下のようになるでしょう。
(このやり方しかできないというわけではありません、比較として)

data = File.read('path/to/file')
print data

一方非同期 I/O の例として、 Node.js だとコールバックを渡して以下のようになります。

readFile('path/to/file', function(data) {
  console.log(data);
});

非同期なので、I/O の終了を待っている間は、別の処理を行うことができます。
しかし、上のいずれも結果的には「読み込んだファイルの中身全部」をまとめて扱っていることがわかるでしょう。


Node.js ではこの I/O 結果に対する処理を、「全部まとめて」ではなく「破片を読み込むごと」に扱うことができます。
具体的には以下のようになります。

var readableStream = fs.createReadStream('path/to/file', {encoding: 'utf-8', bufferSize: 1});
readableStream.on('data', function(data) {
  console.log(data);
});
readableStream.on('end', function() {
  console.log('end');
});

data イベントが発生するごとに、データの破片がコールバックに渡されます。
この破片は chunk と呼ばれます。

この chunk は、例えば読む対象がテキストファイルだからといって、「一行ごと」になったりはしません。
基本的にはサイズを想定せず、行単位で扱うなら、一旦ためてから改行コードを見るなどの処理が、コールバックの中で必要になります。

ちょっと面倒に思えるかもしれませんが、これが Stream 。上の例は、 ReadableStream の例です。

Stream とは

Stream は「データの流れ」を抽象化するためのインタフェース。というような位置づけになります。
EventEmitter を継承し、読み込み用の ReadableStream と、書き込み用の WritableStream があります。

ちなみに process.stdin(標準入力), process.stdout(標準出力) もそれぞれ、 ReadableStream と WritableStream です。


インタフェースなので、 Stream が実装すべきイベント、メソッドなどは決められており、マニュアルに記載されています。


http://nodejs.jp/nodejs.org_ja/docs/v0.6/api/all.html#streams


今回使う、一部の最低限重要と思うものを挙げます。
説明は砕いて書くので、正確な定義はマニュアルを参照してください。

Event of ReadableStream
'data'
読み込んだデータの発生
'end'
読み/書き込みデータの終了
Method of ReadableStream
resume
'data'イベントの到着を開始/再開
pause
'data'イベントの到着を中断
pipe
ReadableStreamの結果をWritableStreamに繋ぐ
Event of WritableStream
'drain'
書き込みが再開可能
'pipe'
パイプされたことがわかる
Method of WritableStream
write
書き込む
end
溜まってるデータも全部書き出しておわり

pipe/util.pump

ある ReadableStream から読み込んだデータを、そのまま WritableStream に渡す例を考えます。
例えば、大きなファイルを読み込み、HTTP のレスポンスとして返すような場合です。
この場合は、ReadableStream でファイルの中身を chunk で読み込み、その chunk を WritableStream で Socket に書き込む感じです。


しかしこの例のように、デバイスの I/O 速度によって ReadableStream からの読み込みが、 WritableStream の書き込みよりも速い場合があります。
すると調節のために、書き込みが間に合わなかった場合に、読み込みを一旦止めるといった処理が必要になります。


この場合、

  1. バッファがいっぱいになった時、 WritableStream.write() が false を返す。
  2. ReadableStream.pause() で読み出しを一旦止めることができる。
  3. その後再び書き込みできる状態になったら、 WritableStream で drain イベントが起こる。
  4. ReadableStream.resume() は読み込みを(開始ではなく)「再開する」メソッドである。

ということをふまえて、以下のような感じで調整する必要が出ます。

// 読み込み開始(正確には止まってるから再開する)
readableStream.resume();
readableStream.on('data', function(data) {
  if (writableStream.write(data) === false) { // 書き込みがいっぱいいっぱいです
    // 一旦止める
    readableStream.pause();
  }
});
// 
writableStream.on('drain', function() { // 書き込み再開できます
  // 書き込み再開
  readableStream.resume();
});

こんな感じ。


しかしこうした処理は、 Stream 同士を繋いでデータを「流す」ような Node.js のアプリでは頻出します。
そこで、 ReadableStream には pipe() というメソッドがあり、引数に WritableStream を渡せば、上記のような調整を自動で行い、データの流れを上手く調整してくれます。


util.pump() というメソッドも同等なことをしてくれますが、これは将来廃止される予定なので、使わない方が良いでしょう。
ただし実装を読む分には参考になるので、紹介します。


Stream の実装

データを上手く「流れ」に乗せるために、何らかのデータ生成源を、Stream で抽象化したい場合があります。
Stream は先ほど言ったようにインタフェースなので、必要なメソッド等を実装すればよいことになります。

WritableStream

まず簡単な WritableStream を実装してみましょう。
ここでは、 process.stdin が ReadableStream であることを利用し、そこからデータを受け取って、まとめて出力する、簡単な MyStream を実装してみます。


色々はしょってやると以下の通り、 stream.Stream を util.inherits で継承して、必要なメソッドを実装するだけです。とりあえず write と end、あと writable プロパティあたりがあれば動きます。

var stream = require('stream')
  , util = require('util')
  , log = console.log.bind(console)
  ;

// 本来は 'drain','error','close','pipe' イベントが必要
function MyStream() {
  this.writable = true;
  this.buf = [];
}

// 継承、詳細は util.inherits を参照
util.inherits(MyStream, stream.Stream);

MyStream.prototype.write = function(data) {
  var data = data.toString().trim();
  log('write:', data);
  this.buf.push(data);
  return true;
};

MyStream.prototype.end = function(data) {
  log('end:', data);
  if (data) this.write(data);
  this.writable = false;
  log('\nresult:', this.buf.join(''));
};

MyStream.prototype.destroy = function() {};
MyStream.prototype.destroySoon = function() {};

module.exports = MyStream;

if (require.main === module) {
  var mystream = new MyStream();

  // 標準入力をパイプする
  process.stdin.pipe(mystream);
  // 読み込み開始
  process.stdin.resume();
}

実行してみます。実行したら、キーボードから文字を入力し、最後に ctl-D で止めます。

$ node mystream.js
a
write: a
b
write: b
c
write: c 
end: undefined // ctl-D

result: abc

ReadableStream

ここでキレイに ReadableStream の例が書ければ良いのですが、なんかあまりいいものがうかばず。。
とりあえず、カウントを走らせてそれを垂れ流す、 TimerStream なるものをでっち上げてみました。

var stream = require('stream')
  , util = require('util')
  , log = console.log.bind(console)
  ;

// 本来は 'data', 'end', 'error', 'close' イベントが必要
function TimerStream() {
  this.readable = true;
  this.t = 0;
  this.timer = null;
  this.piped = false;
}

// 継承、詳細は util.inherits を参照
util.inherits(TimerStream, stream.Stream);

TimerStream.prototype.resume = function() {
  this.timer = setInterval(function() {
    this.t++;
    if (this.t > 4) {
      return this.emit('end');
    }
    this.emit('data', this.t.toString());
  }.bind(this), 1000);
};

TimerStream.prototype.pause = function() {
  clearInterval(this.timer);
};

TimerStream.prototype.pipe = function(dest) {
  this.piped = true;

  // ここでは stream.Stream.prototype.pipe.apply(this, arguments); もok
  this.on('data', function(data) {
    dest.write(data);
  });
};

TimerStream.prototype.setEncoding = function(encoding) {};
TimerStream.prototype.destroy = function() {};
TimerStream.prototype.destroySoon = function() {};

module.exports = TimerStream;

if (require.main === module) {
  var timerStream = new TimerStream();
  timerStream.pipe(process.stdout);
  timerStream.resume();
}

実行すると時間が出力されます。終了は ctl-C

$ node timerstream.js
1234^C

デバッグ

途中で以下のような謎のエラーが出たんですが。。

$ node timerstream.js
Assertion failed: (Buffer::HasInstance(args[0])), function Write, file ../src/stream_wrap.cc, line 289.
zsh: abort      node 02.timerstream.js

stream_wrap.cc を読んだら、誤って buffer が渡っていたようです。
TimeStream.prototype.resume の中の emit の引数。

  - this.emit('data', this.t);
  + this.emit('data', this.t.toString());

このエラーは分かりにくすぎるだろさすがに。。

pipe でつなぐ

ためしに二つをつなぐならこんな感じ。
(end 周りが半端ですが)

var TimerStream = require('./timerstream')
  , MyStream = require('./mystream')
  , ts = new TimerStream()
  , ms = new MyStream()
  ;

// パイプで繋ぐ
ts.pipe(ms);
// 読み込みを開始
ts.resume();

実行は ctl-C で止めます。

$ node pipesample.js
write: 1
write: 2
write: 3

Stream の使い道

データの流れを上手いこと Stream に抽象化できれば、それを pipe で繋ぐだけでやり取りができます。
また、 writable かつ readable な Stream を実装することもできます。


たとえば、あるファイル A.js 内でデータが生成され、それを他のファイル B.js に渡したい場合、次々発生するデータは module.exports だけでは対応できません。
この場合 B.js から A.js に writable かつ readable な Stream を渡し(逆も可)、 A.js 内では発生したデータを write() でどんどん書き込む。B.js はそのストリーム経由でデータを受け取るといったことができます。もちろん B.js ではそれをまた pipe で繋ぐ、といったこともできます。
([追記] ここで言ってるのは、最後に紹介する Filter のパターンです。)


多分こんな感じ。(一度やろうと思ったけど、結局やらなかったので未検証)

// A.js (書き込む側)
var stream = require('B');
stream.write(data);
// もちろんここで pipe してもいい
// B.js (書き込ませたい側)
var stream = /*snip*/;
module.exports = stream;
stream.on('data', /*snip*/);


また、 File や Socket 系の主な API は、すでに Stream になってることが多いので、そのストリームをそのままやり取りすればいいです。
こうすると、普段何気なく使っていた fs や net、 http モジュールの取り回しの幅が広がるでしょう。
それ以外で生成したデータを Stream にしたい場合は、上で紹介したように自分で実装すれば良いでしょう。

Stream の今後

piscisaureus が書いた Node v0.8 roadmap には、 StreamAPI についての改善が上がっていました。
主に自分で Stream オブジェクトが定義しやすくなるようにするようです。担当は Isaac。

https://gist.github.com/1346745


しかし、 Google Group に Ryan があげたロードマップには、これがなくなっています。

https://groups.google.com/forum/#!topic/nodejs/eVBOYiI_O_A/discussion


つまり、内部的に話は上がっているけど、他が優先になってるのかな。

ちなみに TJ のインタビューでも、最後の方で Stream API について触れられています。

原文
http://www.infoq.com/articles/nodejs-in-action
翻訳
http://d.hatena.ne.jp/vwxyz/20111014/1318568665

また、コメントで頂いたリンクについては、最後に別途紹介させていただきます。

Stream ベースのアプリケーション

また、アプリケーション単位でもサーバを ReadableStream 、クライアントを WritableStream とみたてて(もちろん逆もあり)、今までのデータが「渡される」感じよりも、「流れている」感じを出して行くことが、リアルタイム Web な流れの中で一つのポイントになると考えます。


この時、 WebSocket といった通信方法以外に、この考えを取り込んだ Web アプリケーションのアーキテクチャの研究が必要です。
そこでは、 Bi-Side JavaScript (両方 JS で書ける)ということが、より有利に働く場面かも知れませんね。


もともと SocketStreamFlatIron がそうしたストリームベースな部分を意識して作られてはいるんですが、正直これらもまだ完成系とは言えず、この分野はまだまだ研究の余地が多分に残っていると思います。


Stream ベースなリアルタイムアプリ。夢は広がりますね。

補足

  • サンプルは API 通りに網羅した実装にはなってません。
  • 同じく、エラー処理も、まともな終了処理もしてません。
  • 今回は buffer の話は省略しました。
  • 本当は自作 Stream のコンストラクタ関数内で stream.Strem のコンストラクタ読んだ方がいい気がしなくもない。(今は影響無いはずだけど。)
  • Stream の独自実装については、この辺が参考になります。https://github.com/mikeal/morestreams
  • 今回使ったコードはこちらにあげてます。https://gist.github.com/1426284

追記

コメントで「Stream の今後に」と頂いたリンクですが、ここで紹介します。

Streams2

https://github.com/joyent/node/pull/1681

まずこちらは、 mikeal の pull request です。
内容は以下の三つです。

  • Make stream.Stream a read/write stream with proxy methods by default.
  • Make readable streams in lib use stream.ReadStream.
  • Add stream.createFilter.

要するに上述した「Stream を使いやすく」の具体的な提案です。
「master のテストが落ちてるから merge はおいといて、先に API について話そう」
ということで 2011/09 に始まりました。

途中、 isaacs, ry も参戦して、
「今はとにかく API の議論が大事だ。 node の今後を左右する」という勢いで、
慎重な議論が行われてますね。コンスタントに続いて 11 月頭に一旦落ち着いたようですが、
3日前にまだ投稿があるので、まだ続くのかもしれませんね。
それだけ、大事だということを物語っているととれると思います。

Spec for streams

https://gist.github.com/1241393

時期的には上の議論の初期に izaacs によって書かれたものです。
コードは無く API ベースの提案です。
メソッドをオーバーライドして独自 Stream を実装できるような、Stream の基本クラスを用意するさいの、仕様のドラフトです。
ここから実装される Stream として、今回紹介した ReadableStream と WritableStream 以外にあと二つ書かれています。
以下の二つはいずれも ReadableStream かつ WritableStream ですが、動きが少し違います。

Filter Streams
write() すると self.emit('data', ...) する感じ。
Duplex Streams
write() と self.emit('data') が独立して行われる感じ。


Filter は、write() がそのまま emit('data') するので、以下のように pipe の途中に挟む事ができます。
これにより RS から WS に渡るまでの間に色々な処理(暗号化, Gzip etc)を挟む事ができます。

RS.pipe(Filter).pipe(WS);

Duplex は、 write() でデータを受け取る事も、 emit('data') することもできるけど、二つには関連は無く独立しています。
代表例が Socket で、つまり相手に「データを送る(write)」も「データを受け取る(emit)」もできます

エコーサーバならこうなります。

net.createServer(function (socket) {
  socket.pipe(socket);
});

Filter を組み合わせると、例えば リクエストに対して何かしら処理をしてレスポンスを返すサーバは
(リクエストが全部揃ってなくてもレスポンスが始められるる感じならたぶん)
以下みたいにも書けるってことになるのかな。(未検証)

net.createServer(function (socket) {
  socket.pipe(requestHandleFilter).pipe(socket);
});


そして、これらに考えられる問題として、 pipe() の途中でエラーが出た時の処理についてが上がっています。

この辺が今後 Stream を扱い、 API を洗練していく過程で大事になってくるんだろうと思います。

参考

関連の issue など。


今自分が追えているのはこのくらいまで。
Stream はまだまだ奥が深いので、後は使いながら考えて行きたいと思います。


引き続き、指摘、質問、コメント歓迎です。