Add synchronization around shared resources in NonblockingServer
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@669006 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/rb/lib/thrift/server/nonblockingserver.rb b/lib/rb/lib/thrift/server/nonblockingserver.rb
index 54dad04..920175b 100644
--- a/lib/rb/lib/thrift/server/nonblockingserver.rb
+++ b/lib/rb/lib/thrift/server/nonblockingserver.rb
@@ -1,5 +1,5 @@
require 'thrift/server'
-
+require 'sync'
# thrift/server already imports fastthread/thread
module Thrift
@@ -12,6 +12,11 @@
#
# we inherit from ThreadPoolServer for the initialize/rescuable_serve methods
class NonblockingServer < ThreadPoolServer
+ def initialize(processor, serverTransport, transportFactory=nil, protocolFactory=nil, num=20)
+ super
+ @sync = Sync.new
+ end
+
def serve
@server_thread = Thread.current
@serverTransport.listen
@@ -27,7 +32,11 @@
thread_group = ThreadGroup.new
loop do
break if @shutdown
- rd, = select([@serverTransport.handle, *connections.keys])
+ handles = [@serverTransport.handle]
+ @sync.synchronize(Sync::SH) do
+ handles.concat connections.keys
+ end
+ rd, = select(handles)
next if rd.nil?
rd.each do |socket|
if socket == @serverTransport.handle
@@ -35,16 +44,25 @@
buffer = ''
outtrans = @transportFactory.get_transport(client)
outprot = @protocolFactory.get_protocol(outtrans)
- connections[client.handle] = [client, buffer, outtrans, outprot]
+ @sync.synchronize(Sync::EX) do
+ connections[client.handle] = [client, buffer, outtrans, outprot]
+ end
else
- client, buffer, outtrans, outprot = connections[socket]
+ client, buffer, outtrans, outprot = nil # for scope
+ @sync.synchronize(Sync::SH) do
+ client, buffer, outtrans, outprot = connections[socket]
+ end
if socket.eof?
client.close
- connections.delete(socket)
+ @sync.synchronize(Sync::EX) do
+ connections.delete(socket)
+ end
else
buffer << client.read(4096, true)
if has_full_frame?(buffer)
- running_connections[socket] = connections.delete(socket)
+ @sync.synchronize(Sync::EX) do
+ running_connections[socket] = connections.delete(socket)
+ end
@thread_q.push :token
t = Thread.new(Thread.current) do |master|
begin
@@ -54,17 +72,27 @@
@processor.process(inprot, outprot)
if @shutdown
client.close
- running_connections.delete(socket)
+ @sync.synchronize(Sync::EX) do
+ running_connections.delete(socket)
+ end
else
- swapping_connections[socket] = running_connections.delete(socket)
- master.wakeup
+ @sync.synchronize(Sync::EX) do
+ swapping_connections[socket] = running_connections.delete(socket)
+ end
end
rescue => e
outtrans.close
@exception_q.push e
ensure
- running_connections.delete(socket)
- connections[socket] = swapping_connections.delete(socket) if swapping_connections.include? socket
+ should_wakeup = false
+ @sync.synchronize(Sync::EX) do
+ running_connections.delete(socket)
+ if swapping_connections.include? socket
+ connections[socket] = swapping_connections.delete(socket)
+ should_wakeup = true
+ end
+ end
+ master.wakeup if should_wakeup
intrans.close
@thread_q.pop
end
@@ -77,9 +105,13 @@
end
if @shutdown
@serverTransport.close
- connections.merge! running_connections
- connections.merge! swapping_connections
- connections.values.each do |client, buffer, outtrans, outprot|
+ handles = []
+ @sync.synchronize(Sync::SH) do
+ handles = connections
+ handles.merge! running_connections
+ handles.merge! swapping_connections
+ end
+ handles.values.each do |client, buffer, outtrans, outprot|
# can't close completely or we'll break active messages
# but lets at least stop accepting input
client.handle.close_read
@@ -97,7 +129,7 @@
end
thread_group.list.each { |t| t.kill } if @shutdown_kill
# now kill connections completely if they still exists
- connections.values.each do |client, buffer, outtrans, outprot|
+ handles.values.each do |client, buffer, outtrans, outprot|
client.close
end
end