Skip to content

Commit

Permalink
Merge pull request rails#98 from rails/subscription-confirmation
Browse files Browse the repository at this point in the history
Send subscription confirmation from server to the client to avoid race conditions
  • Loading branch information
lifo committed Oct 19, 2015
2 parents df5a32d + 60748de commit 0d99cfd
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 18 deletions.
22 changes: 22 additions & 0 deletions lib/action_cable/channel/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class Base
include Naming
include Broadcasting

SUBSCRIPTION_CONFIRMATION_INTERNAL_MESSAGE = 'confirm_subscription'.freeze

on_subscribe :subscribed
on_unsubscribe :unsubscribed

Expand Down Expand Up @@ -120,6 +122,10 @@ def initialize(connection, identifier, params = {})
@identifier = identifier
@params = params

# When a channel is streaming via redis pubsub, we want to delay the confirmation
# transmission until redis pubsub subscription is confirmed.
@defer_subscription_confirmation = false

delegate_connection_identifiers
subscribe_to_channel
end
Expand Down Expand Up @@ -165,6 +171,15 @@ def transmit(data, via: nil)
end


protected
def defer_subscription_confirmation!
@defer_subscription_confirmation = true
end

def defer_subscription_confirmation?
@defer_subscription_confirmation
end

private
def delegate_connection_identifiers
connection.identifiers.each do |identifier|
Expand All @@ -177,6 +192,7 @@ def delegate_connection_identifiers

def subscribe_to_channel
run_subscribe_callbacks
transmit_subscription_confirmation unless defer_subscription_confirmation?
end


Expand Down Expand Up @@ -213,6 +229,12 @@ def run_subscribe_callbacks
def run_unsubscribe_callbacks
self.class.on_unsubscribe_callbacks.each { |callback| send(callback) }
end

def transmit_subscription_confirmation
logger.info "#{self.class.name} is transmitting the subscription confirmation"
connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: SUBSCRIPTION_CONFIRMATION_INTERNAL_MESSAGE)
end

end
end
end
12 changes: 9 additions & 3 deletions lib/action_cable/channel/streams.rb
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,18 @@ module Streams
# Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used
# instead of the default of just transmitting the updates straight to the subscriber.
def stream_from(broadcasting, callback = nil)
callback ||= default_stream_callback(broadcasting)
# Hold off the confirmation until pubsub#subscribe is successful
defer_subscription_confirmation!

callback ||= default_stream_callback(broadcasting)
streams << [ broadcasting, callback ]
EM.next_tick { pubsub.subscribe broadcasting, &callback }

logger.info "#{self.class.name} is streaming from #{broadcasting}"
EM.next_tick do
pubsub.subscribe(broadcasting, &callback).callback do |reply|
transmit_subscription_confirmation
logger.info "#{self.class.name} is streaming from #{broadcasting}"
end
end
end

# Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a
Expand Down
2 changes: 2 additions & 0 deletions lib/assets/javascripts/cable.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

@Cable =
PING_IDENTIFIER: "_ping"
INTERNAL_MESSAGES:
SUBSCRIPTION_CONFIRMATION: 'confirm_subscription'

createConsumer: (url) ->
new Cable.Consumer url
10 changes: 8 additions & 2 deletions lib/assets/javascripts/cable/connection.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,14 @@ class Cable.Connection

events:
message: (event) ->
{identifier, message} = JSON.parse(event.data)
@consumer.subscriptions.notify(identifier, "received", message)
{identifier, message, type} = JSON.parse(event.data)

if type?
switch type
when Cable.INTERNAL_MESSAGES.SUBSCRIPTION_CONFIRMATION
@consumer.subscriptions.notify(identifier, "connected")
else
@consumer.subscriptions.notify(identifier, "received", message)

open: ->
@disconnected = false
Expand Down
6 changes: 2 additions & 4 deletions lib/assets/javascripts/cable/subscriptions.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@ class Cable.Subscriptions
add: (subscription) ->
@subscriptions.push(subscription)
@notify(subscription, "initialized")
if @sendCommand(subscription, "subscribe")
@notify(subscription, "connected")
@sendCommand(subscription, "subscribe")

reload: ->
for subscription in @subscriptions
if @sendCommand(subscription, "subscribe")
@notify(subscription, "connected")
@sendCommand(subscription, "subscribe")

remove: (subscription) ->
@subscriptions = (s for s in @subscriptions when s isnt subscription)
Expand Down
6 changes: 6 additions & 0 deletions test/channel/base_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,10 @@ def rm_rf
expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "message" => { "data" => "latest" }
assert_equal expected, @connection.last_transmission
end

test "subscription confirmation" do
expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription"
assert_equal expected, @connection.last_transmission
end

end
35 changes: 26 additions & 9 deletions test/channel/stream_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,45 @@ def subscribed
end
end

setup do
@connection = TestConnection.new
end

test "streaming start and stop" do
run_in_eventmachine do
@connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1").returns stub_everything(:pubsub) }
channel = ChatChannel.new @connection, "{id: 1}", { id: 1 }
connection = TestConnection.new
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1").returns stub_everything(:pubsub) }
channel = ChatChannel.new connection, "{id: 1}", { id: 1 }

@connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe_proc) }
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe_proc) }
channel.unsubscribe_from_channel
end
end

test "stream_for" do
run_in_eventmachine do
connection = TestConnection.new
EM.next_tick do
@connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire").returns stub_everything(:pubsub) }
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire").returns stub_everything(:pubsub) }
end

channel = ChatChannel.new @connection, ""
channel = ChatChannel.new connection, ""
channel.stream_for Room.new(1)
end
end

test "stream_from subscription confirmation" do
EM.run do
connection = TestConnection.new
connection.expects(:pubsub).returns EM::Hiredis.connect.pubsub

channel = ChatChannel.new connection, "{id: 1}", { id: 1 }
assert_nil connection.last_transmission

EM::Timer.new(0.1) do
expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription"
assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s"

EM.run_deferred_callbacks
EM.stop
end
end
end

end
1 change: 1 addition & 0 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
Bundler.require :default, :test

require 'puma'
require 'em-hiredis'
require 'mocha/mini_test'

require 'rack/mock'
Expand Down

0 comments on commit 0d99cfd

Please sign in to comment.