読者です 読者をやめる 読者になる 読者になる

Block Rockin’ Codes

back with another one of those block rockin' codes

Stream API がブラウザにやってくる

stream

Intro

今日は、フロントのプログラミングスタイルに、にまた一つ大きな変化をもたらすであろう Stream という API についてです。

この仕様は現時点でまだ策定中であるため、 API は変更される恐れがある点にご注意ください。

Stream API

以前 「Node.js の Stream API で「データの流れ」を扱う方法」 という記事を書きましたが、簡単に言うとあれがブラウザにもやってくるという話です。

非同期処理おさらい

もう何度も書いた話なので駆け足で。

JS はシングルスレッドでイベント駆動な世界なので、何をするにも非同期であり、コールバックを登録することで完了した結果を受け取る API が基本です。 これは、ブラウザの DOM の API でも、 Node.js でも共通しています。

概念を疑似コードで書くと以下のような感じです。

console.log('1');

file.open('path', (err, data) => {
  // 非同期なデータの読み出し終わってから実行される
  console.log(data);
});

console.log('2');

// 1 -> data -> 2

実行順序についてはもういいでしょう。

そして、最近ではこれを抽象化(あるいは部品化)する仕組みとして、 Promise の導入が進んでいます。 ざっくり言うとこんな感じ。

var p = file.open('path');

p.then((data) => {
  console.log(data);
}).catch((err) => {
  console.error(err);
});

非同期に取得/生成される結果自体をオブジェクトにし、コールバックの適応方法を切り離したことにより、 例えば部品化やエラー処理の集約ができるようになりました。 API が Promise を返さないものは、自分で Promsie オブジェクトにくるんで、同様のインタフェースに寄せることができます。

統一したインタフェース(thenable)に則っていることにより、他の部品との組み合わせも以下のようにできます。

promise
 .then(b)
 .then(c)
 .catch(console.error.bind(console));

しかし、どちらもコールバックは、処理の完了後に一回実行される実装で考えるのが普通です。 つまり上の例はいずれも、「ファイルを読み終わったら、その結果をまとめ、一回だけコールバックを実行する」となります。

連続したイベントを表現する

上記の例は、一度だけその時のファイル内容をまるっと表示しますが、大きなファイルを読む場合は、 読み出せたところから表示できる方が適した場面があります。

そうした連続したイベントを扱うのが Stream API です。

Node では Stream が以前からあり、今 v1, v2, v3 と来てちょっと移行段階なので、 これもざっくり概念疑似コードで書くと以下のようなイメージ。

filestream = file.createReadStream(path);

filestream.on('data', (data) => {
  console.log(data);
}).on('error', (err) => {
  console.error(err);
});

例えば Node.js では、 Stream の実装が EventEmitter になっており、ファイルを開くとその中身を Chunk ごとに読み出して、その結果を引数にして逐一 data というイベントを発火します。

これにより、 Promise の例とは違い大きなファイルも読み出した端から表示されます。

すばらしい点は、 Stream も他の Stream と組み合わせられるところです。 Node.js では pipe() というメソッドに stream を渡すと、それらを連結されることができます。これは Unix のパイプ (|) と同じです。

Node.js の Stream には 4 つの種類があります。

  • ReadableStream: そこからデータが読み出せる(データ生成源のラッパー: file, socket, stdin etc)
  • WritableStream: そこにデータを書き込める(データ入力先のラッパー: file, socket, stdout etc)
  • TransferStream: 左から右に処理しながら流す(データの改変などを行う: 圧縮, 暗号化, パース etc)
  • DuplexStream: HTTP サーバのようにデータの送受信を両方になう: http server etc

Transfer, Duplex は Readable と Writable の組み合わせでもあります。

こんな感じ。

// readable.pipe(transform).pipe(writable);
getStream.pipe(transferStream).pipe(fileWritableStream);

データがイベントループの導きによって、流れるように処理されて行く様が見て取れますね。

「Stream を制す者は Node.js を制す」

は決して大げさではないのです。

フロントにも Stream が欲しいよね

せっかくサーバサイドで流れるように処理されたデータを、せっかく WebSocket のようなステートフルな接続で送っても、 送った先がコールバックや Promise の世界に戻ってしまうと、片手落ちです。

「フロントでも Stream を使いたい。」

こうして人類はブラウザでも動く Stream の実装を吐いて捨てるほど作ってきました。 ちなみに俺は「全く同じものを!」ということで、 Node.js の Stream をコツコツ移植しています。 (その副作用で移植した Assert の方が使われてるというのは、また別のお話)

そして、同様に「この仕組みを標準にしよう」という話が進んでおり、もうすぐフロントも Stream ベースな時代が来る!というのが今日のお話。

ちなみにこままで駆け足で説明してきたことを、 Node.js のコミッタが熱く語る話はこちら。 mozaic.fm #10 node.js sideshow

WHATWG Stream API

本題です。ずばり WHATWG がメンテするドラフトに Stream API が追加されました!!

現時点での仕様はこちらです。

Streams Living Standard: https://streams.spec.whatwg.org/

この仕様では、 readable, writable, transform の三種類が定義されています。(duplex はありません)

これらの仕様は fetch, service worker, source extension, video, audio などで、流れるデータを表現するのに使われていくことになります。

例えば、 video や audio のデータを readable stream として受け取り、それを transform stream に繋ぐ(pipe)ことで、エフェクトをかけたりすることができるし、 圧縮されたデータを xhr で取得し、その readable stream を、解凍する transform stream に通してから、ファイルに書き込むための writable stream に渡すなんて使い方が想定されます。

完全に Node.js と同じですね。

構成図

全体の構成はこんな感じになります。 詳細は以降順次解説していきます。

f:id:Jxck:20141101161522p:plain

実装は ES6 の class ベースになっており、継承による拡張についても仕様に言及されています。

ReadableStream

ReadableStream は以下のような E@6 のクラスとして定義されています。 実際にデータが生成される生成源は underlying source と呼ばれます。 そこから取り出したデータを chunk ごとに内部に管理されている queue に追加して行きます。

class ReadableStream {
  constructor({
    start = (enqueue, close, error) => {},
    pull = (enqueue, close, error) => {},
    cancel = (reason) => {},
    strategy = %DefaultReadableStreamStrategy%
  } = {})

  get closed()
  get state()

  cancel(reason)
  pipeThrough({ writable, readable }, options)
  pipeTo(dest, { preventClose, preventAbort, preventCancel } = {})
  read()
  wait()
}

動く環境はまだ無いですが、以下のようになる予定です。 WebSocket をラップした ReadableStream です。

function makeReadableWebSocketStream(url, protocols) {
  const ws = new WebSocket(url, protocols);
  ws.binaryType = "arraybuffer";

  return new ReadableStream({
    start(enqueue, close, error) {
      ws.onmessage = event => enqueue(event.data);
      ws.onend = close;
      ws.onerror = error;
    },

    cancel() {
      ws.close();
    }
  });
}

var webSocketStream = makeReadableWebSocketStream("http://example.com", 80);

webSocketStream.pipeTo(writableStream)
  .then(() => console.log("All data successfully written!"))
  .catch(e => console.error("Something went wrong!", e));

WritableStream

同様に WritableStream の定義と実装例です。 write() で渡されてきた chunk を処理します。

class WritableStream {
  constructor({
    start = (error) => {},
    write = (chunk) => {},
    close = () => {},
    abort = (reason) => close(),
    strategy = %DefaultWritableStreamStrategy%
  } = {})

  get closed()
  get state()

  abort(reason)
  close()
  wait()
  write(chunk)
}

start() の部分は Promise を許容する仕様になているので、解決するように橋渡しします。

function makeWritableWebSocketStream(url, protocols) {
  const ws = new WebSocket(url, protocols);

  return new WritableStream({
    start(error) {
      ws.onerror = error;
      return new Promise(resolve => ws.onopen = resolve);
    },

    write(chunk) {
      ws.send(chunk);
    },

    close() {
      return new Promise((resolve, reject) => {
        ws.onclose = resolve;
        ws.close();
      });
    }
  });
}

var webSocketStream = makeWritableWebSocketStream("http://example.com", 80);

readableStream.pipeTo(webSocketStream)
  .then(() => console.log("All data successfully written!"))
  .catch(e => console.error("Something went wrong!", e));

Queuing Strategies と Back Pressure

TODO: WIP

Stream はデータの流れを表現し、 pipeTo でそれらを組み合わせる訳ですが、 source や sink もしくは transfer が行う処理などによっては、データの流れる早さにギャップが生じることがあります。

例えば、ローカルのファイルからデータを読み出す ReadableStream から、それをネットワークに流す WritableStream と繋ぐような場合は、 前者のデータ生成が、後者のデータ処理よりも早くなる可能性があります。

すると、ReadableStream の内部 Queue に処理待ちの chunk が溜まり続け、 そのままでは溢れてしまうため、この場合はデータ生成源であるファイルからの読み出しを止める必要があります。

こうした処理は Back Pressure と呼ばれ、各 Stream の中でそれぞれの Queue は適切に管理する必要があります。

各クラスにある strategy というプロパティは、この内部 Queue の管理戦略です。

実装としてはこんな感じらしい。

  • ByteLengthQueuingStrategy: Queue を byte サイズをベースに管理
  • CountQueuingStrategy: Queue を chunk の数をベースに管理

両方定義としてはこんな感じ。 コンストラクタで HWM (HighWaterMark) を決める。

class Strategy {
  constructor({ highWaterMark })
  shouldApplyBackpressure(queueSize)
  size(chunk)
}

Outro

Node.js の Stream API でもちょくちょく話題になる、 Backpressure の話や、ソースが Pull/Push ベースなのかどうかに関する話題も、きちんと議論されているようです。

これでサーバからクライアントまで、全て奇麗にデータが流れる Stream の列が出来ると、非常に奇麗にリアルタイムな表現が実装できるようになりそうでうs。

すでに Chromium への実装が始まっており、 polyfill の実装 も公開されています(ES6 ですよ時代は)。

間違いでした。 domenic と dominic さんです。