Skip to content

Commit

Permalink
Merge pull request #114 from rage-rb/cable-redis-adapter
Browse files Browse the repository at this point in the history
[Cable] Redis adapter
  • Loading branch information
rsamoilov authored Jan 19, 2025
2 parents 1634081 + 2d2bae1 commit f82f63a
Show file tree
Hide file tree
Showing 14 changed files with 533 additions and 15 deletions.
1 change: 1 addition & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ jobs:
TEST_HTTP_URL: ${{ secrets.TEST_HTTP_URL }}
TEST_PG_URL: ${{ secrets.TEST_PG_URL }}
TEST_MYSQL_URL: ${{ secrets.TEST_MYSQL_URL }}
TEST_REDIS_URL: ${{ secrets.TEST_REDIS_URL }}
ENABLE_EXTERNAL_TESTS: ${{ secrets.ENABLE_EXTERNAL_TESTS }}
run: bundle exec rake
linter:
Expand Down
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ group :test do
gem "domain_name"
gem "websocket-client-simple"
gem "prism"
gem "redis-client"
end
16 changes: 16 additions & 0 deletions lib/rage/cable/adapters/base.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# frozen_string_literal: true

class Rage::Cable::Adapters::Base
def pick_a_worker(&block)
_lock, lock_path = Tempfile.new.yield_self { |file| [file, file.path] }

Iodine.on_state(:on_start) do
if File.new(lock_path).flock(File::LOCK_EX | File::LOCK_NB)
if Rage.logger.debug?
puts "INFO: #{Process.pid} is managing #{self.class.name.split("::").last} subscriptions."
end
block.call
end
end
end
end
127 changes: 127 additions & 0 deletions lib/rage/cable/adapters/redis.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# frozen_string_literal: true

require "securerandom"

if !defined?(RedisClient)
fail <<~ERR
Redis adapter depends on the `redis-client` gem. Add the following line to your Gemfile:
gem "redis-client"
ERR
end

class Rage::Cable::Adapters::Redis < Rage::Cable::Adapters::Base
REDIS_STREAM_NAME = "rage:cable:messages"
DEFAULT_REDIS_OPTIONS = { reconnect_attempts: [0.05, 0.1, 0.5] }

def initialize(config)
@redis_stream = if (prefix = config.delete(:channel_prefix))
"#{prefix}:#{REDIS_STREAM_NAME}"
else
REDIS_STREAM_NAME
end

@redis_config = RedisClient.config(**DEFAULT_REDIS_OPTIONS.merge(config))
@server_uuid = SecureRandom.uuid

redis_version = get_redis_version
if redis_version < Gem::Version.create(5)
raise "Redis adapter only supports Redis 5+. Detected Redis version: #{redis_version}."
end

@trimming_strategy = redis_version < Gem::Version.create("6.2.0") ? :maxlen : :minid

pick_a_worker { poll }
end

def publish(stream_name, data)
message_uuid = SecureRandom.uuid

publish_redis.call(
"XADD",
@redis_stream,
trimming_method, "~", trimming_value,
"*",
"1", stream_name,
"2", data.to_json,
"3", @server_uuid,
"4", message_uuid
)
end

private

def publish_redis
@publish_redis ||= @redis_config.new_client
end

def trimming_method
@trimming_strategy == :maxlen ? "MAXLEN" : "MINID"
end

def trimming_value
@trimming_strategy == :maxlen ? "10000" : ((Time.now.to_f - 5 * 60) * 1000).to_i
end

def get_redis_version
service_redis = @redis_config.new_client
version = service_redis.call("INFO").match(/redis_version:([[:graph:]]+)/)[1]

Gem::Version.create(version)

rescue RedisClient::Error => e
puts "FATAL: Couldn't connect to Redis - all broadcasts will be limited to the current server."
puts e.backtrace.join("\n")
Gem::Version.create(5)

ensure
service_redis.close
end

def error_backoff_intervals
@error_backoff_intervals ||= Enumerator.new do |y|
y << 0.2 << 0.5 << 1 << 2 << 5
loop { y << 10 }
end
end

def poll
unless Fiber.scheduler
Fiber.set_scheduler(Rage::FiberScheduler.new)
end

Iodine.on_state(:start_shutdown) do
@stopping = true
end

Fiber.schedule do
read_redis = @redis_config.new_client
last_id = (Time.now.to_f * 1000).to_i
last_message_uuid = nil

loop do
data = read_redis.blocking_call(5, "XREAD", "COUNT", "100", "BLOCK", "5000", "STREAMS", @redis_stream, last_id)

if data
data[@redis_stream].each do |id, (_, stream_name, _, serialized_data, _, broadcaster_uuid, _, message_uuid)|
if broadcaster_uuid != @server_uuid && message_uuid != last_message_uuid
Rage.config.cable.protocol.broadcast(stream_name, JSON.parse(serialized_data))
end

last_id = id
last_message_uuid = message_uuid
end
end

rescue RedisClient::Error => e
Rage.logger.error("Subscriber error: #{e.message} (#{e.class})")
sleep error_backoff_intervals.next
rescue => e
@stopping ? break : raise(e)
else
error_backoff_intervals.rewind
end
end
end
end
28 changes: 23 additions & 5 deletions lib/rage/cable/cable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ module Rage::Cable
# run Rage.cable.application
# end
def self.application
protocol = Rage.config.cable.protocol
protocol.init(__router)
# explicitly initialize the adapter
__adapter

handler = __build_handler(protocol)
accept_response = [0, protocol.protocol_definition, []]
handler = __build_handler(__protocol)
accept_response = [0, __protocol.protocol_definition, []]

application = ->(env) do
if env["rack.upgrade?"] == :websocket
Expand All @@ -31,6 +31,15 @@ def self.__router
@__router ||= Router.new
end

# @private
def self.__protocol
@__protocol ||= Rage.config.cable.protocol.tap { |protocol| protocol.init(__router) }
end

def self.__adapter
@__adapter ||= Rage.config.cable.adapter
end

# @private
def self.__build_handler(protocol)
klass = Class.new do
Expand Down Expand Up @@ -94,10 +103,14 @@ def log_error(e)
#
# @param stream [String] the name of the stream
# @param data [Object] the object to send to the clients. This will later be encoded according to the protocol used.
# @return [true]
# @example
# Rage.cable.broadcast("chat", { message: "A new member has joined!" })
def self.broadcast(stream, data)
Rage.config.cable.protocol.broadcast(stream, data)
__protocol.broadcast(stream, data)
__adapter&.publish(stream, data)

true
end

# @!parse [ruby]
Expand All @@ -120,6 +133,11 @@ def self.broadcast(stream, data)
# end
# end

module Adapters
autoload :Base, "rage/cable/adapters/base"
autoload :Redis, "rage/cable/adapters/redis"
end

module Protocol
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/rage/cable/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ def stream_from(stream)
# broadcast("notifications", { message: "A new member has joined!" })
# end
def broadcast(stream, data)
Rage.config.cable.protocol.broadcast(stream, data)
Rage.cable.broadcast(stream, data)
end
# Transmit data to the current client.
Expand Down
39 changes: 32 additions & 7 deletions lib/rage/cable/protocol/actioncable_v1_json.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# frozen_string_literal: true

require "zlib"

##
# A protocol defines the structure, rules and semantics for exchanging data between the client and the server.
# The class that defines a protocol should respond to the following methods:
Expand All @@ -17,6 +19,9 @@
# * `on_shutdown`
# * `on_close`
#
# It is likely that all logic around `@subscription_identifiers` has nothing to do with the protocol itself and
# should be extracted into another class. We'll refactor this once we start working on a new protocol.
#
class Rage::Cable::Protocol::ActioncableV1Json
module TYPE
WELCOME = "welcome"
Expand Down Expand Up @@ -55,14 +60,30 @@ def self.protocol_definition
def self.init(router)
@router = router

ping_counter = Time.now.to_i
::Iodine.run_every(3000) do
ping_counter += 1
::Iodine.publish("cable:ping", { type: TYPE::PING, message: ping_counter }.to_json)
Iodine.on_state(:on_start) do
ping_counter = Time.now.to_i

Iodine.run_every(3000) do
ping_counter += 1
Iodine.publish("cable:ping", { type: TYPE::PING, message: ping_counter }.to_json, Iodine::PubSub::PROCESS)
end
end

# Hash<String(stream name) => Array<Hash>(subscription params)>
@subscription_identifiers = Hash.new { |hash, key| hash[key] = [] }

# this is a fallback to synchronize subscription identifiers across different worker processes;
# we expect connections to be distributed among all workers, so this code will almost never be called;
# we also synchronize subscriptions with the master process so that the forks that are spun up instead
# of the crashed ones also had access to the identifiers;
Iodine.subscribe("cable:synchronize") do |_, subscription_msg|
stream_name, params = Rage::ParamsParser.json_parse(subscription_msg)
@subscription_identifiers[stream_name] << params unless @subscription_identifiers[stream_name].include?(params)
end

Iodine.on_state(:on_finish) do
Iodine.unsubscribe("cable:synchronize")
end
end

# The method is called any time a new WebSocket connection is established.
Expand Down Expand Up @@ -147,8 +168,12 @@ def self.serialize(params, data)
# @param name [String] the stream name
# @param params [Hash] parameters associated with the client
def self.subscribe(connection, name, params)
connection.subscribe("cable:#{name}:#{params.hash}")
@subscription_identifiers[name] << params unless @subscription_identifiers[name].include?(params)
connection.subscribe("cable:#{name}:#{Zlib.crc32(params.to_s)}")

unless @subscription_identifiers[name].include?(params)
@subscription_identifiers[name] << params
::Iodine.publish("cable:synchronize", [name, params].to_json)
end
end

# Broadcast data to all clients connected to a stream.
Expand All @@ -160,7 +185,7 @@ def self.broadcast(name, data)

while i < identifiers.length
params = identifiers[i]
::Iodine.publish("cable:#{name}:#{params.hash}", serialize(params, data))
::Iodine.publish("cable:#{name}:#{Zlib.crc32(params.to_s)}", serialize(params, data))
i += 1
end
end
Expand Down
27 changes: 27 additions & 0 deletions lib/rage/configuration.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# frozen_string_literal: true

require "yaml"
require "erb"

##
# `Rage.configure` can be used to adjust the behavior of your Rage application:
#
Expand Down Expand Up @@ -277,6 +280,30 @@ def middlewares
end
end
end

def config
@config ||= begin
config_file = Rage.root.join("config/cable.yml")

if config_file.exist?
yaml = ERB.new(config_file.read).result
YAML.safe_load(yaml, aliases: true, symbolize_names: true)[Rage.env.to_sym] || {}
else
{}
end
end
end

def adapter_config
config.except(:adapter)
end

def adapter
case config[:adapter]
when "redis"
Rage::Cable::Adapters::Redis.new(adapter_config)
end
end
end

class PublicFileServer
Expand Down
Loading

0 comments on commit f82f63a

Please sign in to comment.