Block Rockin’ Codes

back with another one of those block rockin' codes

appengine-mapreduceを使ってみた。

appengine-mapreduce(python) を使ってみた。

噂のappengine-mapreduceを使ってみました。

丁度cronで回さないと行けない処理があったので、こちらを参考にさせていただきました。
#appengine MapReduceを使ってみた - スティルハウスの書庫

上記には概要やらメリット/デメリットもまとめられています。

とりあえず回してみる。

手順は本当に簡単です。
処理を実行したいデータストアを持つアプリケーションに

これだけです。

SDKをチェックアウト

アプリのディレクトリ以下で

svn checkout http://appengine-mapreduce.googlecode.com/svn/trunk/python/src/mapreduce

するだけ。

app.yamlに管理画面を追加

mapreduceにはjobの管理画面がついているので、そのURIを追加します。

handlers:
- url: /mapreduce(/.*)?
  script: mapreduce/main.py
  login: admin

処理を記述

現在appengine-mapreduceは、map処理にしか対応していないので、
mapper.pyを記述します。たぶん、このファイル名は何でもいいはず。

引数にentityをとるメソッドの形で、
そのentity一個一個に対する処理を記述します。

良くありそうな処理を書いてみます。

Entityを全部消すdeleter

エンティティを全部消したい。

def deleter(entity):
    entity.delete()

これだけ。

パラメータを増やしたい、減らしたい。

ExpandModel等で、パラメータを増やしたい。
これから作るエンティティにはデフォルトで値入れるけど、
過去のデータにもデフォルト値が入ってて欲しい。
みたいなのも簡単に出来ます。

例えば、新しくarchiveとstatusが増えたから、
過去のは全てFalseと0にしておきたい。
みたいな場合は

def process(entity):
    entity.archive = False
    entity.status = 0

    entity.put()

これだけ。

mapreduce.yaml記述

書いたmapperを登録します。

mapreduce:
- name: 管理画面に出てくる説明/名前
  mapper:
    input_reader: mapreduce.input_readers.DatastoreInputReader
    handler: 処理を書いた関数名
    params:
    - name: entity_kind
      default: 処理対象のmodel名

wikiに書かれているのはここまでなのですが、
実はまだパラメータはあります。
#appengine うそっ、私のMapReduce、遅すぎ? - スティルハウスの書庫でも言及されていましたが、
少なくとも後二つ。

  • processing_rate
  • shard_count

というパラメータがあります。

これはプロジェクトのsvnリポジトリの中にある、demoの中のmapreduce.yamlに書いてありました。
ソースはこちら

適当に値を入れて記述するとこんな感じ。

mapreduce:
- name: delete all entity
  mapper:
    handler: mapper.deleter
    input_reader: mapreduce.input_readers.DatastoreInputReader
    params:
    - name: entity_kind
      default: model.myentity
    - name: processing_rate
      default: 1000
    - name: shard_count
      default: 4
- name: set default parameter
  mapper:
    handler: mapper.process
    input_reader: mapreduce.input_readers.DatastoreInputReader
    params:
    - name: entity_kind
      default: model.myentity
    - name: processing_rate
      default: 1000
    - name: shard_count
      default: 16

パラメータ値は適当です。
色々やってみましたが、多ければ純粋に速くなるという訳ではやはり無いようですね。

実行

上記の準備が全て済んだら、デプロイして

http://xxxx.appspot.com/mapreduce/

にアクセスします。

先ほど登録した管理画面が出てきます。

ここで、登録した処理を選んで実行すれば走り出しあます。
経過時間等の細かい状態もここから確認できます。


管理コンソール

自分がやった画面は以下のような感じ。


Running Jobs

実行中、実行済みのjobの状態です。

Activityの欄が、shardsの実行数で、shard_countを変えれば増やせるんですが、
値を

  • 設定20のとき16
  • 設定200のとき128

だったのでどうも2^nをとるらしい。

Lanch Job

実行したい処理を選べます。
その下に

  • entity_kind
  • processing_rate
  • shard_count

を入力できるフォームがあるので、
mapreduce.yamlで記述した値を変えてわざわざデプロイし直さなくても、
この画面からある程度変更できるみたいです。

detail

実行したjobの詳細です。


この画面はshardを適当に200に設定して128個動いた時の図です。
Processed items per shardのグラフはかなり細かい。

ちなみに今回、5300件程度のデータに、default値4つ設定するという処理を
processing_rate と shard_count をちょこっといじって3回やってみました。

特に計画もない、ただのお試し処理ですが、なんとなく参考になれば。

Overview

    * success
    * Elapsed time 00:06:54
    * Start timeSat Jun 5 18:44:43 2010
    * entity_kind xxx
    * processing_rate 100
    * queue_name default #デフォルトは8

Counters

    * mapper_calls 4039(9.76/sec avg.)
Overview

    * success
    * Elapsed time 00:04:32
    * Start timeSat Jun 5 20:56:29 2010
    * entity_kind xxx
    * processing_rate 100000
    * queue_name default
    * shard_count 20 #実際動いたのは16

Counters

    * mapper_calls 4039(14.85/sec avg.)
Overview

    * success
    * Elapsed time 00:04:59
    * Start timeSat Jun 5 21:06:36 2010
    * entity_kind xxx
    * processing_rate 100000
    * queue_name default
    * shard_count 200 #実際動いたのは128

Counters

    * mapper_calls 4039(13.51/sec avg.)


processing_rateはまあ、上げると速くはなるようですね。
でもshardは上げればいいというものでは無さそうです。
自分としては、cronでやってた処理が簡単になった!
という感じなので、一分一秒はあまり気にならないんですよね。
なので、この辺の値の最適化はおいおい見て行きます。
専門家からの知見も持ち込まれて行くでしょう。
あと、感ですが、隠しパラメータはまだまだありそう。


今回僕がやったのはたかだか5000件なので、
id:kazunori_279さんの10万ほどではありませんが、
いずれにせよ、簡単に一括処理が出来て、数分で終わる。
これだけでもかなり便利です。

リポジトリ

リポジトリの中には、色々あります。

現在はtrunkだけですが、その中には

とあるので、javaの方も現時点でのソースは見れますね。

pythonの中には

  • demo
  • src
  • test

があります。

demoはwikiにある単なる処理ではなく、mapreduceのモジュールを、
webappの中で使うものになってます。
先ほどのパラメータはここで見つけました。

あと何よりもtestが丸々入ってるんですね。
使ってるのはpythonはunittest(javaはもちろんjunit)なので、
だれでもtestを動かせます。すばらしい。

その他

wikiには今回やったこと以外にも、色々書かれていますね。
今回は取り急ぎ上記の処理をやりたかったんで、ちゃんと読んではいませんが、
擬似的なreduceの方法も載ってます。
あとTransactionもサポートの予定みたいですね。

複雑な処理をする場合は、とりあえずその辺が揃ってからかなという印象ですが、
とりあえずは、今回みたいな簡単な一括処理で楽できそうです。