Skip to content

Commit

Permalink
Connection API modified, unsubscribe added
Browse files Browse the repository at this point in the history
  • Loading branch information
arvicco committed Oct 28, 2011
1 parent 3276a04 commit 8522697
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 47 deletions.
4 changes: 2 additions & 2 deletions bin/account_info
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ ib = IB::Connection.new
ib.subscribe(:Alert, :AccountValue,
:PortfolioValue, :AccountUpdateTime) { |msg| puts msg.to_human }

ib.send :RequestAccountData, :subscribe => true
ib.send_message :RequestAccountData, :subscribe => true

puts "\nSubscribing to IB account data"
puts "\n******** Press <Enter> to cancel... *********\n\n"
gets
puts "Cancelling account data subscription.."

ib.send :RequestAccountData, :subscribe => false
ib.send_message :RequestAccountData, :subscribe => false
2 changes: 1 addition & 1 deletion bin/contract_details
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ ib.subscribe(:ContractData) { |msg| puts msg.contract.inspect }
# Now we actually request historical data for the symbols we're interested in. TWS will
# respond with a HistoricalData message, which will be processed by the code above.
@market.each_pair do |id, contract|
ib.send :RequestContractData, :id => id, :contract => contract
ib.send_message :RequestContractData, :id => id, :contract => contract
end

sleep 3 # Wait for IB to respond to our request
2 changes: 1 addition & 1 deletion bin/depth_of_market
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ end

# Now we actually request market data for the symbols we're interested in.
@market.each_pair do |id, contract|
ib.send :RequestMarketDepth, :id => id, :contract => contract, :num_rows => 5
ib.send_message :RequestMarketDepth, :id => id, :contract => contract, :num_rows => 5
end

puts "\nSubscribed to market data"
Expand Down
18 changes: 9 additions & 9 deletions bin/historic_data
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ end
# Now we actually request historical data for the symbols we're interested in. TWS will
# respond with a HistoricalData message, which will be processed by the code above.
@market.each_pair do |id, contract|
ib.send IB::Messages::Outgoing::RequestHistoricalData.new(
:id => id,
:contract => contract,
:end_date_time => Time.now.to_ib,
:duration => '2 D', # ?
:bar_size => '1 min', #IB::OutgoingMessages::BAR_SIZES.key(:hour),
:what_to_show => :trades,
:use_rth => 1,
:format_date => 1)
ib.send_message IB::Messages::Outgoing::RequestHistoricalData.new(
:id => id,
:contract => contract,
:end_date_time => Time.now.to_ib,
:duration => '2 D', # ?
:bar_size => '1 min', #IB::OutgoingMessages::BAR_SIZES.key(:hour),
:what_to_show => :trades,
:use_rth => 1,
:format_date => 1)
end

sleep 5 # Wait for IB to respond to our request
2 changes: 1 addition & 1 deletion bin/historic_data_cli
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ ib.subscribe(IB::IncomingMessages::HistoricalData, lambda { |msg|
:use_RTH => REGULAR_HOURS_ONLY,
:format_date => DATE_FORMAT
})
ib.dispatch(msg)
ib.send_message(msg)
}


Expand Down
4 changes: 2 additions & 2 deletions bin/market_data
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ end

# Now we actually request market data for the symbols we're interested in.
@market.each_pair do |id, contract|
ib.send :RequestMarketData, :id => id, :contract => contract
ib.send_message :RequestMarketData, :id => id, :contract => contract
end

puts "\nSubscribed to market data"
puts "\n******** Press <Enter> to cancel... *********\n\n"
gets
puts "Cancelling market data subscription.."

@market.each_pair { |id, _| ib.send :CancelMarketData, :id => id }
@market.each_pair { |id, _| ib.send_message :CancelMarketData, :id => id }
4 changes: 2 additions & 2 deletions bin/option_data
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ end

# Now we actually request market data for the symbols we're interested in.
@market.each_pair do |id, contract|
ib.send :RequestMarketData, :id => id, :contract => contract
ib.send_message :RequestMarketData, :id => id, :contract => contract
end

puts "\nSubscribed to market data"
puts "\n******** Press <Enter> to cancel... *********\n\n"
gets
puts "Cancelling market data subscription.."

@market.each_pair { |id, contract| ib.send :CancelMarketData, :id => id }
@market.each_pair { |id, contract| ib.send_message :CancelMarketData, :id => id }
2 changes: 1 addition & 1 deletion bin/template
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require 'ib-ruby'
ib = IB::Connection.new

# Subscribe to TWS alerts/errors
ib.subscribe(IB::Messages::Incoming::Alert) { |msg| puts msg.to_human }
ib.subscribe(:Alert) { |msg| puts msg.to_human }

# Put your code here
# ...
Expand Down
2 changes: 1 addition & 1 deletion bin/time_and_sales
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ ib.subscribe(:TickPrice, :TickSize, :TickString) { |msg| show_sales_and_size(msg
# Now we actually request market data for the symbols we're interested in.

@market.each_pair do |id, contract|
ib.send :RequestMarketData, :id => id, :contract => contract
ib.send_message :RequestMarketData, :id => id, :contract => contract
end

puts "\nSubscribed to TWS market data"
Expand Down
55 changes: 31 additions & 24 deletions lib/ib-ruby/connection.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
require 'ib-ruby/socket'
require 'logger'

if RUBY_VERSION < "1.9"
require 'sha1'
else
require 'digest/sha1'
include Digest
end

# Add method to_ib to render datetime in IB format (zero padded "yyyymmdd HH:mm:ss")
class Time
def to_ib
Expand Down Expand Up @@ -41,15 +34,17 @@ def initialize(opts = {})
@next_order_id = nil
@server = Hash.new

# Message listeners. Key is the message class to listen for.
# Value is an Array of Procs. The proc will be called with the populated message
# instance as its argument when a message of that type is received.
# TODO: change Array of Procs into a Hash to allow unsubscribing
@listeners = Hash.new { |hash, key| hash[key] = Array.new }

self.open(@options) if @options[:open]
end

# Message subscribers. Key is the message class to listen for.
# Value is a Hash of subscriber Procs, keyed by their subscription id.
# All subscriber Procs will be called with the message instance
# as an argument when a message of that type is received.
def subscribers
@subscribers ||= Hash.new { |hash, key| hash[key] = Hash.new }
end

def open(opts = {})
raise Exception.new("Already connected!") if @connected

Expand All @@ -59,6 +54,7 @@ def open(opts = {})
self.subscribe(:NextValidID) do |msg|
@next_order_id = msg.data[:id]
puts "Got next valid order id: #{@next_order_id}."
#p self
end

@server[:socket] = IBSocket.open(opts[:host], opts[:port])
Expand Down Expand Up @@ -93,16 +89,18 @@ def close
@connected = false
end

def to_s
"IB Connection: #{ @connected ? "connected." : "disconnected."}"
def connected?
@connected
end

# Subscribe listener to specific type(s) of incoming message events.
# Subscribe Proc or block to specific type(s) of incoming message events.
# Listener will be called later with received message instance as its argument.
# Returns subscriber id to allow unsubscribing
def subscribe(*args, &block)
listener = args.last.respond_to?(:call) ? args.pop : block
subscriber = args.last.respond_to?(:call) ? args.pop : block
subscriber_id = random_id

raise ArgumentError.new "Need listener proc or block" unless listener.is_a? Proc
raise ArgumentError.new "Need subscriber proc or block" unless subscriber.is_a? Proc

args.each do |what|
message_class =
Expand All @@ -115,12 +113,21 @@ def subscribe(*args, &block)
raise ArgumentError.new "#{what} must represent incoming IB message class"
end

@listeners[message_class].push(listener)
subscribers[message_class][subscriber_id] = subscriber
end
subscriber_id
end

# Remove subscriber(s) with specific subscriber id
def unsubscribe(subscriber_id)

subscribers.each do |message_class, message_subscribers|
message_subscribers.delete subscriber_id
end
end

# Send an outgoing message.
def send(what, *args)
def send_message(what, *args)
message =
case
when what.is_a?(Messages::Outgoing::AbstractMessage)
Expand All @@ -132,13 +139,13 @@ def send(what, *args)
else
raise ArgumentError.new("Only able to send Messages::Outgoing")
end
message.send(@server)
message.send_to(@server)
end

protected

def random_id
SHA1.digest(Time.now.to_s + $$.to_s).unpack("C*").join.to_i % 999999999
rand 999999999
end

def reader
Expand All @@ -155,8 +162,8 @@ def reader
# NB: Failure here usually means unsupported message type received
msg = Messages::Incoming::Table[msg_id].new(@server[:socket])

@listeners[msg.class].each { |listener| listener.call(msg) }
puts "No listeners for incoming message #{msg.class}!" if @listeners[msg.class].empty?
subscribers[msg.class].each { |_, subscriber| subscriber.call(msg) }
puts "No subscribers for incoming message #{msg.class}!" if subscribers[msg.class].empty?
end # loop
end # reader

Expand Down
4 changes: 1 addition & 3 deletions lib/ib-ruby/messages/outgoing.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,12 @@ def to_human
# an Array of elements that ought to be sent to the server by calling to_s on
# each one and postpending a '\0'.
#
def send(server)
def send_to(server)
self.encode.flatten.each do |datum|
# TWS wants to receive booleans as 1 or 0... rewrite as necessary.
datum = "1" if datum == true
datum = "0" if datum == false

#print 'SENDING: '
#p datum
server[:socket].syswrite(datum.to_s + EOL)
end
end
Expand Down
56 changes: 56 additions & 0 deletions spec/ib-ruby/connection_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
require File.join(File.dirname(__FILE__), %w[.. spec_helper])

describe IB::Connection do

context "new" do
context 'connected by default' do
# THIS depends on TWS|Gateway connectivity
before(:all) { @ib = IB::Connection.new }
after(:all) { @ib.close }
subject { @ib }

it { should_not be_nil }
it { should be_connected }
its (:server) {should be_a Hash}
its (:subscribers) {should have_at_least(1).listener} # :NextValidID and empty Hashes
its (:next_order_id) {should be_a Fixnum} # Not before :NextValidID arrives
end

context 'passing :open => false' do
subject { IB::Connection.new :open => false }

it { should_not be_nil }
it { should_not be_connected }
its (:server) {should be_a Hash}
its (:subscribers) {should be_empty}
its (:next_order_id) {should be_nil}
end
end #instantiation

context "subscriptions" do
# THIS depends on TWS|Gateway connectivity
before(:all) { @ib = IB::Connection.new }
after(:all) { @ib.close }

it 'adds (multiple) subscribers' do
@subscriber_id = @ib.subscribe(:Alert, :AccountValue) do
puts "oooooooooo"
end

@ib.subscribers.should have_key(IB::Messages::Incoming::Alert)
@ib.subscribers.should have_key(IB::Messages::Incoming::AccountValue)
@ib.subscribers[IB::Messages::Incoming::Alert].should have_key(@subscriber_id)
@ib.subscribers[IB::Messages::Incoming::AccountValue].should have_key(@subscriber_id)
@ib.subscribers[IB::Messages::Incoming::Alert][@subscriber_id].should be_a Proc
@ib.subscribers[IB::Messages::Incoming::AccountValue][@subscriber_id].should be_a Proc
end

it 'removes all subscribers' do
@subscriber_id = @ib.unsubscribe(@subscriber_id)

@ib.subscribers[IB::Messages::Incoming::Alert].should_not have_key(@subscriber_id)
@ib.subscribers[IB::Messages::Incoming::AccountValue].should_not have_key(@subscriber_id)
end

end #subscriptions
end # describe IB::Connection

0 comments on commit 8522697

Please sign in to comment.