Tbpgr Blog

Employee Experience Engineer tbpgr(てぃーびー) のブログ

マルチスレッドデザインパターン | Guarded Suspention

概要

Guarded Suspention

詳細

スレッドを待たせてインスタンスの安全性を確保するパターン。
別名guarded wait,spin lock。

サンプル仕様

クライアントとサーバーがキューを利用してリクエストをやりとりする。
クライアントは0.0〜0.9秒に1回キューへリクエストをプットします。
サーバーは0.0〜0.9秒感覚でキューのリクエストをゲットします。
リクエスト要求時にキューが空の場合はキューに値が設定されるまで待機します。
クライアントがキューへのプッシュを行えば、サーバーへ通知が行われ
サーバー側の処理が再開します。
この「キューが空かどうか」の判断をガード条件といいます。

※1.この場合Guarded Suspentionではキューを「GuardObject」といいます。
※2.Rubyには今回作成したキューに当たる機能はありますが、学習のためわざと1から実装しています。

UML

サンプルソース

同期の管理にMutex
状態の管理にConditionVariable
を利用します。

# encoding: utf-8
require "pp"
require "thread"

module Kernel
  # wait 0〜0.9 sec
  def get_random_sleep_time
    (rand 10) * 0.1
  end
end

# encoding: utf-8
class Time
  def self.now_msec_format
    self.now.strftime("%Y/%m/%d %H:%M:%S.%L")
  end
end

class Request
  attr_accessor :name
  def initialize(name)
    @name = name
  end
end

class Client < Thread
  def initialize(request_queue)
    block = lambda {
      10.times do |i|
        req = Request.new "No#{i} #{Time.now_msec_format}"
        request_queue.put_request req
        sleep get_random_sleep_time
      end
    }
    super(&block)
  end
end

class RequestQueue
  def initialize
    @m = Mutex.new
    @c = ConditionVariable.new
    @queue = []
  end

  def get_request
    @m.synchronize {
      while @queue.first.nil? do
        puts "waiting..."
        @c.wait @m
      end
      @queue.shift
    }
  end

  def put_request(req)
    @m.synchronize {
      @queue.push req
      @c.signal
    }
  end
end

class Server < Thread
  def initialize(request_queue)
    block = lambda {
      10.times do |i|
        puts "start get_request #{Time.now_msec_format}"
        req = request_queue.get_request
        puts "end   get_request #{Time.now_msec_format}:req = #{req.inspect}"
        sleep get_random_sleep_time
      end
    }
    super(&block)
  end
end

request_queue = RequestQueue.new
threads = []
threads << Client.new(request_queue)
threads << Server.new(request_queue)
threads.each{|t| t.join}

実行結果

start get_request 2013/09/15 22:41:35.908
end   get_request 2013/09/15 22:41:35.908:req = #<Request:0x254d5b8 @name="No0 2013/09/15 22:41:35.908">
start get_request 2013/09/15 22:41:36.008
waiting...
end   get_request 2013/09/15 22:41:36.808:req = #<Request:0x254d228 @name="No1 2013/09/15 22:41:36.808">
start get_request 2013/09/15 22:41:37.108
end   get_request 2013/09/15 22:41:37.108:req = #<Request:0x254d078 @name="No2 2013/09/15 22:41:37.008">
start get_request 2013/09/15 22:41:37.608
end   get_request 2013/09/15 22:41:37.608:req = #<Request:0x254cdf0 @name="No3 2013/09/15 22:41:37.108">
start get_request 2013/09/15 22:41:38.308
end   get_request 2013/09/15 22:41:38.308:req = #<Request:0x254cb38 @name="No4 2013/09/15 22:41:37.808">
start get_request 2013/09/15 22:41:38.808
end   get_request 2013/09/15 22:41:38.808:req = #<Request:0x254ca60 @name="No5 2013/09/15 22:41:37.808">
start get_request 2013/09/15 22:41:39.208
end   get_request 2013/09/15 22:41:39.208:req = #<Request:0x254c7c0 @name="No6 2013/09/15 22:41:38.708">
start get_request 2013/09/15 22:41:39.508
end   get_request 2013/09/15 22:41:39.508:req = #<Request:0x254c538 @name="No7 2013/09/15 22:41:39.008">
start get_request 2013/09/15 22:41:39.508
waiting...
end   get_request 2013/09/15 22:41:39.608:req = #<Request:0x254c088 @name="No8 2013/09/15 22:41:39.608">
start get_request 2013/09/15 22:41:40.408
end   get_request 2013/09/15 22:41:40.408:req = #<Request:0x254bda0 @name="No9 2013/09/15 22:41:40.308">

RubyのQueue利用版

自作のRequestQueueをRubyのQueueに置き換えました。

# encoding: utf-8
require "pp"
require "thread"

module Kernel
  # wait 0〜0.9 sec
  def get_random_sleep_time
    (rand 10) * 0.1
  end
end

# encoding: utf-8
class Time
  def self.now_msec_format
    self.now.strftime("%Y/%m/%d %H:%M:%S.%L")
  end
end

class Request
  attr_accessor :name
  def initialize(name)
    @name = name
  end
end

class Client < Thread
  def initialize(request_queue)
    block = lambda {
      10.times do |i|
        req = Request.new "No#{i} #{Time.now_msec_format}"
        request_queue.push req
        sleep get_random_sleep_time
      end
    }
    super(&block)
  end
end

class Server < Thread
  def initialize(request_queue)
    block = lambda {
      10.times do |i|
        puts "start get_request #{Time.now_msec_format}"
        req = request_queue.pop
        puts "end   get_request #{Time.now_msec_format}:req = #{req.inspect}"
        sleep get_random_sleep_time
      end
    }
    super(&block)
  end
end

request_queue = Queue.new
threads = []
threads << Client.new(request_queue)
threads << Server.new(request_queue)
threads.each{|t| t.join}