大概就是最近都在實作EM-WebSocket,
這篇是修改的code,請先參照上篇
http://jokercatz.blogspot.tw/2013/04/ruby-websocket.html
首先是OS的進階支援,類似開啓epoll(Linux)和kqueue(Unix/BSD/OSX)
begin
EventMachine.kqueue = true
rescue
end
begin
EventMachine.epoll = true
rescue
end
強迫強制開啓epoll & kqueue的功能,預設應該也還是會開,這語法開來心安就是
另外一個非常重要的部分是threadpool_size,如果不使用此功能…就算ulimit(openfile)開到最大,也是枉然…基本上個人建議開到ulimit(openfile)的5~10%左右
EventMachine.threadpool_size = 100 #ulimit(openfile = 1024)
如果嘗試看EventMachine & EM-WebSocket的source code你會發覺他們都用thread來實作non-blocking的概念,概念很簡單,大概就是建立Thread Pool然後把任何可能會拖慢速度的action丟到thread pool內,而實際丟的東西是Proc,thread pool實作的地方是EM.schedule,簡單的來說…以下是em-websocket的一段code
em-websocket/lib/em-websocket/connection.rb
8 # define WebSocket callbacks
9 def onopen(&blk); @onopen = blk; end
19 def trigger_on_open(handshake)
20 @onopen.call(handshake) if defined? @onopen
21 end
eventmachine/blob/master/lib/eventmachine.rb
243 # Runs the given callback on the reactor thread, or immediately if called
244 # from the reactor thread. Accepts the same arguments as {EventMachine::Callback}
245 def self.schedule(*a, &b)
246 cb = Callback(*a, &b)
247 if reactor_running? && reactor_thread?
248 cb.call
249 else
250 next_tick { cb.call }
251 end
252 end
所以大概就是Proc => Thread Pool => .call的方式就是了,所以如果你的Thread(ulimit(openfile))數量很多,但Thread Pool size很小,就會發生排隊的情形…然後就會哭哭了
而如果你想用類似EventMachine::Channel的方式完成…那東西只能加入和退出,然後一次廣播(broadcast)只能相同的內容,所以一般來說適應性很低,不如自己實作另外的廣播器…
而另外一方面,如果你想用另外的東西來包裝EventMachine::WebSocket::Connection,也就是很常見到的code的ws,在onopen,onmessage,onclose,onerror甚至是onping,onpon裡面都有的…你不如直接覆蓋掉就好了,因為這樣速度會更快些,然後把大量的廣播用快取實作掉,就不用每個thread都要重頭跑一次
okay,廢話到這邊,以下是我大略的實作部分
##set ulimit(openfile) > 1024
EventMachine.threadpool_size = 100 #1024 * 10%
class Node
class << self
attr_accessor :node_user , :node_manager
end
@node_user = {}
@node_manager = {}
def self.add(ws)
@node_user[ws.node_id] = ws
@node_manager[ws.node_id] = ws
end
def self.remove(ws)
@node_user.delete(ws.node_id)
@node_manager.delete(ws.node_id)
end
def self.broadcast_user(msg)
##you can do filter here
broadcast_send(@node_user , msg)
end
def self.broadcast_manager(msg)
##you can do filter her
broadcast_send(@node_manager , msg)
end
private
def self.broadcast_send(source , msg)
source.each_value do |ws|
ws.send(msg)
end
end
end
module EventMachine
module WebSocket
class Connection < EventMachine::Connection #mask
attr_reader :user , :manager , :port , :remote_ip
alias :node_id :signature
def initialize(options)
###original code of Connection
@options = options
@debug = options[:debug] || false
@secure = options[:secure] || false
@secure_proxy = options[:secure_proxy] || false
@tls_options = options[:tls_options] || {}
@handler = nil
debug [:initialize]
###super => finish original new
super(options)
#####
###start init custom code
#####
@onopen = Proc.new do |handshake|
#your code
@port , @remote_ip = Socket.unpack_sockaddr_in(self.get_peername)
if handshake.path.match(/\A\/OK\z/)
Node.add(self)
else
self.send('error')
self.close
end
end
@onclose = Proc.new do
#your code
Node.remove(self)
end
@onmessage = Proc.new do |msg|
#your code
temp = msg.split(',')
if temp[0] == 'echo'
if temp[1].to_i == 2
Node.broadcast_user(temp[2])
elsif temp[1].to_i == 1
Node.broadcast_manager(temp[2])
else
self.send('error')
end
end
end
end
end
end
end
大概就是這樣的方式,雖然寫的不是很好|||,不過精簡的code大概有表明EM.channel(browcast)的寫法,把它提出來有很多好處,類似過濾和快取之類的,當然你也可以再把這些動作包裝起來,丟到EM.schedule(thread pool)內,而非常建議broadcast的部分在send之前一定要進行cache的動作,因為你還是會讓一個thread的動作卡在那邊執行,所以整套都要非常快速才行,否則請切開你的action,使用多個回傳或是先送完大部份,少部分再等待合併回傳的方式,類似資料量與效能的部分
- 回傳簡單的大部份內容(回傳約90%,處理量小) fast => send first
- 回傳需要處理的部分內容(回傳約9%,需要處理) slow ( ||=> EM.schedule)
- 回傳1&2不足的資料量(回傳的1%,要處理很久) very slow ( ===> EM.schedule)
okay…這樣每個回傳就不需要重新包裝和處理,又滿足全部的需求與時間的部分就是,畢竟這已經是websocket概念而非以前http時,必須每個request都需要全部跑完的情況就是了
當然我的實作的部分非常的龐大…這邊沒辦法介紹完畢…不過這是從broadcast(1000 client / 10s) => (8000 client / 300ms)的速度的心得就是,當然實作部分也需要很多技巧,類似不要用過度的flag(上面demo內用原生的EM::connection @signture而非自建hash用object_id),和過度的包裝(能覆寫ws就用ws,能用原生就用原生),然後釐清動線流程,把每個動作的結果快取與對周邊的影響減到最小,大概就能加速到一個境界就是了
沒有留言:
張貼留言