# Implementation Guidelines
A few recommendations when working with Kiba:
My understanding is that `require` is generally not thread-safe, so calling `require` inside `Kiba.parse` is not recommended in multi-threaded environments.
Do not do this:

```ruby
job = Kiba.parse do
  require 'dsl_extensions/progress_bar'
  # SNIP
end
```
You are advised to eager-load all your dependencies instead (e.g. from a Sidekiq initializer, or by calling `require` at the top of your files).
It is very common, and definitely allowed, to reference parameters (such as filenames) or live instances (such as Sequel connections) from `Kiba.parse`, in order to condition how your job will run.

In the job below, the name of a source file, a live `Sequel` connection, and a `Logger` instance are passed as parameters, then used in the definition:
```ruby
require 'kiba-pro/destinations/sql_upsert'

module ETL
  module SyncPartners
    module_function

    def setup(source_file, sequel_connection, logger)
      Kiba.parse do
        pre_process do
          logger.info "Starting processing for file #{source_file}"
        end

        source CSVSource,
          filename: source_file,
          csv_options: { headers: true, col_sep: ',' }

        # SNIP

        destination Kiba::Pro::Destination::SQLUpsert,
          table: :partners,
          unique_key: :crm_partner_id,
          database: sequel_connection
      end
    end
  end
end
```
You can then call your job programmatically:
```ruby
job = ETL::SyncPartners.setup(my_source_file, my_sequel_connection, logger)
Kiba.run(job)
```
It can be useful at times to use instance variables. This can be done safely, as long as you do not reuse job instances (to avoid keeping state around).
For instance, one could build a bit of statistics like this:
```ruby
job = Kiba.parse do
  pre_process do
    @row_read_from_source_count = 0
  end

  source SomeSource

  transform do |row|
    @row_read_from_source_count += 1
    row
  end

  # SNIP

  post_process do
    puts "#{@row_read_from_source_count} rows have been read from source"
  end
end
```
If you are careful to choose well-namespaced variables, this can be used together with Kiba DSL extensions.
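Why instance variables are shared between `pre_process`, `transform`, and `post_process` blocks: blocks evaluated against a common object (via `instance_eval` / `instance_exec`) read and write that object's instance variables. The hypothetical mini-DSL below illustrates the mechanism in plain Ruby; it is not Kiba's actual implementation:

```ruby
# A toy DSL: blocks registered via `step` run against the MiniJob
# instance, so they all see the same instance variables.
class MiniJob
  def initialize(&definition)
    @steps = []
    instance_eval(&definition)
  end

  def step(&block)
    @steps << block
  end

  def run(rows)
    rows.each { |row| @steps.each { |s| instance_exec(row, &s) } }
  end
end

job = MiniJob.new do
  step { |_row| @count = (@count || 0) + 1 }
end

job.run([1, 2, 3])
puts job.instance_variable_get(:@count) # 3
```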
It is not recommended to re-use the output of `Kiba.parse` (the `job` variable above) for multiple calls to `Kiba.run`. If you do so, you may unknowingly end up sharing some form of state between runs (such as the variables and parameters described above, or state kept inside your ETL components), leading to unexpected results.
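The failure mode is easy to reproduce in plain Ruby, without Kiba: a job object that captures mutable state keeps that state between runs, so counts (or worse) carry over. Building a fresh job per run avoids it:

```ruby
# A "job" here is just a lambda that closes over a counter,
# the way a transform block can close over an instance variable.
def build_job
  count = 0
  ->(rows) { rows.each { count += 1 }; count }
end

job = build_job
puts job.call([1, 2, 3]) # 3
puts job.call([1, 2, 3]) # 6 -- state leaked from the first run!

fresh = build_job
puts fresh.call([1, 2, 3]) # 3 -- a new job per run starts clean
```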
At the time of writing (Kiba v3), if an error is raised while `Kiba.run` is executing, Kiba does nothing to close resources that you may have opened during processing (such as files, database connections, etc).

For now, it is your responsibility to `rescue` any error that may happen and close the resources your components may have opened, or to use constructs that automatically close resources on error (such as the block form of `CSV.open`).
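For reference, the block form of `CSV.open` (from Ruby's standard library) closes the underlying file when the block exits, including when an exception is raised mid-iteration:

```ruby
require 'csv'
require 'tempfile'

# Prepare a small CSV file to read from.
tmp = Tempfile.new(['partners', '.csv'])
tmp.write("id,name\n1,Alice\n2,Bob\n")
tmp.close

# The block form guarantees the file handle is closed on exit,
# whether the block completes or raises.
names = []
CSV.open(tmp.path, headers: true) do |csv|
  csv.each { |row| names << row['name'] }
end

puts names.join(',') # prints "Alice,Bob"
```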
When applicable, you can also wrap the call to `Kiba.run` in a block construct to automatically close the resources you need, e.g.:
```ruby
allocate_connection_from_pool do |connection|
  job = Kiba.parse do
    source SQL, connection: connection
    # SNIP
  end

  Kiba.run(job)
end
```
(here the connection will be returned to the pool automatically).
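A block-based allocator like this typically relies on `ensure`, so cleanup happens even when the job raises. A minimal plain-Ruby sketch (the helper name and resource shape are made up for illustration):

```ruby
# Hypothetical allocator: yields a resource and guarantees release.
def with_resource
  resource = { open: true }
  yield resource
ensure
  resource[:open] = false # always released, even on error
end

state = nil
begin
  with_resource do |r|
    state = r
    raise 'processing failed'
  end
rescue RuntimeError
  # the error propagated out of with_resource, after cleanup ran
end

puts state[:open] # false -- released despite the error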