appengine-mapreduceを使ってみた。
appengine-mapreduce(python) を使ってみた。
噂のappengine-mapreduceを使ってみました。
丁度cronで回さないと行けない処理があったので、こちらを参考にさせていただきました。
#appengine MapReduceを使ってみた - スティルハウスの書庫
上記には概要やらメリット/デメリットもまとめられています。
SDKをチェックアウト
アプリのディレクトリ以下で
svn checkout http://appengine-mapreduce.googlecode.com/svn/trunk/python/src/mapreduce
するだけ。
app.yamlに管理画面を追加
mapreduceにはjobの管理画面がついているので、そのURIを追加します。
handlers: - url: /mapreduce(/.*)? script: mapreduce/main.py login: admin
処理を記述
現在appengine-mapreduceは、map処理にしか対応していないので、
mapper.pyを記述します。たぶん、このファイル名は何でもいいはず。
引数にentityをとるメソッドの形で、
そのentity一個一個に対する処理を記述します。
良くありそうな処理を書いてみます。
Entityを全部消すdeleter
エンティティを全部消したい。
def deleter(entity): entity.delete()
これだけ。
パラメータを増やしたい、減らしたい。
ExpandModel等で、パラメータを増やしたい。
これから作るエンティティにはデフォルトで値入れるけど、
過去のデータにもデフォルト値が入ってて欲しい。
みたいなのも簡単に出来ます。
例えば、新しくarchiveとstatusが増えたから、
過去のは全てFalseと0にしておきたい。
みたいな場合は
def process(entity): entity.archive = False entity.status = 0 entity.put()
これだけ。
mapreduce.yaml記述
書いたmapperを登録します。
mapreduce: - name: 管理画面に出てくる説明/名前 mapper: input_reader: mapreduce.input_readers.DatastoreInputReader handler: 処理を書いた関数名 params: - name: entity_kind default: 処理対象のmodel名
wikiに書かれているのはここまでなのですが、
実はまだパラメータはあります。
#appengine うそっ、私のMapReduce、遅すぎ? - スティルハウスの書庫でも言及されていましたが、
少なくとも後二つ。
- processing_rate
- shard_count
というパラメータがあります。
これはプロジェクトのsvnリポジトリの中にある、demoの中のmapreduce.yamlに書いてありました。
ソースはこちら
適当に値を入れて記述するとこんな感じ。
mapreduce: - name: delete all entity mapper: handler: mapper.deleter input_reader: mapreduce.input_readers.DatastoreInputReader params: - name: entity_kind default: model.myentity - name: processing_rate default: 1000 - name: shard_count default: 4 - name: set default parameter mapper: handler: mapper.process input_reader: mapreduce.input_readers.DatastoreInputReader params: - name: entity_kind default: model.myentity - name: processing_rate default: 1000 - name: shard_count default: 16
パラメータ値は適当です。
色々やってみましたが、多ければ純粋に速くなるという訳ではやはり無いようですね。
実行
上記の準備が全て済んだら、デプロイして
http://xxxx.appspot.com/mapreduce/
にアクセスします。
先ほど登録した管理画面が出てきます。
ここで、登録した処理を選んで実行すれば走り出しあます。
経過時間等の細かい状態もここから確認できます。
管理コンソール
自分がやった画面は以下のような感じ。
Running Jobs
実行中、実行済みのjobの状態です。
Activityの欄が、shardsの実行数で、shard_countを変えれば増やせるんですが、
値を
- 設定20のとき16
- 設定200のとき128
だったのでどうも2^nをとるらしい。
Lanch Job
実行したい処理を選べます。
その下に
- entity_kind
- processing_rate
- shard_count
を入力できるフォームがあるので、
mapreduce.yamlで記述した値を変えてわざわざデプロイし直さなくても、
この画面からある程度変更できるみたいです。
detail
実行したjobの詳細です。
この画面はshardを適当に200に設定して128個動いた時の図です。
Processed items per shardのグラフはかなり細かい。
ちなみに今回、5300件程度のデータに、default値4つ設定するという処理を
processing_rate と shard_count をちょこっといじって3回やってみました。
特に計画もない、ただのお試し処理ですが、なんとなく参考になれば。
Overview * success * Elapsed time 00:06:54 * Start timeSat Jun 5 18:44:43 2010 * entity_kind xxx * processing_rate 100 * queue_name default #デフォルトは8 Counters * mapper_calls 4039(9.76/sec avg.)
Overview * success * Elapsed time 00:04:32 * Start timeSat Jun 5 20:56:29 2010 * entity_kind xxx * processing_rate 100000 * queue_name default * shard_count 20 #実際動いたのは16 Counters * mapper_calls 4039(14.85/sec avg.)
Overview * success * Elapsed time 00:04:59 * Start timeSat Jun 5 21:06:36 2010 * entity_kind xxx * processing_rate 100000 * queue_name default * shard_count 200 #実際動いたのは128 Counters * mapper_calls 4039(13.51/sec avg.)
processing_rateはまあ、上げると速くはなるようですね。
でもshardは上げればいいというものでは無さそうです。
自分としては、cronでやってた処理が簡単になった!
という感じなので、一分一秒はあまり気にならないんですよね。
なので、この辺の値の最適化はおいおい見て行きます。
専門家からの知見も持ち込まれて行くでしょう。
あと、感ですが、隠しパラメータはまだまだありそう。
今回僕がやったのはたかだか5000件なので、
id:kazunori_279さんの10万ほどではありませんが、
いずれにせよ、簡単に一括処理が出来て、数分で終わる。
これだけでもかなり便利です。
リポジトリ内
リポジトリの中には、色々あります。
現在はtrunkだけですが、その中には
とあるので、javaの方も現時点でのソースは見れますね。
pythonの中には
- demo
- src
- test
があります。
demoはwikiにある単なる処理ではなく、mapreduceのモジュールを、
webappの中で使うものになってます。
先ほどのパラメータはここで見つけました。
あと何よりもtestが丸々入ってるんですね。
使ってるのはpythonはunittest(javaはもちろんjunit)なので、
だれでもtestを動かせます。すばらしい。
その他
wikiには今回やったこと以外にも、色々書かれていますね。
今回は取り急ぎ上記の処理をやりたかったんで、ちゃんと読んではいませんが、
擬似的なreduceの方法も載ってます。
あとTransactionもサポートの予定みたいですね。
複雑な処理をする場合は、とりあえずその辺が揃ってからかなという印象ですが、
とりあえずは、今回みたいな簡単な一括処理で楽できそうです。