Block Rockin’ Codes

back with another one of those block rockin' codes

LTSV の Stream Parser を Stream2 で書いてみた

Update

2013/02/12
JSON => JSON Object に(JSON string でないものは)修正

LTSV

LTSV が流行っていたんですが、完全に乗り遅れて Node も Go も実装は出てしまいました。
Node の方は sasaplus1 さんのものが こちら にあるんですが、パーサ関数のみで Stream ではなかったので、 Stream 実装を書いてみました。

ltsv-stream

Jxck/ltsv-stream · GitHub


npm でインストールできます。

npm install ltsv-stream

Stream2

Node での Stream の重要性は、このブログでも何度か書いてきたと思いますが、この Stream は Stream2 という新しい実装に変わりつつある (Stability: 2 - Unstable, v0.9 以降) ので、今回は Stream2 の勉強がてらそちらで書きました。

Stream2 のドキュメントは v0.9 以降の Node.js ドキュメント を見て下さい。


Stream2 の実装方法はこれまでの Stream と違い、用意されたクラスを継承して、親クラスの Abstract Method を小クラスで Override するという Template Method パターンに則って実装することになります。

Transform Stream

今回開発する Stream は LTSV 形式の文字列を読み込んで、各行を JSON Object にパースするようにします。
つまり、左から右にデータを変換しながら流すので Transform Stream (かつてこのブログでは Filter Stream といっていたもの) になります。


今回使ったライブラリは、 Transform Stream として以下のように使えます。
LTSV 形式のログが詰まったファイルが、流れるようにパースされて表示されている様が手に取るようにわかるでしょう。

var ltsv2json = require('ltsv-stream').ltsv2json
  , fs = require('fs');

var ltsv = new ltsv2json({stringify: true});
fs.createReadStream('ltsv-access.log').pipe(ltsv).pipe(process.stdout);


この例では表示のために stringify するオプションを使ってますが、
false の場合(デフォルトなので書かなくて良い)は JSON Object が emit されます。
(このオプションは ltsv-stream 独自のオプションで stream2 標準ではありません。)


本来このライブラリは、このあと何か LTSV 変換した JSON Object を使う Readable Stream を実装して、それを pipe() して使うような用途を想定しています。
stringify: true はネットワークに直接流すときや、デバッグ用に使って下さい。


Transform Stream の実装には二つのメソッドを Override します。
今回はこの二つが ltsv-stream でどう使われているかを解説します。

  • transform._transform(chunk, outputFn, callback)
  • transform._flush(outputFn, callback)
_transform

このメソッドが Transform の核となるメソッドです。
今回の場合は LTSV を一行ごとに JSON Object にする処理を書きます。


入力は文字列を想定していますが、一行づつ入力されることは想定していません。
例えば先の例のように fs.ReadableStream を pipe すると、入力はどこで途切れるかわからないので、今回は各行に分けてから LTSV のパース処理をする必要があります。


今回の場合は、簡単にするとこんな感じ。

LtsvStream.prototype._transform = function(chunk, output, cb) {
  if (chunk) {
    this.line += chunk;
    while (this.line.match(/\r?\n/)) {
      var record = RegExp.leftContext;
      this.line = RegExp.rightContext;
      record = this.parse(record); // ここで LTSV -> JSON Object
      output(record);
    }
  }
  return cb(null); // chunk の処理を終了
}


引数の chunk は、処理対象となる chunk です。今回の場合は LTSV の中途半端な文字列です。
もしかしたら LTSV 複数行分かもしれないし、一行に満たないかもしれません。
それを意識した上でパースして、生成された JSON Object を output() に渡します。
渡した分だけ次の Readable Stream に渡されます。


一つの chunk が処理し終わったら、 cb() を呼びます。
cb() にはその中で発生したエラーを渡すこともできます。
cb() を呼んだら次の chunk の処理に入り、以下繰り返しです。


_flush

_flush() は上位クラスで end イベントの前に呼ばれます。
Stream の終端で、最後にやらなければいけない作業がある場合、このメソッドを override することで実現できます。


今回の場合は、データを行ごとに処理するために chunk を一旦内部に保存しているので、 _transform で output() されなかったデータが残っている可能性があります。


今回は、この残ってるかもしれないデータを _flush() で処理します。
残ってるのは一行かそれに満たない LTSV レコードのはずなので、とりあえずまるっと parse() してみるだけです。
処理が終わったら(もしくは無かったら) cb() を呼び終わります。

ltsv2json.prototype._flush = function(output, cb) {
  if (this.line) {
    var record = this.line;
    try {
      record = this.parse(record);
    } catch (e) {
      return cb(e);
    }
    output(record);
  }
  cb(null);
};

Stream のテスト

今回の Transform Stream のテストはとりあえず以下のように書いてみました。
SPY (今思うと SPY とはちょっと違った。。まあいいや。) となる Writable Stream を作成し、それを pipe して中で渡されてくる値を確認し、 end イベントで終わらせる方針です。

SPY stream

チェックだけする Writable Stream です。

var Writable = require('stream').Writable;
var util = require('util');

function SPY(options) {
  Writable.call(this, options);
}
util.inherits(SPY, Writable);

// _write を上書きして、 chunk を調べる。
SPY.prototype._write = function(chunk, cb) {
  assert.deepEqual(chunk, expected);
  cb(null);
};
test

これを用いたテストは以下のようになります。
mocha の場合は非同期テストのための done() (このブログで紹介した next() と同等) を受け取れるので、 spy の finish イベントでそれを呼ぶことで、全ての chunk をテストできます。

var ltsv = new ltsv2json({stringify: false});
var spy = new SPY();
fs.createReadStream('test/test.log')
  .pipe(ltsv)
  .pipe(spy)
  .on('finish', function() {
    done(); // ここで終わる
  });

まとめ

ということで、 LTSV の波には見事乗り遅れましたが、ずっとペンディングしていた stream2 デビューを果たすことができました。


まだまだ Stream2 はドキュメントも少なく、実装も議論中なところがあるので手を出す敷居は高いかもしれません。
自分もまだまだ手探りなオプションや挙動が色々あります。
しかし、 Stream を制すものは Node を制すということで、自分ももう少し触って勘所を掴んでいきたいと思います。