Skip to content

Commit

Permalink
Fix options in EM::Synchrony adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
hakanensari committed May 2, 2012
1 parent add19ff commit 371ad26
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 92 deletions.
126 changes: 65 additions & 61 deletions lib/faraday/adapter/em_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,71 @@ class Adapter
# when in EM reactor loop or for making parallel requests in
# synchronous code.
class EMHttp < Faraday::Adapter
module Options
def connection_config(env)
options = {}
configure_ssl(options, env)
configure_proxy(options, env)
configure_timeout(options, env)
configure_socket(options, env)
options
end

def request_config(env)
options = {
:body => read_body(env),
:head => env[:request_headers],
# :keepalive => true,
# :file => 'path/to/file', # stream data off disk
}
configure_compression(options, env)
# configure_proxy_auth
# :proxy => {:authorization => [user, pass]}
# proxy[:username] && proxy[:password]
options
end

def read_body(env)
body = env[:body]
body.respond_to?(:read) ? body.read : body
end

def configure_ssl(options, env)
if ssl = env[:ssl]
# :ssl => {
# :private_key_file => '/tmp/server.key',
# :cert_chain_file => '/tmp/server.crt',
# :verify_peer => false
end
end

def configure_proxy(options, env)
if proxy = request_options(env)[:proxy]
options[:proxy] = {
:host => proxy[:uri].host,
:port => proxy[:uri].port
}
end
end

def configure_timeout(options, env)
timeout, open_timeout = request_options(env).values_at(:timeout, :open_timeout)
options[:connect_timeout] = options[:inactivity_timeout] = timeout
options[:connect_timeout] = open_timeout if open_timeout
end

def configure_compression(options, env)
if env[:method] == :get and not options[:head].key? 'accept-encoding'
options[:head]['accept-encoding'] = 'gzip, compressed'
end
end

def request_options(env)
env[:request]
end
end

include Options

dependency 'em-http'

Expand Down Expand Up @@ -80,67 +145,6 @@ def raise_error(msg)
raise errklass, msg
end

def connection_config(env)
options = {}
configure_ssl(options, env)
configure_proxy(options, env)
configure_timeout(options, env)
options
end

def request_config(env)
options = {
:body => read_body(env),
:head => env[:request_headers],
# :keepalive => true,
# :file => 'path/to/file', # stream data off disk
}
configure_compression(options, env)
# configure_proxy_auth
# :proxy => {:authorization => [user, pass]}
# proxy[:username] && proxy[:password]
options
end

def read_body(env)
body = env[:body]
body.respond_to?(:read) ? body.read : body
end

def configure_ssl(options, env)
if ssl = env[:ssl]
# :ssl => {
# :private_key_file => '/tmp/server.key',
# :cert_chain_file => '/tmp/server.crt',
# :verify_peer => false
end
end

def configure_proxy(options, env)
if proxy = request_options(env)[:proxy]
options[:proxy] = {
:host => proxy[:uri].host,
:port => proxy[:uri].port
}
end
end

def configure_timeout(options, env)
timeout, open_timeout = request_options(env).values_at(:timeout, :open_timeout)
options[:connect_timeout] = options[:inactivity_timeout] = timeout
options[:connect_timeout] = open_timeout if open_timeout
end

def configure_compression(options, env)
if env[:method] == :get and not options[:head].key? 'accept-encoding'
options[:head]['accept-encoding'] = 'gzip, compressed'
end
end

def request_options(env)
env[:request]
end

def parallel?(env)
!!env[:parallel_manager]
end
Expand Down
35 changes: 4 additions & 31 deletions lib/faraday/adapter/em_synchrony.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
module Faraday
class Adapter
class EMSynchrony < Faraday::Adapter
include EMHttp::Options

dependency do
require 'em-synchrony/em-http'
Expand All @@ -18,41 +19,13 @@ def self.setup_parallel_manager(options = {})

def call(env)
super
request = EventMachine::HttpRequest.new(URI::parse(env[:url].to_s))
options = {:head => env[:request_headers]}
options[:ssl] = env[:ssl] if env[:ssl]

if env[:body]
if env[:body].respond_to? :read
options[:body] = env[:body].read
else
options[:body] = env[:body]
end
end

if req = env[:request]
if proxy = req[:proxy]
uri = URI.parse(proxy[:uri])
options[:proxy] = {
:host => uri.host,
:port => uri.port
}
if proxy[:username] && proxy[:password]
options[:proxy][:authorization] = [proxy[:username], proxy[:password]]
end
end

# only one timeout currently supported by em http request
if req[:timeout] or req[:open_timeout]
options[:timeout] = [req[:timeout] || 0, req[:open_timeout] || 0].max
end
end
request = EventMachine::HttpRequest.new(URI::parse(env[:url].to_s), connection_config(env)) # end

http_method = env[:method].to_s.downcase.to_sym

# Queue requests for parallel execution.
if env[:parallel_manager]
env[:parallel_manager].add(request, http_method, options) do |resp|
env[:parallel_manager].add(request, http_method, request_config(env)) do |resp|
save_response(env, resp.response_header.status, resp.response) do |resp_headers|
resp.response_header.each do |name, value|
resp_headers[name.to_sym] = value
Expand All @@ -66,7 +39,7 @@ def call(env)
# Execute single request.
else
client = nil
block = lambda { request.send(http_method, options) }
block = lambda { request.send(http_method, request_config(env)) }

if !EM.reactor_running?
EM.run do
Expand Down

0 comments on commit 371ad26

Please sign in to comment.