1
- require_relative 'szmq/szmq'
2
- require 'json'
3
-
4
- module Kymera
5
-
6
- class Broker
7
-
8
- def initialize
9
- config = Kymera ::Config . new
10
- @zmq = Kymera ::SZMQ . new
11
- Thread . abort_on_exception = true
12
- #This will change once I get the worker registry up and running. When that is up, there will be one connection spawned for
13
- #each of the workers that are connected. Edit - Thats actually not sustainable. The number of workers should be able to change
14
- #dynamically without impact to the broker. An inventory check should be made at the start of every new test run. There should also
15
- # be some logic to handle the disconnection of workers in the middle of a test run.
16
- @num_of_connections = config . broker [ "number_of_connections" ]
17
- #This socket is for getting tests from the client
18
- @client_address = "tcp://*:#{ config . broker [ "client_listening_port" ] } "
19
- @internal_address = "tcp://*:#{ config . broker [ "internal_worker_port" ] } "
20
- @worker_address = "tcp://*:#{ config . broker [ "worker_listening_port" ] } "
21
- @test_socket = @zmq . socket ( @client_address , 'pull' )
22
- @test_socket . bind
23
- @front_end = @zmq . socket ( @internal_address , 'router' )
24
- @back_end = @zmq . socket ( @worker_address , 'dealer' )
25
- @proxy = Thread . new { @zmq . start_proxy ( @front_end , @back_end ) }
26
- @results = ''
27
- end
28
-
29
- #This brings up the broker so that it can receive test run requests.
30
- def start_broker
31
- puts "Broker started..."
32
- @test_socket . receive do |tests |
33
- puts "Received test run request.."
34
- start_test_run ( tests )
35
- end
36
- end
37
-
38
- private
39
-
40
- #This is the start of the test run and is called when the broker receives a test run request
41
- def start_test_run ( test_run )
42
- puts 'Starting test run...'
43
- test_run = JSON . parse ( test_run )
44
- puts 'Copying over tests..'
45
- tests = test_run [ "tests" ] . copy
46
- threads = [ ]
47
- puts "Checking for group configuration...#{ test_run [ 'grouped' ] } "
48
- # puts "This is the test run message:\n"
49
- # puts "#{test_run}"
50
- if test_run [ "grouped" ] || test_run [ "grouped" ] . to_s . downcase == 'true'
51
- puts 'Grouping tests....'
52
- tests = group_tests ( tests )
53
- end unless test_run [ "grouped" ] . nil?
54
-
55
- @test_count = test_run [ "tests" ] . length
56
- report_test_config ( test_run )
57
-
58
- if tests . length > @num_of_connections . to_i
59
- puts "Test count was higher than connections. Starting tests and initiating run queue"
60
- 1 . upto @num_of_connections . to_i do
61
- test = tests . pop
62
- break if test . nil?
63
- threads << run_test ( test , test_run )
64
- end
65
- work_queue ( threads , tests , test_run )
66
-
67
- else
68
- puts "Test count was not higher than the connections. Starting all tests."
69
- 1 . upto tests . length do
70
- test = tests . pop
71
- break if test . nil?
72
- threads << run_test ( test , test_run )
73
- end
74
-
75
- end
76
-
77
- until threads_dead? ( threads ) do
78
- # text = "Thread count: #{threads.count} | number of alive: #{num_of_alive(threads)} | number of dead: #{num_of_dead(threads)}"
79
- # $stdout << "\r" + (" " * text.length)
80
- # $stdout << "\r#{text}"
81
- # puts "Threads still working...#{num_of_alive(threads)}"
82
- end
83
-
84
- puts "Test run completed....\n Gathering results from threads..."
85
- results = get_results ( threads )
86
- Kymera ::TestResultsCollector . new . finalize_results ( test_run [ "test_count" ] , test_run [ "run_id" ] , results , test_run [ "runner" ] , test_run [ "start_time" ] )
87
-
88
- end
89
-
90
- def get_results ( threads )
91
- results = ''
92
- threads . each do |t |
93
- results << t . value
94
- end
95
- results
96
- end
97
-
98
- def group_tests ( tests )
99
- #This creates a group for each of the available connection as specified by the @num_of_connections variable
100
- #It will then iterate over all of the tests and add a test to each of the groups
101
- #at the end of the iteration, if any groups are left empty, they are deleted and the trimmed array is returned
102
- groups = [ ]
103
-
104
- puts "Setting group containers..."
105
- 1 . upto @num_of_connections . to_i do
106
- groups << [ ]
107
- end
108
-
109
- puts "Distributing tests..."
110
- while tests . length > 0
111
- groups . each do |group |
112
- test = tests . pop
113
- group << test unless test . nil?
114
- break if test . nil?
115
- end
116
- end
117
- groups . delete_if { |group | group . empty? }
118
- end
119
-
120
-
121
- #If there are tests left over after the initial test start up, they are placed into a queue. The queue is then worked until all tests in the queue have been executed
122
- def work_queue ( threads , tests , options )
123
- until tests . empty?
124
- threads . delete_if { |t | !t . alive? }
125
- if threads . length < @num_of_connections
126
- test = tests . pop
127
- break if test . nil?
128
- threads << run_test ( test , options )
129
- end
130
- end
131
- end
132
-
133
- def threads_dead? ( threads )
134
- result = true
135
- threads . each do |t |
136
- if t . alive?
137
- result = false
138
- break
139
- end
140
- end
141
- result
142
- end
143
-
144
- def num_of_dead ( threads )
145
- count = 0
146
- threads . each do |t |
147
- count +=1 if !t . alive?
148
- end
149
- count
150
- end
151
-
152
- def num_of_alive ( threads )
153
- count = 0
154
- threads . each do |t |
155
- count +=1 if t . alive?
156
- end
157
- count
158
- end
159
-
160
- #This runs each test individually
161
- def run_test ( test , options )
162
- port = @internal_address . split ( ':' ) [ 2 ]
163
- Thread . new {
164
- zmq = Kymera ::SZMQ . new
165
- message = JSON . generate ( { :test => test , :runner => options [ "runner" ] , :options => options [ "options" ] , :run_id => options [ "run_id" ] ,
166
- :test_count => @test_count , :branch => options [ "branch" ] , :start_time => options [ "start_time" ] } )
167
- socket = zmq . socket ( "tcp://127.0.0.1:#{ port } " , 'request' )
168
- socket . connect
169
- puts "Sending: #{ message } "
170
- results = socket . send_message ( message )
171
- socket . close
172
- results
173
- }
174
- end
175
-
176
- #This gives a print out of the test run that was received
177
- def report_test_config ( test_run )
178
- puts "Running test with the following configuration:"
179
- puts "Branch: #{ test_run [ "branch" ] } "
180
- puts "Runner: #{ test_run [ "runner" ] } "
181
- puts "Run ID: #{ test_run [ "run_id" ] } "
182
- puts "Runner Options: #{ test_run [ "options" ] } "
183
- puts "Total number tests: #{ test_run [ "tests" ] . length } "
184
- end
185
-
186
-
187
- end
188
-
189
- end
1
+ # require_relative 'szmq/szmq'
2
+ # require 'json'
3
+ #
4
+ # module Kymera
5
+ #
6
+ # class Broker
7
+ #
8
+ # def initialize
9
+ # config = Kymera::Config.new
10
+ # @zmq = Kymera::SZMQ.new
11
+ # Thread.abort_on_exception = true
12
+ # #This will change once I get the worker registry up and running. When that is up, there will be one connection spawned for
13
+ # #each of the workers that are connected. Edit - Thats actually not sustainable. The number of workers should be able to change
14
+ # #dynamically without impact to the broker. An inventory check should be made at the start of every new test run. There should also
15
+ # # be some logic to handle the disconnection of workers in the middle of a test run.
16
+ # @num_of_connections = config.broker["number_of_connections"]
17
+ # #This socket is for getting tests from the client
18
+ # @client_address = "tcp://*:#{config.broker["client_listening_port"]}"
19
+ # @internal_address = "tcp://*:#{config.broker["internal_worker_port"]}"
20
+ # @worker_address = "tcp://*:#{config.broker["worker_listening_port"]}"
21
+ # @test_socket = @zmq.socket(@client_address, 'pull')
22
+ # @test_socket.bind
23
+ # @front_end = @zmq.socket(@internal_address, 'router')
24
+ # @back_end = @zmq.socket(@worker_address, 'dealer')
25
+ # @proxy = Thread.new {@zmq.start_proxy(@front_end, @back_end)}
26
+ # @results = ''
27
+ # end
28
+ #
29
+ # #This brings up the broker so that it can receive test run requests.
30
+ # def start_broker
31
+ # puts "Broker started..."
32
+ # @test_socket.receive do |tests|
33
+ # puts "Received test run request.."
34
+ # start_test_run(tests)
35
+ # end
36
+ # end
37
+ #
38
+ # private
39
+ #
40
+ # #This is the start of the test run and is called when the broker receives a test run request
41
+ # def start_test_run(test_run)
42
+ # puts 'Starting test run...'
43
+ # test_run = JSON.parse(test_run)
44
+ # puts 'Copying over tests..'
45
+ # tests = test_run["tests"].copy
46
+ # threads = []
47
+ # puts "Checking for group configuration...#{test_run['grouped']}"
48
+ # # puts "This is the test run message:\n"
49
+ # # puts "#{test_run}"
50
+ # if test_run["grouped"] || test_run["grouped"].to_s.downcase == 'true'
51
+ # puts 'Grouping tests....'
52
+ # tests = group_tests(tests)
53
+ # end unless test_run["grouped"].nil?
54
+ #
55
+ # @test_count = test_run["tests"].length
56
+ # report_test_config(test_run)
57
+ #
58
+ # if tests.length > @num_of_connections.to_i
59
+ # puts "Test count was higher than connections. Starting tests and initiating run queue"
60
+ # 1.upto @num_of_connections.to_i do
61
+ # test = tests.pop
62
+ # break if test.nil?
63
+ # threads << run_test(test, test_run)
64
+ # end
65
+ # work_queue(threads, tests, test_run)
66
+ #
67
+ # else
68
+ # puts "Test count was not higher than the connections. Starting all tests."
69
+ # 1.upto tests.length do
70
+ # test = tests.pop
71
+ # break if test.nil?
72
+ # threads << run_test(test, test_run)
73
+ # end
74
+ #
75
+ # end
76
+ #
77
+ # until threads_dead?(threads) do
78
+ # # text = "Thread count: #{threads.count} | number of alive: #{num_of_alive(threads)} | number of dead: #{num_of_dead(threads)}"
79
+ # # $stdout << "\r" + (" " * text.length)
80
+ # # $stdout << "\r#{text}"
81
+ # # puts "Threads still working...#{num_of_alive(threads)}"
82
+ # end
83
+ #
84
+ # puts "Test run completed....\nGathering results from threads..."
85
+ # results = get_results(threads)
86
+ # Kymera::TestResultsCollector.new.finalize_results(test_run["test_count"], test_run["run_id"], results, test_run["runner"], test_run["start_time"])
87
+ #
88
+ # end
89
+ #
90
+ # def get_results(threads)
91
+ # results = ''
92
+ # threads.each do |t|
93
+ # results << t.value
94
+ # end
95
+ # results
96
+ # end
97
+ #
98
+ # def group_tests(tests)
99
+ # #This creates a group for each of the available connection as specified by the @num_of_connections variable
100
+ # #It will then iterate over all of the tests and add a test to each of the groups
101
+ # #at the end of the iteration, if any groups are left empty, they are deleted and the trimmed array is returned
102
+ # groups = []
103
+ #
104
+ # puts "Setting group containers..."
105
+ # 1.upto @num_of_connections.to_i do
106
+ # groups << []
107
+ # end
108
+ #
109
+ # puts "Distributing tests..."
110
+ # while tests.length > 0
111
+ # groups.each do |group|
112
+ # test = tests.pop
113
+ # group << test unless test.nil?
114
+ # break if test.nil?
115
+ # end
116
+ # end
117
+ # groups.delete_if {|group| group.empty?}
118
+ # end
119
+ #
120
+ #
121
+ # #If there are tests left over after the initial test start up, they are placed into a queue. The queue is then worked until all tests in the queue have been executed
122
+ # def work_queue(threads, tests, options)
123
+ # until tests.empty?
124
+ # threads.delete_if {|t| !t.alive?}
125
+ # if threads.length < @num_of_connections
126
+ # test = tests.pop
127
+ # break if test.nil?
128
+ # threads << run_test(test, options)
129
+ # end
130
+ # end
131
+ # end
132
+ #
133
+ # def threads_dead?(threads)
134
+ # result = true
135
+ # threads.each do |t|
136
+ # if t.alive?
137
+ # result = false
138
+ # break
139
+ # end
140
+ # end
141
+ # result
142
+ # end
143
+ #
144
+ # def num_of_dead(threads)
145
+ # count = 0
146
+ # threads.each do |t|
147
+ # count +=1 if !t.alive?
148
+ # end
149
+ # count
150
+ # end
151
+ #
152
+ # def num_of_alive(threads)
153
+ # count = 0
154
+ # threads.each do |t|
155
+ # count +=1 if t.alive?
156
+ # end
157
+ # count
158
+ # end
159
+ #
160
+ # #This runs each test individually
161
+ # def run_test(test, options)
162
+ # port = @internal_address.split(':')[2]
163
+ # Thread.new {
164
+ # zmq = Kymera::SZMQ.new
165
+ # message = JSON.generate({:test => test, :runner => options["runner"], :options => options["options"], :run_id => options["run_id"],
166
+ # :test_count => @test_count, :branch => options["branch"], :start_time => options["start_time"]})
167
+ # socket = zmq.socket("tcp://127.0.0.1:#{port}", 'request')
168
+ # socket.connect
169
+ # puts "Sending: #{message}"
170
+ # results = socket.send_message(message)
171
+ # socket.close
172
+ # results
173
+ # }
174
+ # end
175
+ #
176
+ # #This gives a print out of the test run that was received
177
+ # def report_test_config(test_run)
178
+ # puts "Running test with the following configuration:"
179
+ # puts "Branch: #{test_run["branch"]}"
180
+ # puts "Runner: #{test_run["runner"]}"
181
+ # puts "Run ID: #{test_run["run_id"]}"
182
+ # puts "Runner Options: #{test_run["options"]}"
183
+ # puts "Total number tests: #{test_run["tests"].length}"
184
+ # end
185
+ #
186
+ #
187
+ # end
188
+ #
189
+ # end
0 commit comments