diff --git a/app/jobs/event_job.rb b/app/jobs/event_job.rb index c1cd1dc..8d8c493 100644 --- a/app/jobs/event_job.rb +++ b/app/jobs/event_job.rb @@ -6,6 +6,15 @@ class EventJob COLUMN_NAMES = Hyper::Event.column_names def perform(*args) + Thread.current[:bq] ||= BufferQueue.new(max_batch_size: (ENV["MAX_BATCH_SIZE"] || 100).to_i, execution_interval: (ENV["EXECUTION_INTERVAL"] || 20).to_i) do |batch| + puts "bulk insert #{batch.size} records" + Hyper::Event.import( + EventJob::COLUMN_NAMES, + batch.flatten.map { |attr| Hyper::Event.new(attr) }, + validate: false, + timestamps: false + ) unless batch.empty? + end params, form, request = args if form @@ -68,14 +77,14 @@ def perform(*args) }) if event["_fv"] == "1" - BQ.push result.merge(event_name: "first_visit") + Thread.current[:bq].push result.merge(event_name: "first_visit") end if event["_ss"] == "1" - BQ.push result.merge(event_name: "session_start") + Thread.current[:bq].push result.merge(event_name: "session_start") end - BQ.push result + Thread.current[:bq].push result end end end diff --git a/config/initializers/sidekiq.rb b/config/initializers/sidekiq.rb index f392b81..c87e8fa 100644 --- a/config/initializers/sidekiq.rb +++ b/config/initializers/sidekiq.rb @@ -2,20 +2,6 @@ require "sidekiq/worker_killer" -if Sidekiq.server? - Sidekiq.on(:startup) do - BQ = BufferQueue.new(max_batch_size: (ENV["MAX_BATCH_SIZE"] || 100).to_i, execution_interval: (ENV["EXECUTION_INTERVAL"] || 20).to_i) do |batch| - puts "bulk insert #{batch.size} records" - Hyper::Event.import( - EventJob::COLUMN_NAMES, - batch.flatten.map { |attr| Hyper::Event.new(attr) }, - validate: false, - timestamps: false - ) unless batch.empty? - end - end -end - Sidekiq.configure_server do |config| config.redis = { url: ENV["REDIS_URL"] || "redis://localhost:6379/0" } config.server_middleware do |chain|