2013-05-01

Ruby : EventMachine & EM-WebSocket SpeedUp!!

大概就是最近都在實作EM-WebSocket,

這篇是修改的code,請先參照上篇
http://jokercatz.blogspot.tw/2013/04/ruby-websocket.html

首先是OS的進階支援,類似開啓epoll(Linux)和kqueue(Unix/BSD/OSX)

begin
  EventMachine.kqueue = true
rescue
end
begin
  EventMachine.epoll = true
rescue
end

強迫強制開啓epoll & kqueue的功能,預設應該也還是會開,這語法開來心安就是

另外一個非常重要的部分是threadpool_size,如果不使用此功能…就算ulimit(openfile)開到最大,也是枉然…基本上個人建議開到ulimit(openfile)的5~10%左右

EventMachine.threadpool_size = 100 #ulimit(openfile = 1024)

如果嘗試看EventMachine & EM-WebSocket的source code你會發覺他們都用thread來實作non-blocking的概念,概念很簡單,大概就是建立Thread Pool然後把任何可能會拖慢速度的action丟到thread pool內,而實際丟的東西是Proc,thread pool實作的地方是EM.schedule,簡單的來說…以下是em-websocket的一段code

em-websocket/lib/em-websocket/connection.rb

  8  # define WebSocket callbacks
  9  def onopen(&blk);     @onopen = blk;    end

 19  def trigger_on_open(handshake)
 20    @onopen.call(handshake) if defined? @onopen
 21  end

eventmachine/blob/master/lib/eventmachine.rb

243  # Runs the given callback on the reactor thread, or immediately if called
244  # from the reactor thread. Accepts the same arguments as {EventMachine::Callback}
245  def self.schedule(*a, &b)
246    cb = Callback(*a, &b)
247    if reactor_running? && reactor_thread?
248      cb.call
249    else
250      next_tick { cb.call }
251    end
252  end

所以大概就是Proc => Thread Pool => .call的方式就是了,所以如果你的Thread(ulimit(openfile))數量很多,但Thread Pool size很小,就會發生排隊的情形…然後就會哭哭了

而如果你想用類似EventMachine::Channel的方式完成…那東西只能加入和退出,然後一次廣播(broadcast)只能相同的內容,所以一般來說適應性很低,不如自己實作另外的廣播器…

而另外一方面,如果你想用另外的東西來包裝EventMachine::WebSocket::Connection,也就是很常見到的code的ws,在onopen,onmessage,onclose,onerror甚至是onping,onpon裡面都有的…你不如直接覆蓋掉就好了,因為這樣速度會更快些,然後把大量的廣播用快取實作掉,就不用每個thread都要重頭跑一次

okay,廢話到這邊,以下是我大略的實作部分

##set ulimit(openfile) > 1024
EventMachine.threadpool_size = 100 #1024 * 10%

class Node
  class << self
    attr_accessor :node_user , :node_manager
  end
  @node_user = {}
  @node_manager = {}
  def self.add(ws)
    @node_user[ws.node_id] = ws
    @node_manager[ws.node_id] = ws
  end
  def self.remove(ws)
    @node_user.delete(ws.node_id)
    @node_manager.delete(ws.node_id)
  end
  def self.broadcast_user(msg)
      ##you can do filter here
    broadcast_send(@node_user , msg)
  end
  def self.broadcast_manager(msg)
      ##you can do filter her
    broadcast_send(@node_manager , msg)
  end
  private
  def self.broadcast_send(source , msg)
    source.each_value do |ws|
      ws.send(msg)
    end
  end
end

module EventMachine
  module WebSocket
    class Connection < EventMachine::Connection #mask
      attr_reader :user , :manager , :port , :remote_ip
      alias :node_id :signature

      def initialize(options)

        ###original code of Connection
        @options = options
        @debug = options[:debug] || false
        @secure = options[:secure] || false
        @secure_proxy = options[:secure_proxy] || false
        @tls_options = options[:tls_options] || {}
        @handler = nil
        debug [:initialize]

        ###super => finish original new
        super(options)

        #####
        ###start init custom code
        #####

        @onopen = Proc.new do |handshake|
          #your code
          @port , @remote_ip = Socket.unpack_sockaddr_in(self.get_peername)
          if handshake.path.match(/\A\/OK\z/)
            Node.add(self)
          else
            self.send('error')
            self.close
          end
        end
        @onclose = Proc.new do
          #your code
          Node.remove(self)
        end
        @onmessage = Proc.new do |msg|
          #your code
          temp = msg.split(',')
          if temp[0] == 'echo'
            if temp[1].to_i == 2
              Node.broadcast_user(temp[2])
            elsif temp[1].to_i == 1
              Node.broadcast_manager(temp[2])
            else
              self.send('error')
            end
          end
        end
      end
    end
  end
end

大概就是這樣的方式,雖然寫的不是很好|||,不過精簡的code大概有表明EM.channel(browcast)的寫法,把它提出來有很多好處,類似過濾和快取之類的,當然你也可以再把這些動作包裝起來,丟到EM.schedule(thread pool)內,而非常建議broadcast的部分在send之前一定要進行cache的動作,因為你還是會讓一個thread的動作卡在那邊執行,所以整套都要非常快速才行,否則請切開你的action,使用多個回傳或是先送完大部份,少部分再等待合併回傳的方式,類似資料量與效能的部分

  1. 回傳簡單的大部份內容(回傳約90%,處理量小) fast => send first
  2. 回傳需要處理的部分內容(回傳約9%,需要處理) slow ( ||=> EM.schedule)
  3. 回傳1&2不足的資料量(回傳的1%,要處理很久) very slow ( ===> EM.schedule)

okay…這樣每個回傳就不需要重新包裝和處理,又滿足全部的需求與時間的部分就是,畢竟這已經是websocket概念而非以前http時,必須每個request都需要全部跑完的情況就是了

當然我的實作的部分非常的龐大…這邊沒辦法介紹完畢…不過這是從broadcast(1000 client / 10s) => (8000 client / 300ms)的速度的心得就是,當然實作部分也需要很多技巧,類似不要用過度的flag(上面demo內用原生的EM::connection @signture而非自建hash用object_id),和過度的包裝(能覆寫ws就用ws,能用原生就用原生),然後釐清動線流程,把每個動作的結果快取與對周邊的影響減到最小,大概就能加速到一個境界就是了

沒有留言:

張貼留言