Skip to content

Commit

Permalink
use thread level buffer queue
Browse files Browse the repository at this point in the history
  • Loading branch information
hooopo committed Mar 23, 2021
1 parent 7e6126f commit d4d8eb9
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 17 deletions.
15 changes: 12 additions & 3 deletions app/jobs/event_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ class EventJob
COLUMN_NAMES = Hyper::Event.column_names

def perform(*args)
Thread.current[:bq] ||= BufferQueue.new(max_batch_size: (ENV["MAX_BATCH_SIZE"] || 100).to_i, execution_interval: (ENV["EXECUTION_INTERVAL"] || 20).to_i) do |batch|
puts "bulk insert #{batch.size} records"
Hyper::Event.import(
EventJob::COLUMN_NAMES,
batch.flatten.map { |attr| Hyper::Event.new(attr) },
validate: false,
timestamps: false
) unless batch.empty?
end
params, form, request = args

if form
Expand Down Expand Up @@ -68,14 +77,14 @@ def perform(*args)
})

if event["_fv"] == "1"
BQ.push result.merge(event_name: "first_visit")
Thread.current[:bq].push result.merge(event_name: "first_visit")
end

if event["_ss"] == "1"
BQ.push result.merge(event_name: "session_start")
Thread.current[:bq].push result.merge(event_name: "session_start")
end

BQ.push result
Thread.current[:bq].push result
end
end
end
14 changes: 0 additions & 14 deletions config/initializers/sidekiq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,6 @@

require "sidekiq/worker_killer"

if Sidekiq.server?
Sidekiq.on(:startup) do
BQ = BufferQueue.new(max_batch_size: (ENV["MAX_BATCH_SIZE"] || 100).to_i, execution_interval: (ENV["EXECUTION_INTERVAL"] || 20).to_i) do |batch|
puts "bulk insert #{batch.size} records"
Hyper::Event.import(
EventJob::COLUMN_NAMES,
batch.flatten.map { |attr| Hyper::Event.new(attr) },
validate: false,
timestamps: false
) unless batch.empty?
end
end
end

Sidekiq.configure_server do |config|
config.redis = { url: ENV["REDIS_URL"] || "redis://localhost:6379/0" }
config.server_middleware do |chain|
Expand Down

0 comments on commit d4d8eb9

Please sign in to comment.