Islands in the byte stream

Technical notes by a software engineer

digdag run (local mode) で並列実行数を制御する

とある分散バッチシステムでdigdagを導入してみています。

www.digdag.io

スケジューラ機能などは使っておらず、タスクを良い感じに並列実行するためのmakeよりちょっと便利なツール、くらいの感じで使ってます。

digdagは下記のようにloopなどを _parallel: true を指定するだけで並列実行できるのがいいですね。

timezone: UTC

+repeat:
  loop>: 3
  _parallel: true
  _do:
    +run:
      sh>: "echo ${i}"

+teardown:
  echo>: finish ${session_time}

ところで、このこの並列実行数を制御する方法がわからなかったのでコマンドラインオプションで渡せるようにpull-requestしました。 digdag v0.9.13 からたぶん使えます。

add --max-task-threads to `digdag run` by gfx · Pull Request #572 · treasure-data/digdag

digdag run --max-task-threads N foo.dig などとしてつかえます。まあ、このPRを出したあとに -X agent.max-task-threads=N でも指定できるとわかったのでちょっと冗長ですが、他のサブコマンド(server, scheduler)との一貫性もあるので存在意義はあるでしょう。

max-task-threadsの数によってloop taskの全体の結果はかわりません。たとえば --max-task-threads 1 のときにloopの最初のtaskが失敗すると、loopの他のタスクがすべて実行されて、loop taskそれ自体は失敗します。

検証コード:

timezone: UTC

+repeat:
  loop>: 3
  _parallel: true
  _do:
    +run:
      sh>: "if [ ${i} == 1 ] ; then false; else echo ${i}; fi"

+teardown:
  echo>: finish ${session_time}

結果:

$ digdag run --rerun mydag.dig
2017-05-28 10:35:31 +0900: Digdag v0.9.10
2017-05-28 10:35:32 +0900 [WARN] (main): Reusing the last session time 2017-05-28T01:00:00+00:00.
2017-05-28 10:35:32 +0900 [INFO] (main): Using session /Users/gfx/repo/mydag/.digdag/status/20170528T010000+0000.
2017-05-28 10:35:32 +0900 [INFO] (main): Starting a new session project id=1 workflow name=mydag session_time=2017-05-28T01:00:00+00:00
2017-05-28 10:35:33 +0900 [INFO] (0017@[0:default]+mydag+repeat): loop>: 3
2017-05-28 10:35:34 +0900 [INFO] (0017@[0:default]+mydag+repeat^sub+loop-0+run): sh>: if [ 0 == 1 ] ; then false; else echo 0; fi
2017-05-28 10:35:34 +0900 [INFO] (0020@[0:default]+mydag+repeat^sub+loop-1+run): sh>: if [ 1 == 1 ] ; then false; else echo 1; fi
2017-05-28 10:35:34 +0900 [ERROR] (0020@[0:default]+mydag+repeat^sub+loop-1+run): Task failed with unexpected error: Command failed with code 1
(snip)
0
2017-05-28 10:35:34 +0900 [INFO] (0021@[0:default]+mydag+repeat^sub+loop-2+run): sh>: if [ 2 == 1 ] ; then false; else echo 2; fi
2
2017-05-28 10:35:35 +0900 [INFO] (0021@[0:default]+mydag^failure-alert): type: notify
error:
  * +mydag+repeat^sub+loop-1+run:
    Command failed with code 1 (runtime)

なので、 max-task-threads の数は純粋にリソースの都合で調整すればよい、ということになります。