阿里云「飞天发布时刻」2024来啦!新产品、新特性、新能力、新方案,等你来探~ 了解详情
写点什么

浅谈 Thin 的事件驱动模型

  • 2019-12-05
  • 本文字数:12226 字

    阅读完需:约 40 分钟

浅谈 Thin 的事件驱动模型


在上一篇文章中我们已经介绍了 WEBrick 的实现,它的 handler 是写在 Rack 工程中的,而在这篇文章介绍的 webserver thin 的 Rack 处理器也是写在 Rack 中的;与 WEBrick 相同,Thin 的实现也非常简单,官方对它的介绍是:


A very fast & simple Ruby web server.


它将 MongrelEventMachineRack 三者进行组合,在其中起到胶水的作用,所以在理解 Thin 的实现的过程中我们也需要分析 EventMachine 到底是如何工作的。

Thin 的实现

在这一节中我们将从源代码来分析介绍 Thin 的实现原理,因为部分代码仍然是在 Rack 工程中实现的,所以我们要从 Rack 工程的代码开始理解 Thin 的实现。

从 Rack 开始

Thin 的处理器 Rack::Handler::Thin 与其他遵循 Rack 协议的 webserver 一样都实现了 .run 方法,接受 Rack 应用和 options 作为输入:


Ruby


module Rack  module Handler    class Thin      def self.run(app, options={})        environment  = ENV['RACK_ENV'] || 'development'        default_host = environment == 'development' ? 'localhost' : '0.0.0.0'
host = options.delete(:Host) || default_host port = options.delete(:Port) || 8080 args = [host, port, app, options] args.pop if ::Thin::VERSION::MAJOR < 1 && ::Thin::VERSION::MINOR < 8 server = ::Thin::Server.new(*args) yield server if block_given? server.start end end endend
复制代码


上述方法仍然会从 options 中取出 ip 地址和端口号,然后初始化一个 Thin::Server 的实例后,执行 #start 方法在 8080 端口上监听来自用户的请求。

初始化服务

Thin 服务的初始化由以下的代码来处理,首先会处理在 Rack::Handler::Thin.run 中传入的几个参数 hostportappoptions,将 Rack 应用存储在临时变量中:


Ruby


From: lib/thin/server.rb @ line 100:Owner: Thin::Server
def initialize(*args, &block) host, port, options = DEFAULT_HOST, DEFAULT_PORT, {}
args.each do |arg| case arg when 0.class, /^\d+$/ then port = arg.to_i when String then host = arg when Hash then options = arg else @app = arg if arg.respond_to?(:call) end end
@backend = select_backend(host, port, options) @backend.server = self @backend.maximum_connections = DEFAULT_MAXIMUM_CONNECTIONS @backend.maximum_persistent_connections = DEFAULT_MAXIMUM_PERSISTENT_CONNECTIONS @backend.timeout = options[:timeout] || DEFAULT_TIMEOUT
@app = Rack::Builder.new(&block).to_app if blockend
复制代码


在初始化服务的过程中,总共只做了三件事情,处理参数、选择并配置 backend,创建新的应用:



处理参数的过程自然不用多说,只是这里判断的方式并不是按照顺序处理的,而是按照参数的类型;在初始化器的最后,如果向初始化器传入了 block,那么就会使用 Rack::Builder 和 block 中的代码初始化一个新的 Rack 应用。

选择后端

在选择后端时 Thin 使用了 #select_backend 方法,这里使用 case 语句替代多个 ifelse,也是一个我们可以使用的小技巧:


Ruby


From: lib/thin/server.rb @ line 261:Owner: Thin::Server
def select_backend(host, port, options) case when options.has_key?(:backend) raise ArgumentError, ":backend must be a class" unless options[:backend].is_a?(Class) options[:backend].new(host, port, options) when options.has_key?(:swiftiply) Backends::SwiftiplyClient.new(host, port, options) when host.include?('/') Backends::UnixServer.new(host) else Backends::TcpServer.new(host, port) endend
复制代码


在大多数时候,我们只会选择 UnixServerTcpServer 两种后端中的一个,而后者又是两者中使用更为频繁的后端:


Ruby


From: lib/thin/backends/tcp_server.rb @ line 8:Owner: Thin::Backends::TcpServer
def initialize(host, port) @host = host @port = port super()end
From: lib/thin/backends/base.rb @ line 47:Owner: Thin::Backends::Base
def initialize @connections = {} @timeout = Server::DEFAULT_TIMEOUT @persistent_connection_count = 0 @maximum_connections = Server::DEFAULT_MAXIMUM_CONNECTIONS @maximum_persistent_connections = Server::DEFAULT_MAXIMUM_PERSISTENT_CONNECTIONS @no_epoll = false @ssl = nil @threaded = nil @started_reactor = falseend
复制代码


初始化的过程中只是对属性设置默认值,比如 hostport 以及超时时间等等,并没有太多值得注意的代码。

启动服务

在启动服务时会直接调用 TcpServer#start 方法并在其中传入一个用于处理信号的 block:


Ruby


From: lib/thin/server.rb @ line 152:Owner: Thin::Server
def start raise ArgumentError, 'app required' unless @app
log_info "Thin web server (v#{VERSION::STRING} codename #{VERSION::CODENAME})" log_debug "Debugging ON" trace "Tracing ON"
log_info "Maximum connections set to #{@backend.maximum_connections}" log_info "Listening on #{@backend}, CTRL+C to stop"
@backend.start { setup_signals if @setup_signals }end
复制代码


虽然这里的 backend 其实已经被选择成了 TcpServer,但是该子类并没有覆写 #start 方法,这里执行的方法其实是从父类继承的:


Ruby


From: lib/thin/backends/base.rb @ line 60:Owner: Thin::Backends::Base
def start @stopping = false starter = proc do connect yield if block_given? @running = true end
# Allow for early run up of eventmachine. if EventMachine.reactor_running? starter.call else @started_reactor = true EventMachine.run(&starter) endend
复制代码


上述方法在构建一个 starter block 之后,将该 block 传入 EventMachine.run 方法,随后执行的 #connect 会启动一个 EventMachine 的服务器用于处理用户的网络请求:


Ruby


From: lib/thin/backends/tcp_server.rb @ line 15:Owner: Thin::Backends::TcpServer
def connect @signature = EventMachine.start_server(@host, @port, Connection, &method(:initialize_connection)) binary_name = EventMachine.get_sockname( @signature ) port_name = Socket.unpack_sockaddr_in( binary_name ) @port = port_name[0] @host = port_name[1] @signatureend
复制代码


在 EventMachine 的文档中,.start_server 方法被描述成一个在指定的地址和端口上初始化 TCP 服务的方法,正如这里所展示的,它经常在 .run 方法的 block 中执行;该方法的参数 Connection 作为处理 TCP 请求的类,会实现不同的方法接受各种各样的回调,传入的 initialize_connection block 会在有请求需要处理时对 Connection 对象进行初始化:


Connection 对象继承自 EventMachine::Connection,是 EventMachine 与外界的接口,在 EventMachine 中的大部分事件都会调用 Connection 的一个实例方法来传递数据和参数。


Ruby


From: lib/thin/backends/base.rb @ line 145:Owner: Thin::Backends::Base
def initialize_connection(connection) connection.backend = self connection.app = @server.app connection.comm_inactivity_timeout = @timeout connection.threaded = @threaded connection.start_tls(@ssl_options) if @ssl
if @persistent_connection_count < @maximum_persistent_connections connection.can_persist! @persistent_connection_count += 1 end @connections[connection.__id__] = connectionend
复制代码

处理请求的连接

Connection 类中有很多的方法 #post_init#receive_data 方法等等都是由 EventMachine 在接收到请求时调用的,当 Thin 的服务接收到来自客户端的数据时就会调用 #receive_data 方法:


Ruby


From: lib/thin/connection.rb @ line 36:Owner: Thin::Connection
def receive_data(data) @idle = false trace data process if @request.parse(data)rescue InvalidRequest => e log_error("Invalid request", e) post_process Response::BAD_REQUESTend
复制代码


在这里我们看到了与 WEBrick 在处理来自客户端的原始数据时使用的方法 #parse,它会解析客户端请求的原始数据并执行 #process 来处理 HTTP 请求:


Ruby


From: lib/thin/connection.rb @ line 47:Owner: Thin::Connection
def process if threaded? @request.threaded = true EventMachine.defer { post_process(pre_process) } else @request.threaded = false post_process(pre_process) endend
复制代码


如果当前的连接允许并行处理多个用户的请求,那么就会在 EventMachine.defer 的 block 中执行两个方法 #pre_process#post_process


Ruby


From: lib/thin/connection.rb @ line 63:Owner: Thin::Connection
def pre_process @request.remote_address = remote_address @request.async_callback = method(:post_process)
response = AsyncResponse catch(:async) do response = @app.call(@request.env) end responserescue Exception => e unexpected_error(e) can_persist? && @request.persistent? ? Response::PERSISTENT_ERROR : Response::ERRORend
复制代码


#pre_process 中没有做太多的事情,只是调用了 Rack 应用的 #call 方法,得到一个三元组 response,在这之后将这个数组传入 #post_process 方法:


Ruby


From: lib/thin/connection.rb @ line 95:Owner: Thin::Connection
def post_process(result) return unless result result = result.to_a return if result.first == AsyncResponse.first
@response.status, @response.headers, @response.body = *result @response.each do |chunk| send_data chunk endrescue Exception => e unexpected_error(e) close_connectionensure if @response.body.respond_to?(:callback) && @response.body.respond_to?(:errback) @response.body.callback { terminate_request } @response.body.errback { terminate_request } else terminate_request unless result && result.first == AsyncResponse.first endend
复制代码


#post_response 方法将传入的数组赋值给 responsestatusheadersbody 这三部分,在这之后通过 #send_data 方法将 HTTP 响应以块的形式写回 Socket;写回结束后可能会调用对应的 callback 并关闭持有的 requestresponse 两个实例变量。


上述方法中调用的 #send_data 继承自 EventMachine::Connection 类。

小结

到此为止,我们对于 Thin 是如何处理来自用户的 HTTP 请求的就比较清楚了,我们可以看到 Thin 本身并没有做一些类似解析 HTTP 数据包以及发送数据的问题,它使用了来自 Rack 和 EventMachine 两个开源框架中很多已有的代码逻辑,确实只做了一些胶水的事情。


对于 Rack 是如何工作的我们在前面的文章 谈谈 Rack 协议与实现 中已经介绍过了;虽然我们看到了很多与 EventMachine 相关的代码,但是到这里我们仍然对 EventMachine 不是太了解。

EventMachine 和 Reactor 模式

为了更好地理解 Thin 的工作原理,在这里我们会介绍一个 EventMachine 和 Reactor 模式。


EventMachine 其实是一个使用 Ruby 实现的事件驱动的并行框架,它使用 Reactor 模式提供了事件驱动的 IO 模型,如果你对 Node.js 有所了解的话,那么你一定对事件驱动这个词并不陌生,EventMachine 的出现主要是为了解决两个核心问题:


  • 为生产环境提供更高的可伸缩性、更好的性能和稳定性;

  • 为上层提供了一些能够减少高性能的网络编程复杂性的 API;


其实 EventMachine 的主要作用就是将所有同步的 IO 都变成异步的,调度都通过事件来进行,这样用于监听用户请求的进程不会被其他代码阻塞,能够同时为更多的客户端提供服务;在这一节中,我们需要了解一下在 Thin 中使用的 EventMachine 中几个常用方法的实现。

启动事件循环

EventMachine 其实就是一个事件循环(Event Loop),当我们想使用 EventMachine 来处理某些任务时就一定需要调用 .run 方法启动这个事件循环来接受外界触发的各种事件:


Ruby


From: lib/eventmachine.rb @ line 149:Owner: #<Class:EventMachine>
def self.run blk=nil, tail=nil, &block # ... begin @reactor_pid = Process.pid @reactor_running = true initialize_event_machine (b = blk || block) and add_timer(0, b) if @next_tick_queue && !@next_tick_queue.empty? add_timer(0) { signal_loopbreak } end @reactor_thread = Thread.current
run_machine ensure until @tails.empty? @tails.pop.call end
release_machine cleanup_machine @reactor_running = false @reactor_thread = nil endend
复制代码


在这里我们会使用 .initialize_event_machine 初始化当前的事件循环,其实也就是一个全局的 Reactor 的单例,最终会执行 Reactor#initialize_for_run 方法:


Ruby


From: lib/em/pure_ruby.rb @ line 522:Owner: EventMachine::Reactor
def initialize_for_run @running = false @stop_scheduled = false @selectables ||= {}; @selectables.clear @timers = SortedSet.new # [] set_timer_quantum(0.1) @current_loop_time = Time.now @next_heartbeat = @current_loop_time + HeartbeatIntervalend
复制代码


在启动事件循环的过程中,它还会将传入的 block 与一个 interval 为 0 的键组成键值对存到 @timers 字典中,所有加入的键值对都会在大约 interval 的时间过后执行一次 block。


随后执行的 #run_machine 在最后也会执行 Reactor#run 方法,该方法中包含一个 loop 语句,也就是我们一直说的事件循环:


Ruby


From: lib/em/pure_ruby.rb @ line 540:Owner: EventMachine::Reactor
def run raise Error.new( "already running" ) if @running @running = true
begin open_loopbreaker
loop { @current_loop_time = Time.now
break if @stop_scheduled run_timers break if @stop_scheduled crank_selectables break if @stop_scheduled run_heartbeats } ensure close_loopbreaker @selectables.each {|k, io| io.close} @selectables.clear
@running = false endend
复制代码


在启动事件循环之间会在 #open_loopbreaker 中创建一个 LoopbreakReader 的实例绑定在 127.0.0.1 和随机的端口号组成的地址上,然后开始运行事件循环。



在事件循环中,Reactor 总共需要执行三部分的任务,分别是执行定时器、处理 Socket 上的事件以及运行心跳方法。


无论是运行定时器还是执行心跳方法其实都非常简单,只要与当前时间进行比较,如果到了触发的时间就调用正确的方法或者回调,最后的 #crank_selectables 方法就是用于处理 Socket 上读写事件的方法了:


Ruby


From: lib/em/pure_ruby.rb @ line 540:Owner: EventMachine::Reactor
def crank_selectables readers = @selectables.values.select { |io| io.select_for_reading? } writers = @selectables.values.select { |io| io.select_for_writing? }
s = select(readers, writers, nil, @timer_quantum)
s and s[1] and s[1].each { |w| w.eventable_write } s and s[0] and s[0].each { |r| r.eventable_read }
@selectables.delete_if {|k,io| if io.close_scheduled? io.close begin EventMachine::event_callback io.uuid, ConnectionUnbound, nil rescue ConnectionNotBound; end true end }end
复制代码


上述方法会在 Socket 变成可读或者可写时执行 #eventable_write#eventable_read 执行事件的回调,我们暂时放下这两个方法,先来了解一下 EventMachine 是如何启动服务的。

启动服务

在启动服务的过程中,最重要的目的就是创建一个 Socket 并绑定在指定的 ip 和端口上,在实现这个目的的过程中,我们使用了以下的几个方法,首先是 EventMachine.start_server


Ruby


From: lib/eventmachine.rb @ line 516:Owner: #<Class:EventMachine>
def self.start_server server, port=nil, handler=nil, *args, &block port = Integer(port) klass = klass_from_handler(Connection, handler, *args)
s = if port start_tcp_server server, port else start_unix_server server end @acceptors[s] = [klass, args, block] send
复制代码


该方法其实使我们在使用 EventMachine 时常见的接口,只要我们想要启动一个新的 TCP 或者 UNIX 服务器,就可以上述方法,在这里会根据端口号是否存在,选择执行 .start_tcp_server 或者 .start_unix_server 创建一个新的 Socket 并存储在 @acceptors 中:


Ruby


From: lib/em/pure_ruby.rb @ line 184:Owner: #<Class:EventMachine>
def self.start_tcp_server host, port (s = EvmaTCPServer.start_server host, port) or raise "no acceptor" s.uuidend
复制代码


EventMachine.start_tcp_server 在这里也只做了个『转发』方法的作用的,直接调用 EvmaTCPServer.start_server 创建一个新的 Socket 对象并绑定到传入的 <host, port> 上:


Ruby


From: lib/em/pure_ruby.rb @ line 1108:Owner: #<Class:EventMachine::EvmaTCPServer>
def self.start_server host, port sd = Socket.new( Socket::AF_LOCAL, Socket::SOCK_STREAM, 0 ) sd.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEADDR, true ) sd.bind( Socket.pack_sockaddr_in( port, host )) sd.listen( 50 ) # 5 is what you see in all the books. Ain't enough. EvmaTCPServer.new sdend
复制代码


方法的最后会创建一个新的 EvmaTCPServer 实例的过程中,我们需要通过 #fcntl 将 Socket 变成非阻塞式的:


Ruby


From: lib/em/pure_ruby.rb @ line 687:Owner: EventMachine::Selectable
def initialize io @io = io @uuid = UuidGenerator.generate @is_server = false @last_activity = Reactor.instance.current_loop_time
m = @io.fcntl(Fcntl::F_GETFL, 0) @io.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK | m)
@close_scheduled = false @close_requested = false
se = self; @io.instance_eval { @my_selectable = se } Reactor.instance.add_selectable @ioend
复制代码


不只是 EvmaTCPServer,所有的 Selectable 子类在初始化的最后都会将新的 Socket 以 uuid 为键存储到 Reactor 单例对象的 @selectables 字典中:


Ruby


From: lib/em/pure_ruby.rb @ line 532:Owner: EventMachine::Reactor
def add_selectable io @selectables[io.uuid] = ioend
复制代码


在整个事件循环的大循环中,这里存入的所有 Socket 都会被 #select 方法监听,在响应的事件发生时交给合适的回调处理,作者在 Redis 中的事件循环 一文中也介绍过非常相似的处理过程。



所有的 Socket 都会存储在一个 @selectables 的哈希中并由 #select 方法监听所有的读写事件,一旦相应的事件触发就会通过 eventable_read 或者 eventable_write 方法来响应该事件。

处理读写事件

所有的读写事件都是通过 Selectable 和它的子类来处理的,在 EventMachine 中,总共有以下的几种子类:



所有处理服务端读写事件的都是 Selectable 的子类,也就是 EvmaTCPServerEvmaUNIXServer,而所有处理客户端读写事件的都是 StreamObject 的子类 EvmaTCPServerEvmaUNIXClient


当我们初始化的绑定在 <host, port> 上的 Socket 对象监听到了来自用户的 TCP 请求时,当前的 Socket 就会变得可读,事件循环中的 #select 方法就会调用 EvmaTCPClient#eventable_read 通知由一个请求需要处理:


Ruby


From: lib/em/pure_ruby.rb @ line 1130:Owner: EventMachine::EvmaTCPServer
def eventable_read begin 10.times { descriptor, peername = io.accept_nonblock sd = EvmaTCPClient.new descriptor sd.is_server = true EventMachine::event_callback uuid, ConnectionAccepted, sd.uuid } rescue Errno::EWOULDBLOCK, Errno::EAGAIN endend
复制代码


在这里会尝试多次 #accept_non_block 当前的 Socket 并会创建一个 TCP 的客户端对象 EvmaTCPClient,同时通过 .event_callback 方法发送 ConnectionAccepted 消息。


EventMachine::event_callback 就像是一个用于处理所有事件的中心方法,所有的回调都要通过这个中继器进行调度,在实现上就是一个庞大的 ifelse 语句,里面处理了 EventMachine 中可能出现的 10 种状态和操作:



大多数事件在触发时,都会从 @conns 中取出相应的 Connection 对象,最后执行合适的方法来处理,而这里触发的 ConnectionAccepted 事件是通过以下的代码来处理的:


Ruby


From: lib/eventmachine.rb @ line 1462:Owner: #<Class:EventMachine>
def self.event_callback conn_binding, opcode, data if opcode == # ... # ... elsif opcode == ConnectionAccepted accep, args, blk = @acceptors[conn_binding] raise NoHandlerForAcceptedConnection unless accep c = accep.new data, *args @conns[data] = c blk and blk.call(c) c else # ... endend
复制代码


上述的 accep 变量就是我们在 Thin 调用 .start_server 时传入的 Connection 类,在这里我们初始化了一个新的实例,同时以 Socket 的 uuid 作为键存到 @conns 中。


在这之后 #select 方法就会监听更多 Socket 上的事件了,当这个 “accept” 后创建的 Socket 接收到数据时,就会触发下面的 #eventable_read 方法:


Ruby


From: lib/em/pure_ruby.rb @ line 1130:Owner: EventMachine::StreamObject
def eventable_read @last_activity = Reactor.instance.current_loop_time begin if io.respond_to?(:read_nonblock) 10.times { data = io.read_nonblock(4096) EventMachine::event_callback uuid, ConnectionData, data } else data = io.sysread(4096) EventMachine::event_callback uuid, ConnectionData, data end rescue Errno::EAGAIN, Errno::EWOULDBLOCK, SSLConnectionWaitReadable rescue Errno::ECONNRESET, Errno::ECONNREFUSED, EOFError, Errno::EPIPE, OpenSSL::SSL::SSLError @close_scheduled = true EventMachine::event_callback uuid, ConnectionUnbound, nil endend
复制代码


方法会从 Socket 中读取数据并通过 .event_callback 发送 ConnectionData 事件:


Ruby


From: lib/eventmachine.rb @ line 1462:Owner: #<Class:EventMachine>
def self.event_callback conn_binding, opcode, data if opcode == # ... # ... elsif opcode == ConnectionData c = @conns[conn_binding] or raise ConnectionNotBound, "received data #{data} for unknown signature: #{conn_binding}" c.receive_data data else # ... endend
复制代码


从上述方法对 ConnectionData 事件的处理就可以看到通过传入 Socket 的 uuid 和数据,就可以找到上面初始化的 Connection 对象,#receive_data 方法就能够将数据传递到上层,让用户在自定义的 Connection 中实现自己的处理逻辑,这也就是 Thin 需要覆写 #receive_data 方法来接受数据的原因了。


当 Thin 以及 Rack 应用已经接收到了来自用户的请求、完成处理并返回之后经过一系列复杂的调用栈就会执行 Connection#send_data 方法:


Ruby


From: lib/em/connection.rb @ line 324:Owner: EventMachine::Connection
def send_data data data = data.to_s size = data.bytesize if data.respond_to?(:bytesize) size ||= data.size EventMachine::send_data @signature, data, sizeend
From: lib/em/pure_ruby.rb @ line 172:Owner: #<Class:EventMachine>
def self.send_data target, data, datalength selectable = Reactor.instance.get_selectable( target ) or raise "unknown send_data target" selectable.send_data dataend
From: lib/em/pure_ruby.rb @ line 851:Owner: EventMachine::StreamObject
def send_data data unless @close_scheduled or @close_requested or !data or data.length <= 0 @outbound_q << data.to_s endend
复制代码


经过一系列同名方法的调用,在调用栈末尾的 StreamObject#send_data 中,将所有需要写入的数据全部加入 @outbound_q 中,这其实就是一个待写入数据的队列。


当 Socket 变得可写之后,就会由 #select 方法触发 #eventable_write@outbound_q 队列中的数据通过 #write_nonblock 或者 syswrite 写入 Socket,也就是将请求返回给客户端。


Ruby


From: lib/em/pure_ruby.rb @ line 823:Owner: EventMachine::StreamObject
def eventable_write @last_activity = Reactor.instance.current_loop_time while data = @outbound_q.shift do begin data = data.to_s w = if io.respond_to?(:write_nonblock) io.write_nonblock data else io.syswrite data end
if w < data.length @outbound_q.unshift data[w..-1] break end rescue Errno::EAGAIN, SSLConnectionWaitReadable, SSLConnectionWaitWritable @outbound_q.unshift data break rescue EOFError, Errno::ECONNRESET, Errno::ECONNREFUSED, Errno::EPIPE, OpenSSL::SSL::SSLError @close_scheduled = true @outbound_q.clear end endend
复制代码

关闭 Socket

当数据写入时发生了 EOFError 或者其他错误时就会将 close_scheduled 标记为 true,在随后的事件循环中会关闭 Socket 并发送 ConnectionUnbound 事件:


Ruby


From: lib/em/pure_ruby.rb @ line 540:Owner: EventMachine::Reactor
def crank_selectables # ...
@selectables.delete_if {|k,io| if io.close_scheduled? io.close begin EventMachine::event_callback io.uuid, ConnectionUnbound, nil rescue ConnectionNotBound; end true end }end
复制代码


.event_callback 在处理 ConnectionUnbound 事件时会在 @conns 中将结束的 Connection 剔除:


Ruby


def self.event_callback conn_binding, opcode, data  if opcode == ConnectionUnbound    if c = @conns.delete( conn_binding )      c.unbind      io = c.instance_variable_get(:@io)      begin        io.close      rescue Errno::EBADF, IOError      end    elsif c = @acceptors.delete( conn_binding )    else      raise ConnectionNotBound, "received ConnectionUnbound for an unknown signature: #{conn_binding}"    end  elsif opcode = 1    #...  endend
复制代码


在这之后会调用 Connection#unbind 方法,再次执行 #close 确保 Socket 连接已经断掉了。

小结

EventMachine 在处理用户的请求时,会通过一个事件循环和一个中心化的事件处理中心 .event_callback 来响应所有的事件,你可以看到在使用 EventMachine 时所有的响应都是异步的,尤其是对 Socket 的读写,所有外部的输入在 EventMachine 看来都是一个事件,它们会被 EventMachine 选择合适的处理器进行转发。

I/O 模型

Thin 本身其实没有实现任何的 I/O 模型,它通过对 EventMachine 进行封装,使用了其事件驱动的特点,为上层提供了处理并发 I/O 的 Reactor 模型,在不同的阶段有着不同的工作流程,在启动 Thin 的服务时,Thin 会直接通过 .start_server 创建一个 Socket 监听一个 <host, port> 组成的元组:



当服务启动之后,就可以接受来自客户端的 HTTP 请求了,处理 HTTP 请求总共需要三个模块的合作,分别是 EventMachine、Thin 以及 Rack 应用:



在上图中省略了 Rack 的处理部分,不过对于其他部分的展示还是比较详细的,EventMachine 负责对 TCP Socket 进行监听,在发生事件时通过 .event_callback 进行处理,将消息转发给位于 Thin 中的 Connection,该类以及模块负责处理 HTTP 协议相关的内容,将整个请求包装成一个 env 对象,调用 #call 方法。


在这时就开始了返回响应的逻辑了,#call 方法会返回一个三元组,经过 Thin 中的 #send_data 最终将数据写入 outbound_q 队列中等待处理:



EventMachine 会通过一个事件循环,使用 #select 监听当前 Socket 的可读写状态,并在合适的时候触发 #eventable_writeoutbound_q 队列中读取数据写入 Socket,在写入结束后 Socket 就会被关闭,整个请求的响应也就结束了。



Thin 使用了 EventMachine 作为底层处理 TCP 协议的框架,提供了事件驱动的 I/O 模型,也就是我们理解的 Reactor 模型,对于每一个 HTTP 请求都会创建一个对应的 Connection 对象,所有的事件都由 EventMachine 来派发,最大程度做到了 I/O 的读写都是异步的,不会阻塞当前的线程,这也是 Thin 以及 Node.js 能够并发处理大量请求的原因。

总结

Thin 作为一个 Ruby 社区中简单的 webserver,其实本身没有做太多的事情,只是使用了 EventMachine 提供的事件驱动的 I/O 模型,为上层提供了更加易用的 API,相比于其他同步处理请求的 webserver,Reactor 模式的优点就是 Thin 的优点,主程序只负责监听事件和分发事件,一旦涉及到 I/O 的工作都尽量使用回调的方式处理,当回调完成后再发送通知,这种方式能够减少进程的等待时间,时刻都在处理用户的请求和事件。

相关文章


本文转载自 Draveness 技术博客。


原文链接:https://draveness.me/rack-thin


2019-12-05 18:14580

评论

发布
暂无评论
发现更多内容

《垃圾回收的算法与实现》.pdf

田维常

垃圾回收

区块链在债券市场如何应用

CECBC

区块链 债券

胡继晔:中国应建区块链行业准入制度

CECBC

区块链 金融 数字经济

天啊!怎么会有人把Spring Cloud微服务架构讲得这么透彻?

Java架构之路

Java 程序员 架构 面试 编程语言

KubeVela 正式开源:一个高可扩展的云原生应用平台与核心引擎

阿里巴巴云原生

阿里云 开源 Kubernetes 云原生 OAM

架构师Week5作业

lggl

作业

一次 Java 进程 OOM 的排查分析(glibc 篇)

996小迁

Java 编程 架构 面试 计算机

分布式事务太繁琐?官方推荐Atomikos,5分钟帮你搞定

互联网应用架构

分布式事务 springboot

开个交易所需要多少费用?数字货币交易所搭建

13530558032

《Python程序员面试算法宝典》PDF 超清版免费领取

计算机与AI

Python 面试 算法

架构师Week5总结

lggl

总结

太赞了!腾讯T3-3架构师整理了5000页的Java学习手册免费开放下载

Java架构之路

Java 程序员 架构 面试 编程语言

区块链数字货币钱包源码价格,区块链多币种钱包

13530558032

基于SpringBoot、SpringCloud、Docker微服务架构实战,资源分享

Java架构之路

Java 程序员 架构 面试 编程语言

大四女学霸社招竟成功签约字节跳动,拿下30万年薪?

Java架构师迁哥

JVM入门,认识Class文件

Simon郎

JVM Java 分布式

SQL数据库:GROUPING运算符

正向成长

GROUPING运算符

云原生2.0时代下,DevOps实践如何才能更加高效敏捷?

华为云开发者联盟

云计算 数字化 华为云

高性能利器!华为云MRS ClickHouse重磅推出!

华为云开发者联盟

数据库 Clickhouse MRS

【涂鸦物联网足迹】涂鸦云平台消息服务—顺带Pulsar简单介绍

IoT云工坊

人工智能 物联网 云服务 Apache Pulsar 云平台

云算力矿机源码价格,区块链挖矿平台开发

13530558032

小学妹问我:如何利用可视化工具排查问题?

田维常

可视化

区块链,音乐,流媒体和版税

CECBC

区块链 艺术

Forrester 最新报告:阿里云稳居领导者地位,引领云原生开发浪潮

阿里巴巴云原生

阿里云 Serverless Kubernetes 容器 云原生

LAXCUS大数据集群操作系统挖矿

陈泽云

大数据 分布式计算 挖矿

年轻人不讲武德不仅白piao接口测试知识还白piao接口测试工具会员

测试人生路

接口测试

SpringBoot:整合Swagger3.0与RESTful接口整合返回值(2020最新最易懂)

比伯

Java 编程 架构 面试 计算机

收藏!数据建模最全知识体系解读

华为云开发者联盟

数据仓库 数据 数据建模

某美团程序员爆料:筛选简历时,用go语言的基本不看!网友:当韭菜还当出优越感了!

Java架构师迁哥

【云图说】第189期 初识数据仓库服务

华为云开发者联盟

数据库 数据仓库 数据

《迅雷链精品课》第五课:账户与账本

迅雷链

区块链

浅谈 Thin 的事件驱动模型_文化 & 方法_Draveness_InfoQ精选文章