EventMachineでジョブキューを高速に処理するたたき台

追記: EventMachineでジョブキューを高速に処理するたたき台2 - 下林明正のブログ

具体的にはクローラーみたいなのを書きたいシチュエーションを想定しているんだけど、

  • ジョブIDの配列みたいなのは予め分かっている
  • 最大試行回数を考慮したリトライが可能
  • Concurrencyを考慮して処理をする

という要件を満たしたコードをEventMachine書こうと思った時にどうやって書けばいいのか調べても良く分からない。EventMachine流行ってないっぽいし、なんかもっと別の良いやり方があるのかも知れない。 良く分からないなりに試行錯誤した結果、以下の様なコードはどうか、と思った。

require 'rubygems'
require 'eventmachine'

MAX_RETRY_COUNT = 3

class Mailer < EM::DefaultDeferrable
  def initialize(mail_address)
    @mail_address = mail_address

    callback {
      puts "#{@mail_address}: sent"
    }
    errback {
      puts "#{@mail_address}: failed"
    }
  end

  def send
    puts "#{@mail_address}: sending..."
    rand >= 0.8 ? succeed : fail
  end
end

def loop(jobs, retry_count = 0)
  puts "retry(#{retry_count}/#{MAX_RETRY_COUNT}):" if retry_count > 0

  EM::Iterator.new(jobs.clone, 2).each( proc{|i, iter|
    mailer = Mailer.new("hoge_#{i}@example.com")
    mailer.callback {
      jobs.delete(i)
      iter.next
    }
    mailer.errback {
      iter.next
    }
    mailer.send
  }, proc{|responses|
    if jobs.length > 0 and retry_count < MAX_RETRY_COUNT
      EM.next_tick {
        loop(jobs, retry_count + 1)
      }
    else
      puts 'done'
      EM.stop
    end
  })
end

EM.run {
  jobs = (1..5).to_a
  loop(jobs)
}

この辺りを参考にして考えた。

しかしloopの再帰が深くなるに連れてメモリなどを圧迫しそうなので、再帰をやめて別の書き方にしたほうが良い気がする。 loopは残タスクがあるかどうかをどっかにセットするようにして、timerでそれを監視して必要ならloopをまた呼ぶとか? もしくはChannelでイベントをやりとりしてloopを呼ばせるのも良いかもしれない? なんにせよ今日はそこまでやってる時間はもう無くなった。

おまけに、Deferrableの中で無理やりリトライしようとして滅茶苦茶になったヤツ。 callbackをいい感じに引き回せない構造になってしまっているので、破綻するのが目に見えている。

require 'rubygems'
require 'eventmachine'

class Mailer < EM::DefaultDeferrable
  def initialize(iter, mail_address)
    @iter = iter
    @mail_address = mail_address
  end

  def send(retry_count = 0)
    if retry_count >= 3
      puts "#{@mail_address}/#{retry_count}: failed"
      succeed
      return
    end

    # 以前のstatusを初期化しておかないとコールバックを登録した瞬間に実行されるっぽい?のでsend毎に初期化している
    set_deferred_status nil

    # http://eventmachine.rubyforge.org/EventMachine/Deferrable.html#set_deferred_status-instance_method
    # > We're shifting callbacks off and discarding them as we execute them.
    # > after processing callbacks or errbacks, CLEAR the other set of handlers.
    callback {
      puts "#{@mail_address}/#{retry_count}: sent"
      @iter.next
    }
    errback {
      puts "#{@mail_address}/#{retry_count}: failed, will retry"
      EM.next_tick {
        puts "#{@mail_address}/#{retry_count}: retrying..."
        send(retry_count + 1)
      }
    }

    puts "#{@mail_address}/#{retry_count}: sending..."
    rand >= 0.5 ? succeed : fail
  end
end

EM.run {
  EM::Iterator.new((1..5).to_a, 2).each( proc{|i, iter|
    mailer = Mailer.new(iter, "#{i}@example.com")
    mailer.send
  }, proc{|responses|
    puts 'done'
    EM.stop
  })
}