Tbpgr Blog

Employee Experience Engineer tbpgr(てぃーびー) のブログ

マルチスレッドデザインパターン | Worker Threadパターン

概要

Worker Threadパターン

詳細

作業対象が発生するまで待機し、発生して初めて作業を行うパターン。
別名Background Thread。

注意点

・Thread Per Messageパターンは都度スレッドを起動していたが、
Worker Threadパターンは既存スレッドを使いまわしている。
全体の処理に占めるスレッド起動の重さ次第でどちらを利用するか決める。
・Workerキャパシティの制御が可能。Worker数を自由に決めることが出来るため
システムの要求や制約に合わせてチューニングが可能になる。
・Requestキャパシティの制御が可能。
一回に受付可能なキャパシティを設定することで制御可能。
キャパシティを増やすとリソースを消費するため、トレードオフとなる。
・invocationとexecutionの分離
通常のメソッドは起動=invocationと実行=executionが一体となっている。
Workder Threadパターンの場合は起動はChannelへの依頼にあたり、
実行はWorkerの実処理にあたる。
これにより、起動だけ行い実行を待つ必要がないため応答性が上がる。
・キャンセルと繰り返し実行
起動から実行の間でキャンセルが可能になる。
Request形式になっているため、Requestを使い回せば再実行が容易になる。
・分散処理
起動と実行がわかれているため、分散処理が可能となる。

構成

・Client:依頼者
・Channel:通信路
・Worker:作業者
・Request:依頼

サンプル

仕様

元請けが下請会社に作業を依頼する。
作業はBTSを介してチケット形式で依頼される。
元請けは2社。(連合システム、れいほうシステム)
下請けは2社。(ねっけつシステム、花園システム)
BTSが一度に受付可能なアクティブチケットは5枚まで。
元請けは10枚チケットを発行可能。
下請けはチケットが発行されると作業を開始する。

・Client:元請=Prime Contractor
・Channel:BTS=Bug Tracking System
・Worker:下請会社=Subcontractor
・Request:チケット=Ticket

ソースコード
# encoding: utf-8
require "pp"
require "thread"

def random_sleep
  sleep ((rand 10) + 1) * 0.05
end

class PrimeContractor < Thread
  def initialize(name, bts)
    block = lambda {create_ticket(name, bts)}
    super(&block)
  end

  private
  def create_ticket(name, bts)
    (1..10).each do |i|
      random_sleep
      bts.create_ticket(i, name)
    end
  end
end

class Ticket
  attr_accessor :no, :creater
  def initialize(no, creater)
    @no, @creater = no, creater
  end
end

class BugTrackingSystem
  MAX_TICKET = 5

  def initialize(subcontractors)
    @subcontractors = []
    @m = Mutex.new
    @c = ConditionVariable.new
    @tickets = []
    subcontractors.each {|s|@subcontractors << Subcontractor.new(s, self)}
  end

  def create_ticket(no, creater)
    @m.synchronize {
      while (@tickets.size >= MAX_TICKET) do
        @c.wait @m
      end
      @tickets << Ticket.new(no, creater)
      @c.signal
      puts "#{creater}#{no} create new ticket"
    }
  end

  def start_subcontractors
    @subcontractors.each{|s|s.run}
  end

  def take_ticket(no, taker)
    @m.synchronize {
      while (@tickets.size == 0) do
        @c.wait @m
      end
      @c.signal
      ticket = @tickets.shift
      puts "#{taker}#{no} take ticket #{ticket.inspect}"
      ticket
    }
  end
end

class Subcontractor < Thread
  def initialize(name, bts)
    block = lambda {take_ticket(name, bts)}
    super(&block)
  end

  private
  def take_ticket(name, bts)
    (1..10).each do |i|
      random_sleep
      ticket = bts.take_ticket(i, name)
      implement(ticket)
    end
  end

  def implement(ticket)
    random_sleep
    puts "complete implement ticket #{ticket.inspect}"
  end
end

bts = BugTrackingSystem.new(["ねっけつシステム", "花園システム"])
prime_contractors = [PrimeContractor.new("連合システム", bts), PrimeContractor.new("れいほうシステム", bts)]
prime_contractors.each {|p|p.run}
bts.start_subcontractors
sleep 10
実行結果例
take ticket waiting...
take ticket waiting...
れいほうシステム1 create new ticket
ねっけつシステム1 take ticket #<Ticket:0x27ba870 @no=1, @creater="れいほうシステム">
take ticket waiting...
連合システム1 create new ticket
花園システム1 take ticket #<Ticket:0x27ba618 @no=1, @creater="連合システム">
れいほうシステム2 create new ticket
complete implement ticket #<Ticket:0x27ba870 @no=1, @creater="れいほうシステム">
ねっけつシステム2 take ticket #<Ticket:0x27ba450 @no=2, @creater="れいほうシステム">
れいほうシステム3 create new ticket
連合システム2 create new ticket
complete implement ticket #<Ticket:0x27ba618 @no=1, @creater="連合システム">
complete implement ticket #<Ticket:0x27ba450 @no=2, @creater="れいほうシステム">
れいほうシステム4 create new ticket
連合システム3 create new ticket
れいほうシステム5 create new ticket
create ticket ticket waiting...
ねっけつシステム3 take ticket #<Ticket:0x27ba210 @no=3, @creater="れいほうシステム">
れいほうシステム6 create new ticket
花園システム2 take ticket #<Ticket:0x27ba150 @no=2, @creater="連合システム">
連合システム4 create new ticket
create ticket ticket waiting...
complete implement ticket #<Ticket:0x27ba150 @no=2, @creater="連合システム">
complete implement ticket #<Ticket:0x27ba210 @no=3, @creater="れいほうシステム">
ねっけつシステム4 take ticket #<Ticket:0x27b1df8 @no=4, @creater="れいほうシステム">
れいほうシステム7 create new ticket
花園システム3 take ticket #<Ticket:0x27b1c78 @no=3, @creater="連合システム">
れいほうシステム8 create new ticket
create ticket ticket waiting...
complete implement ticket #<Ticket:0x27b1df8 @no=4, @creater="れいほうシステム">
complete implement ticket #<Ticket:0x27b1c78 @no=3, @creater="連合システム">
花園システム4 take ticket #<Ticket:0x27b1af8 @no=5, @creater="れいほうシステム">
連合システム5 create new ticket
create ticket ticket waiting...
create ticket ticket waiting...
ねっけつシステム5 take ticket #<Ticket:0x27b1738 @no=6, @creater="れいほうシステム">
れいほうシステム9 create new ticket
create ticket ticket waiting...
complete implement ticket #<Ticket:0x27b1af8 @no=5, @creater="れいほうシステム">
花園システム5 take ticket #<Ticket:0x27b1408 @no=4, @creater="連合システム">
連合システム6 create new ticket
create ticket ticket waiting...
complete implement ticket #<Ticket:0x27b1408 @no=4, @creater="連合システム">
complete implement ticket #<Ticket:0x27b1738 @no=6, @creater="れいほうシステム">
create ticket ticket waiting...
花園システム6 take ticket #<Ticket:0x27b0dc0 @no=7, @creater="れいほうシステム">
れいほうシステム10 create new ticket
create ticket ticket waiting...
ねっけつシステム6 take ticket #<Ticket:0x27b0c28 @no=8, @creater="れいほうシステム">
連合システム7 create new ticket
complete implement ticket #<Ticket:0x27b0dc0 @no=7, @creater="れいほうシステム">
complete implement ticket #<Ticket:0x27b0c28 @no=8, @creater="れいほうシステム">
ねっけつシステム7 take ticket #<Ticket:0x27b08f8 @no=5, @creater="連合システム">
花園システム7 take ticket #<Ticket:0x27b06b8 @no=9, @creater="れいほうシステム">
連合システム8 create new ticket
complete implement ticket #<Ticket:0x27b08f8 @no=5, @creater="連合システム">
連合システム9 create new ticket
ねっけつシステム8 take ticket #<Ticket:0x27b03e8 @no=6, @creater="連合システム">
complete implement ticket #<Ticket:0x27b06b8 @no=9, @creater="れいほうシステム">
連合システム10 create new ticket
花園システム8 take ticket #<Ticket:0x27b0070 @no=10, @creater="れいほうシステム">
complete implement ticket #<Ticket:0x27b0070 @no=10, @creater="れいほうシステム">
complete implement ticket #<Ticket:0x27b03e8 @no=6, @creater="連合システム">
ねっけつシステム9 take ticket #<Ticket:0x2915f10 @no=7, @creater="連合システム">
花園システム9 take ticket #<Ticket:0x1ed2c60 @no=8, @creater="連合システム">
complete implement ticket #<Ticket:0x2915f10 @no=7, @creater="連合システム">
complete implement ticket #<Ticket:0x1ed2c60 @no=8, @creater="連合システム">
花園システム10 take ticket #<Ticket:0x3c7968 @no=9, @creater="連合システム">
ねっけつシステム10 take ticket #<Ticket:0x3c76c8 @no=10, @creater="連合システム">
complete implement ticket #<Ticket:0x3c7968 @no=9, @creater="連合システム">
complete implement ticket #<Ticket:0x3c76c8 @no=10, @creater="連合システム">