Skip to content

Commit

Permalink
Created adapter interface
Browse files Browse the repository at this point in the history
  • Loading branch information
ankane committed Jul 28, 2016
1 parent 028db7c commit b226ed6
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 99 deletions.
1 change: 1 addition & 0 deletions lib/blazer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
require "blazer/version"
require "blazer/data_source"
require "blazer/result"
require "blazer/adapters/active_record_adapter"
require "blazer/engine"

module Blazer
Expand Down
119 changes: 119 additions & 0 deletions lib/blazer/adapters/active_record_adapter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
module Blazer
module Adapters
class ActiveRecordAdapter
attr_reader :data_source, :connection_model

def initialize(data_source)
@data_source = data_source

@connection_model =
Class.new(Blazer::Connection) do
def self.name
"Blazer::Connection::#{object_id}"
end
establish_connection(data_source.settings["url"]) if data_source.settings["url"]
end
end

def schemas
default_schema = (postgresql? || redshift?) ? "public" : connection_model.connection_config[:database]
settings["schemas"] || [connection_model.connection_config[:schema] || default_schema]
end

def tables
result = data_source.run_statement(connection_model.send(:sanitize_sql_array, ["SELECT table_name FROM information_schema.tables WHERE table_schema IN (?) ORDER BY table_name", schemas]))
result.rows.map(&:first)
end

def run_statement(statement, comment)
columns = []
rows = []
error = nil
start_time = Time.now

begin
in_transaction do
set_timeout(data_source.timeout) if data_source.timeout

result = connection_model.connection.select_all("#{statement} /*#{comment}*/")
columns = result.columns
cast_method = Rails::VERSION::MAJOR < 5 ? :type_cast : :cast_value
result.rows.each do |untyped_row|
rows << (result.column_types.empty? ? untyped_row : columns.each_with_index.map { |c, i| untyped_row[i] ? result.column_types[c].send(cast_method, untyped_row[i]) : nil })
end
end
rescue ActiveRecord::StatementInvalid => e
error = e.message.sub(/.+ERROR: /, "")
error = Blazer::TIMEOUT_MESSAGE if Blazer::TIMEOUT_ERRORS.any? { |e| error.include?(e) }
end

[columns, rows, error]
end

def reconnect
connection_model.establish_connection(settings["url"])
end

def cost(statement)
result = explain(statement)
match = /cost=\d+\.\d+..(\d+\.\d+) /.match(result)
match[1] if match
end

def explain(statement)
if postgresql? || redshift?
connection_model.connection.select_all("EXPLAIN #{statement}").rows.first.first
end
rescue
nil
end

private

def postgresql?
["PostgreSQL", "PostGIS"].include?(adapter_name)
end

def redshift?
["Redshift"].include?(adapter_name)
end

def mysql?
["MySQL", "Mysql2", "Mysql2Spatial"].include?(adapter_name)
end

def adapter_name
connection_model.connection.adapter_name
end

def set_timeout(timeout)
if postgresql? || redshift?
connection_model.connection.execute("SET statement_timeout = #{timeout.to_i * 1000}")
elsif mysql?
connection_model.connection.execute("SET max_execution_time = #{timeout.to_i * 1000}")
else
raise Blazer::TimeoutNotSupported, "Timeout not supported for #{adapter_name} adapter"
end
end

def use_transaction?
settings.key?("use_transaction") ? settings["use_transaction"] : true
end

def in_transaction
if use_transaction?
connection_model.transaction do
yield
raise ActiveRecord::Rollback
end
else
yield
end
end

def settings
@data_source.settings
end
end
end
end
106 changes: 7 additions & 99 deletions lib/blazer/data_source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

module Blazer
class DataSource
attr_reader :id, :settings, :connection_model
extend Forwardable

attr_reader :id, :settings, :adapter

def_delegators :adapter, :schema, :tables, :reconnect, :cost, :explain

def initialize(id, settings)
@id = id
Expand All @@ -12,13 +16,7 @@ def initialize(id, settings)
raise Blazer::Error, "Empty url"
end

@connection_model =
Class.new(Blazer::Connection) do
def self.name
"Blazer::Connection::#{object_id}"
end
establish_connection(settings["url"]) if settings["url"]
end
@adapter = Blazer::Adapters::ActiveRecordAdapter.new(self)
end

def name
Expand Down Expand Up @@ -78,24 +76,6 @@ def local_time_suffix
@local_time_suffix ||= Array(settings["local_time_suffix"])
end

def use_transaction?
settings.key?("use_transaction") ? settings["use_transaction"] : true
end

def cost(statement)
result = explain(statement)
match = /cost=\d+\.\d+..(\d+\.\d+) /.match(result)
match[1] if match
end

def explain(statement)
if postgresql? || redshift?
connection_model.connection.select_all("EXPLAIN #{statement}").rows.first.first
end
rescue
nil
end

def run_main_statement(statement, options = {})
query = options[:query]
Blazer.transform_statement.call(self, statement) if Blazer.transform_statement
Expand Down Expand Up @@ -192,70 +172,13 @@ def run_cache_key(run_id)
cache_key(["run", run_id])
end

def schemas
default_schema = (postgresql? || redshift?) ? "public" : connection_model.connection_config[:database]
settings["schemas"] || [connection_model.connection_config[:schema] || default_schema]
end

def tables
result = run_statement(connection_model.send(:sanitize_sql_array, ["SELECT table_name FROM information_schema.tables WHERE table_schema IN (?) ORDER BY table_name", schemas]))
result.rows.map(&:first)
end

def postgresql?
["PostgreSQL", "PostGIS"].include?(adapter_name)
end

def redshift?
["Redshift"].include?(adapter_name)
end

def mysql?
["MySQL", "Mysql2", "Mysql2Spatial"].include?(adapter_name)
end

def reconnect
connection_model.establish_connection(settings["url"])
end

protected

def run_statement_helper(statement, comment, run_id)
columns = []
rows = []
error = nil
start_time = Time.now
result = nil

begin
in_transaction do
if timeout
if postgresql? || redshift?
connection_model.connection.execute("SET statement_timeout = #{timeout.to_i * 1000}")
elsif mysql?
connection_model.connection.execute("SET max_execution_time = #{timeout.to_i * 1000}")
else
raise Blazer::TimeoutNotSupported, "Timeout not supported for #{adapter_name} adapter"
end
end

result = connection_model.connection.select_all("#{statement} /*#{comment}*/")
end
rescue ActiveRecord::StatementInvalid => e
error = e.message.sub(/.+ERROR: /, "")
error = Blazer::TIMEOUT_MESSAGE if Blazer::TIMEOUT_ERRORS.any? { |e| error.include?(e) }
end

columns, rows, error = @adapter.run_statement(statement, comment)
duration = Time.now - start_time

if result
columns = result.columns
cast_method = Rails::VERSION::MAJOR < 5 ? :type_cast : :cast_value
result.rows.each do |untyped_row|
rows << (result.column_types.empty? ? untyped_row : columns.each_with_index.map { |c, i| untyped_row[i] ? result.column_types[c].send(cast_method, untyped_row[i]) : nil })
end
end

cache_data = nil
cache = !error && (cache_mode == "all" || (cache_mode == "slow" && duration >= cache_slow_threshold))
if cache || run_id
Expand All @@ -276,20 +199,5 @@ def run_statement_helper(statement, comment, run_id)

Blazer::Result.new(self, columns, rows, error, nil, cache && !cache_data.nil?)
end

def adapter_name
connection_model.connection.adapter_name
end

def in_transaction
if use_transaction?
connection_model.transaction do
yield
raise ActiveRecord::Rollback
end
else
yield
end
end
end
end

0 comments on commit b226ed6

Please sign in to comment.