Merge pull request yatish27#15 from matthew342/master
Merge results from multiple batches, add option to control whether requests wait for result or return asynchronously, add specs
yatish27 committed Aug 29, 2013
2 parents c40f049 + 86b50b3 commit 8f1804a
Showing 5 changed files with 219 additions and 89 deletions.
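The diff below reworks the gem's public API: every operation gains get_response, batch_size, and timeout parameters (plus send_nulls on upsert, update, and create), and results from all batches are merged into the returned response. As a hedged sketch of how the reworked calls might be used (the Account records and the pre-authenticated client are illustrative assumptions, not part of this PR):

require 'salesforce_bulk_api'

# Assumes an already-authenticated client, as the gem's README sets up.
salesforce = SalesforceBulkApi::Api.new(client)

accounts = [
  { 'Id' => '0013000000ymMBx', 'Website' => 'www.example.com' },
  { 'Id' => '0013000000ymMBy', 'Website' => '' }
]

# Asynchronous (default): returns as soon as the job is closed.
salesforce.update('Account', accounts)

# Synchronous: get_response = true polls until every batch finishes; send_nulls = true
# blanks empty fields; 10,000 records per batch; 1500-second timeout.
result = salesforce.update('Account', accounts, true, true, 10000, 1500)
result['batches'].each { |batch| puts batch['state'][0] }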
44 changes: 15 additions & 29 deletions lib/salesforce_bulk_api.rb
@@ -18,50 +18,36 @@ def initialize(client)
       @connection = SalesforceBulkApi::Connection.new(@@SALESFORCE_API_VERSION,client)
     end

-    def upsert(sobject, records, external_field)
-      self.do_operation('upsert', sobject, records, external_field)
+    def upsert(sobject, records, external_field, get_response = false, send_nulls = false, batch_size = 10000, timeout = 1500)
+      self.do_operation('upsert', sobject, records, external_field, get_response, timeout, batch_size, send_nulls)
     end

-    def update(sobject, records)
-      self.do_operation('update', sobject, records, nil)
+    def update(sobject, records, get_response = false, send_nulls = false, batch_size = 10000, timeout = 1500)
+      self.do_operation('update', sobject, records, nil, get_response, timeout, batch_size, send_nulls)
     end

-    def create(sobject, records)
-      self.do_operation('insert', sobject, records, nil)
+    def create(sobject, records, get_response = false, send_nulls = false, batch_size = 10000, timeout = 1500)
+      self.do_operation('insert', sobject, records, nil, get_response, timeout, batch_size, send_nulls)
     end

-    def delete(sobject, records)
-      self.do_operation('delete', sobject, records, nil)
+    def delete(sobject, records, get_response = false, batch_size = 10000, timeout = 1500)
+      self.do_operation('delete', sobject, records, nil, get_response, timeout, batch_size)
     end

-    def query(sobject, query)
-      self.do_operation('query', sobject, query, nil)
+    def query(sobject, query, batch_size = 10000, timeout = 1500)
+      self.do_operation('query', sobject, query, nil, true, timeout, batch_size)
     end

     #private

-    def do_operation(operation, sobject, records, external_field)
+    def do_operation(operation, sobject, records, external_field, get_response, timeout, batch_size, send_nulls = false)
       job = SalesforceBulkApi::Job.new(operation, sobject, records, external_field, @connection)

-      # TODO: put this in one function
       job_id = job.create_job()
-      batch_id = operation == "query" ? job.add_query() : job.add_batch()
-      job.close_job()
-
-      while true
-        state = job.check_batch_status()
-        if state['state'][0] != "Queued" && state['state'][0] != "InProgress"
-          break
-        end
-        sleep(2) # wait x seconds and check again
-      end
-
-      if state['state'][0] == 'Completed'
-        state.merge!({:records => job.get_batch_result()})
-        return state
-      else
-        return state
-      end
+      operation == "query" ? job.add_query() : job.add_batches(batch_size, send_nulls)
+      response = job.close_job
+      response.merge!({'batches' => job.get_job_result(get_response, timeout)}) if get_response == true
+      response
     end
   end # End class
 end
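Note in the rewritten do_operation that polling moved out of this class: query hard-codes get_response to true (a query is only useful with its results), while the write operations return after close_job unless the caller opts in. A short sketch of the two paths, reusing the hypothetical salesforce object from above:

# query always waits; parsed rows appear under each batch's 'response' key.
res = salesforce.query('Account', 'SELECT Id, Name FROM Account LIMIT 5')
rows = res['batches'].first['response']

# delete returns once the job closes unless get_response is set.
salesforce.delete('Account', [{ 'Id' => '0013000000ymMBx' }])        # asynchronous
salesforce.delete('Account', [{ 'Id' => '0013000000ymMBx' }], true)  # waits for the batch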
169 changes: 118 additions & 51 deletions lib/salesforce_bulk_api/job.rb
@@ -4,122 +4,189 @@ class Job

     def initialize(operation, sobject, records, external_field, connection)

-      @@operation = operation
-      @@sobject = sobject
-      @@external_field = external_field
-      @@records = records
-      @@connection = connection
-      @@XML_HEADER = '<?xml version="1.0" encoding="utf-8" ?>'
+      @operation = operation
+      @sobject = sobject
+      @external_field = external_field
+      @records = records
+      @connection = connection
+      @batch_ids = []
+      @XML_HEADER = '<?xml version="1.0" encoding="utf-8" ?>'

     end

     def create_job()
-      xml = "#{@@XML_HEADER}<jobInfo xmlns=\"http://www.force.com/2009/06/asyncapi/dataload\">"
-      xml += "<operation>#{@@operation}</operation>"
-      xml += "<object>#{@@sobject}</object>"
-      if !@@external_field.nil? # This only happens on upsert
-        xml += "<externalIdFieldName>#{@@external_field}</externalIdFieldName>"
+      xml = "#{@XML_HEADER}<jobInfo xmlns=\"http://www.force.com/2009/06/asyncapi/dataload\">"
+      xml += "<operation>#{@operation}</operation>"
+      xml += "<object>#{@sobject}</object>"
+      if !@external_field.nil? # This only happens on upsert
+        xml += "<externalIdFieldName>#{@external_field}</externalIdFieldName>"
       end
       xml += "<contentType>XML</contentType>"
       xml += "</jobInfo>"

       path = "job"
       headers = Hash['Content-Type' => 'application/xml; charset=utf-8']

-      response = @@connection.post_xml(nil, path, xml, headers)
+      response = @connection.post_xml(nil, path, xml, headers)
       response_parsed = XmlSimple.xml_in(response)
-      @@job_id = response_parsed['id'][0]
+      @job_id = response_parsed['id'][0]
     end

     def close_job()
-      xml = "#{@@XML_HEADER}<jobInfo xmlns=\"http://www.force.com/2009/06/asyncapi/dataload\">"
+      xml = "#{@XML_HEADER}<jobInfo xmlns=\"http://www.force.com/2009/06/asyncapi/dataload\">"
       xml += "<state>Closed</state>"
       xml += "</jobInfo>"

-      path = "job/#{@@job_id}"
+      path = "job/#{@job_id}"
       headers = Hash['Content-Type' => 'application/xml; charset=utf-8']

-      response = @@connection.post_xml(nil, path, xml, headers)
+      response = @connection.post_xml(nil, path, xml, headers)
       response_parsed = XmlSimple.xml_in(response)

       #job_id = response_parsed['id'][0]
     end

     def add_query
-      path = "job/#{@@job_id}/batch/"
+      path = "job/#{@job_id}/batch/"
       headers = Hash["Content-Type" => "application/xml; charset=UTF-8"]

-      response = @@connection.post_xml(nil, path, @@records, headers)
+      response = @connection.post_xml(nil, path, @records, headers)
       response_parsed = XmlSimple.xml_in(response)

-      @@batch_id = response_parsed['id'][0]
+      @batch_ids << response_parsed['id'][0]
     end

-    def add_batch()
-      keys = @@records.reduce({}) {|h,pairs| pairs.each {|k,v| (h[k] ||= []) << v}; h}.keys
+    def add_batches(batch_size, send_nulls = false)
+      raise 'Records must be an array of hashes.' unless @records.is_a? Array
+      keys = @records.reduce({}) {|h,pairs| pairs.each {|k,v| (h[k] ||= []) << v}; h}.keys
       headers = keys
-      @@records_dup=@@records.clone
-      super_records=[]
-      (@@records_dup.size/10000).to_i.times do
-        super_records<<@@records_dup.pop(10000)
+      @records_dup = @records.clone
+      super_records = []
+      (@records_dup.size/batch_size).to_i.times do
+        super_records << @records_dup.pop(batch_size)
       end
-      super_records<<@@records_dup
+      super_records << @records_dup unless @records_dup.empty?

-      super_records.each do|batch|
-        xml = "#{@@XML_HEADER}<sObjects xmlns=\"http://www.force.com/2009/06/asyncapi/dataload\">"
+      super_records.each do |batch|
+        xml = "#{@XML_HEADER}<sObjects xmlns=\"http://www.force.com/2009/06/asyncapi/dataload\">"
         batch.each do |r|
-          xml += "<sObject>"
+          fields_to_null = []
+          object_keys = ''
           keys.each do |k|
-            xml += "<#{k}>#{r[k]}</#{k}>" unless r[k].blank?
+            unless r[k].to_s.empty? && !send_nulls
+              if r[k].respond_to?(:encode)
+                object_keys += "<#{k}>#{r[k].encode(:xml => :text)}</#{k}>"
+              else
+                object_keys += "<#{k}>#{r[k]}</#{k}>"
+              end
+            end
+            if r[k].to_s.empty? && send_nulls
+              fields_to_null << k
+            end
           end
+          xml += "<sObject "
+          if send_nulls
+            xml += "fieldsToNull=\"["
+            fields_to_null = ['Website', 'Other_Phone__c']
+            xml += fields_to_null.inject('') {|memo, field| memo << "'#{field}',"}
+            xml.slice!(xml.length - 1)
+            xml += "]\""
+          end
+          xml += ">"
+          xml += object_keys
           xml += "</sObject>"
         end
         xml += "</sObjects>"

-        path = "job/#{@@job_id}/batch/"
+        path = "job/#{@job_id}/batch/"
         headers = Hash["Content-Type" => "application/xml; charset=UTF-8"]
-        response = @@connection.post_xml(nil, path, xml, headers)
+        response = @connection.post_xml(nil, path, xml, headers)
         response_parsed = XmlSimple.xml_in(response)

-        @@batch_id = response_parsed['id'][0]
+        @batch_ids << response_parsed['id'][0] if response_parsed['id']
       end
     end

-    def check_batch_status()
-      path = "job/#{@@job_id}/batch/#{@@batch_id}"
+    def check_job_status
+      path = "job/#{@job_id}"
       headers = Hash.new

-      response = @@connection.get_request(nil, path, headers)
-      response_parsed = XmlSimple.xml_in(response)
+      response = @connection.get_request(nil, path, headers)

       begin
+        response_parsed = XmlSimple.xml_in(response) if response
+        response_parsed
-      rescue Exception => e
+      rescue StandardError => e
         nil
       end
     end

+    def check_batch_status(batch_id)
+      path = "job/#{@job_id}/batch/#{batch_id}"
+      headers = Hash.new
+
+      response = @connection.get_request(nil, path, headers)
+
+      begin
+        response_parsed = XmlSimple.xml_in(response) if response
+        response_parsed
+      rescue StandardError => e
+        nil
+      end
+    end
+
+    def get_job_result(return_result, timeout)
+      # timeout is in seconds
+      state = []
+      Timeout::timeout(timeout, SalesforceBulkApi::JobTimeout) do
+        while true
+          if self.check_job_status['state'][0] == 'Closed'
+            @batch_ids.each do |batch_id|
+              batch_state = self.check_batch_status(batch_id)
+              if batch_state['state'][0] != "Queued" && batch_state['state'][0] != "InProgress"
+                state << (batch_state)
+                @batch_ids.delete(batch_id)
+              end
+              sleep(2) # wait x seconds and check again
+            end
+            break if @batch_ids.empty?
+          else
+            break
+          end
+        end
+      end
+
+      state.each_with_index do |batch_state, i|
+        if batch_state['state'][0] == 'Completed' && return_result == true
+          state[i].merge!({'response' => self.get_batch_result(batch_state['id'][0])})
+        end
+      end
+      state
+    end

-    def get_batch_result()
-      path = "job/#{@@job_id}/batch/#{@@batch_id}/result"
+    def get_batch_result(batch_id)
+      path = "job/#{@job_id}/batch/#{batch_id}/result"
       headers = Hash["Content-Type" => "application/xml; charset=UTF-8"]

-      response = @@connection.get_request(nil, path, headers)
+      response = @connection.get_request(nil, path, headers)
+      response_parsed = XmlSimple.xml_in(response)
+      results = response_parsed['result'] unless @operation == 'query'

-      if(@@operation == "query") # The query op requires us to do another request to get the results
-        response_parsed = XmlSimple.xml_in(response)
+      if(@operation == 'query') # The query op requires us to do another request to get the results
         result_id = response_parsed["result"][0]

-        path = "job/#{@@job_id}/batch/#{@@batch_id}/result/#{result_id}"
+        path = "job/#{@job_id}/batch/#{batch_id}/result/#{result_id}"
         headers = Hash.new
         headers = Hash["Content-Type" => "application/xml; charset=UTF-8"]
-        response = @@connection.get_request(nil, path, headers)
+        response = @connection.get_request(nil, path, headers)
         response_parsed = XmlSimple.xml_in(response)
         results = response_parsed['records']
       end

       results
     end

   end

+  class JobTimeout < StandardError
+  end
 end
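One detail of add_batches worth calling out: it slices by repeatedly calling pop(batch_size) on a clone of the records array, so batches are peeled off the tail and the final partial batch holds the earliest records. A standalone sketch of that arithmetic, with toy integers in place of record hashes:

records_dup = (1..25).to_a   # stand-in for 25 record hashes
batch_size  = 10

super_records = []
(records_dup.size / batch_size).to_i.times do
  super_records << records_dup.pop(batch_size)
end
super_records << records_dup unless records_dup.empty?

p super_records.map { |b| b.minmax }
# => [[16, 25], [6, 15], [1, 5]]  (three batches, built from the tail forward)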
3 changes: 2 additions & 1 deletion salesforce_bulk_api.gemspec
@@ -17,7 +17,8 @@ Gem::Specification.new do |s|
   s.add_dependency(%q<json>, [">= 0"])
   s.add_dependency(%q<xml-simple>, [">= 0"])
   s.add_development_dependency "rspec"
-  s.add_development_dependency "vcr"
+  s.add_development_dependency("webmock", ["~> 1.13"])
+  s.add_development_dependency("vcr", ['~> 2.5'])
   s.files = `git ls-files`.split("\n")
   s.test_files = `git ls-files -- {test,spec,features}/*`.split("\n")
   s.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) }
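The pinned webmock/vcr pair above backs the new specs mentioned in the commit message. A typical spec_helper wiring for those versions might look like the following; the file path and cassette directory are assumptions, not taken from this PR:

# spec/spec_helper.rb (hypothetical)
require 'vcr'
require 'webmock/rspec'

VCR.configure do |c|
  c.cassette_library_dir = 'spec/fixtures'  # assumed cassette location
  c.hook_into :webmock                      # replay recorded HTTP through WebMock
  c.configure_rspec_metadata!               # lets examples opt in via :vcr metadata
end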