diff --git a/examples/pony/talphabet/.gitignore b/examples/pony/talphabet/.gitignore new file mode 100644 index 0000000000..2d4776d02a --- /dev/null +++ b/examples/pony/talphabet/.gitignore @@ -0,0 +1,7 @@ +alphabet +sender +sent.txt +alphabet.out +alphabet.d +_expected.json +_test.txt diff --git a/examples/pony/talphabet/Makefile b/examples/pony/talphabet/Makefile new file mode 100644 index 0000000000..693fd3ccb4 --- /dev/null +++ b/examples/pony/talphabet/Makefile @@ -0,0 +1,58 @@ +# include root makefile +ifndef ROOT_MAKEFILE_MK +include ../../../Makefile +endif + +# prevent rules from being evaluated/included multiple times +ifndef $(abspath $(lastword $(MAKEFILE_LIST)))_MK +$(abspath $(lastword $(MAKEFILE_LIST)))_MK := 1 + + +# The following are control variables that determine what logic from `rules.mk` is enabled + +# `true`/`false` to enable/disable the actual unit test command so it can be overridden (the targets are still created) +# applies to both the pony and elixir test targets +$(abspath $(lastword $(MAKEFILE_LIST)))_UNIT_TEST_COMMAND := false + +# `true`/`false` to enable/disable generate pony related targets (build/test/clean) for pony sources in this directory +# otherwise targets only get created if there are pony sources (*.pony) in this directory. +$(abspath $(lastword $(MAKEFILE_LIST)))_PONY_TARGET := true + +# `true`/`false` to enable/disable generate final file build target using ponyc command for the pony build target so +# it can be overridden manually +$(abspath $(lastword $(MAKEFILE_LIST)))_PONYC_TARGET := true + +# `true`/`false` to enable/disable generate exs related targets (build/test/clean) for elixir sources in this directory +# otherwise targets only get created if there are elixir sources (*.exs) in this directory. +$(abspath $(lastword $(MAKEFILE_LIST)))_EXS_TARGET := true + +# `true`/`false` to enable/disable generate docker related targets (build/push) for a Dockerfile in this directory +# otherwise targets only get created if there is a Dockerfile in this directory +$(abspath $(lastword $(MAKEFILE_LIST)))_DOCKER_TARGET := true + +# `true`/`false` to enable/disable recursing into Makefiles of subdirectories if they exist +# (and by recursion every makefile in the tree that is referenced) +$(abspath $(lastword $(MAKEFILE_LIST)))_RECURSE_SUBMAKEFILES := true + + +ALPHABET_PONY_PATH := $(dir $(abspath $(lastword $(MAKEFILE_LIST)))) + +# standard rules generation makefile +include $(rules_mk_path) + +integration-tests-examples-pony-alphabet: alphabet_pony_test + +alphabet_pony_test: + cd $(ALPHABET_PONY_PATH) && \ + python _test/gen.py && \ + integration_test --framed-file-sender _test.txt \ + --validation-cmd 'python _test/validate.py --expected _expected.json --output' \ + --log-level error \ + --batch-size 10 \ + --output 'received.txt' \ + --command './alphabet' \ + --workers 4 \ + --sink-expect 1000 + +# end of prevent rules from being evaluated/included multiple times +endif diff --git a/examples/pony/talphabet/README.md b/examples/pony/talphabet/README.md new file mode 100644 index 0000000000..c05f2e0a24 --- /dev/null +++ b/examples/pony/talphabet/README.md @@ -0,0 +1,111 @@ +# Alphabet + +This is an example application that will count the number of "votes" sent for +each letter of the alphabet and send out the previous count for that letter at the end of each update. + +## Prerequisites + +- ponyc +- pony-stable +- Wallaroo + +See [Wallaroo Environment Setup Instructions](https://github.com/WallarooLabs/wallaroo/book/getting-started/setup.md). + +## Building + +Build Alphabet with + +```bash +make +``` + +## Generating Data + +A data generator is bundled with the application. Use it to generate a file with a fixed number of psuedo-random votes: + +``` +cd data_gen +./data_gen --message-count 1000 +``` + +This will create a `votes.msg` file in your current working directory. + +## Running Alphabet + +You will need five separate shells to run this application. Open each shell and go to the `examples/pony/alphabet` directory. + +### Shell 1: Metrics + +Start up the Metrics UI if you don't already have it running: + +```bash +docker start mui +``` + +You can verify it started up correctly by visiting [http://localhost:4000](http://localhost:4000). + +If you need to restart the UI, run: + +```bash +docker restart mui +``` + +When it's time to stop the UI, run: + +```bash +docker stop mui +``` + +If you need to start the UI after stopping it, run: + +```bash +docker start mui +``` + +### Shell 2: Data Receiver + +Start a listener + +```bash +../../../../giles/receiver/receiver --listen 127.0.0.1:7002 --no-write \ + --ponythreads=1 --ponynoblock +``` + +### Shell 3: Alphabet +Start the application + +```bash +./alphabet --in 127.0.0.1:7010 --out 127.0.0.1:7002 --metrics 127.0.0.1:5001 \ + --control 127.0.0.1:12500 --data 127.0.0.1:12501 --external 127.0.0.1:5050 \ + --cluster-initializer --ponynoblock --ponythreads=1 +``` + +### Shell 4: Sender + +Start a sender + +```bash +../../../../giles/sender/sender --host 127.0.0.1:7010 \ + --file data_gen/votes.msg \ --batch-size 5 --interval 100_000_000 \ + --messages 150000000 --binary --variable-size --repeat --ponythreads=1 \ + --ponynoblock --no-write +``` + +## Shutdown + +### Shell 5: Shutdown + +You can shut down the cluster with this command at any time: + +```bash +cd ~/wallaroo-tutorial/wallaroo/utils/cluster_shutdown +./cluster_shutdown 127.0.0.1:5050 +``` + +You can shut down Giles Sender and Giles Receiver by pressing Ctrl-c from their respective shells. + +You can shut down the Metrics UI with the following command: + +```bash +docker stop mui +``` diff --git a/examples/pony/talphabet/_test/gen.py b/examples/pony/talphabet/_test/gen.py new file mode 100644 index 0000000000..5474e365a6 --- /dev/null +++ b/examples/pony/talphabet/_test/gen.py @@ -0,0 +1,35 @@ +# Copyright 2017 The Wallaroo Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing +# permissions and limitations under the License. + + +from json import dumps +from random import choice, randrange +from string import lowercase +from struct import pack + +# Construct input data list, and total votes as a dict +expected = {} +data = [] +for x in xrange(1000): + c = choice(lowercase) + v = randrange(1,10000) + data.append(pack('>IsI',5, c, v)) + expected[c] = expected.get(c, 0) + v + +with open('_test.txt', 'wb') as fin: + for v in data: + fin.write(v) + +with open('_expected.json', 'wb') as fout: + fout.write(dumps(expected)) diff --git a/examples/pony/talphabet/_test/validate.py b/examples/pony/talphabet/_test/validate.py new file mode 100644 index 0000000000..fb727c11de --- /dev/null +++ b/examples/pony/talphabet/_test/validate.py @@ -0,0 +1,52 @@ +# Copyright 2017 The Wallaroo Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing +# permissions and limitations under the License. + + +import argparse +from json import loads +from struct import calcsize, unpack + +fmt = '>LsQ' +def decoder(bs): + return unpack(fmt, bs)[1:3] + +def pre_processor(decoded): + totals = {} + for c, v in decoded: + totals[c] = v + return totals + +parser = argparse.ArgumentParser('Alphabet validator') +parser.add_argument('--output', type=argparse.FileType('rb'), + help="The output file of the application.") +parser.add_argument('--expected', type=argparse.FileType('r'), + help=("A file containing the expected final vote tally as " + "JSON serialised dict.")) +args = parser.parse_args() + +chunk_size = calcsize(fmt) +decoded = [] +while True: + chunk = args.output.read(chunk_size) + if not chunk: + break + decoded.append(decoder(chunk)) +processed = pre_processor(decoded) + +expected = loads(args.expected.read()) +for k in expected.keys(): + s_key = str(k) + expected[s_key] = expected.pop(k) + +assert(expected == processed) diff --git a/examples/pony/talphabet/alphabet.pony b/examples/pony/talphabet/alphabet.pony new file mode 100644 index 0000000000..8fa5b1a5d1 --- /dev/null +++ b/examples/pony/talphabet/alphabet.pony @@ -0,0 +1,160 @@ +/* + +Copyright 2017 The Wallaroo Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied. See the License for the specific language governing + permissions and limitations under the License. + +*/ + +use "buffered" +use "collections" +use "serialise" +use "wallaroo_labs/bytes" +use "wallaroo" +use "wallaroo/core/common" +use "wallaroo_labs/mort" +use "wallaroo/core/sink/tcp_sink" +use "wallaroo/core/source" +use "wallaroo/core/source/tcp_source" +use "wallaroo/core/state" +use "wallaroo/core/topology" + +actor Main + new create(env: Env) => + try + let letter_partition = Partition[Votes val, String]( + LetterPartitionFunction, PartitionFileReader("letters.txt", + env.root as AmbientAuth)) + + let application = recover val + Application("Alphabet Popularity Contest") + .new_pipeline[Votes val, LetterTotal val]("Alphabet Votes", + TCPSourceConfig[Votes val].from_options(VotesDecoder, + TCPSourceConfigCLIParser(env.args)?(0)?)) + .to_state_partition[Votes val, String, LetterTotal val, + LetterState](AddVotes, LetterStateBuilder, "letter-state", + letter_partition where multi_worker = true) + .to_sink(TCPSinkConfig[LetterTotal val].from_options( + LetterTotalEncoder, + TCPSinkConfigCLIParser(env.args)?(0)?)) + end + Startup(env, application, "alphabet-contest") + else + @printf[I32]("Couldn't build topology\n".cstring()) + end + +class val LetterStateBuilder + fun apply(): LetterState => LetterState + +class LetterState is State + var letter: String = " " + var count: U64 = 0 + +class AddVotesStateChange is StateChange[LetterState] + var _id: U64 + var _votes: Votes val = Votes(" ", 0) + + new create(id': U64) => + _id = id' + + fun name(): String => "AddVotes" + fun id(): U64 => _id + + fun ref update(votes': Votes val) => + _votes = votes' + + fun apply(state: LetterState ref) => + state.letter = _votes.letter + state.count = state.count + _votes.count + + fun write_log_entry(out_writer: Writer) => + out_writer.u32_be(_votes.letter.size().u32()) + out_writer.write(_votes.letter) + out_writer.u64_be(_votes.count) + + fun ref read_log_entry(in_reader: Reader) ? => + let letter_size = in_reader.u32_be()?.usize() + let letter = String.from_array(in_reader.block(letter_size)?) + let count = in_reader.u64_be()? + _votes = Votes(letter, count) + +class AddVotesStateChangeBuilder is StateChangeBuilder[LetterState] + fun apply(id: U64): StateChange[LetterState] => + AddVotesStateChange(id) + +primitive AddVotes is StateComputation[Votes val, LetterTotal val, LetterState] + fun name(): String => "Add Votes" + + fun apply(votes: Votes val, + sc_repo: StateChangeRepository[LetterState], + state: LetterState): (LetterTotal val, StateChange[LetterState] ref) + => + let state_change: AddVotesStateChange ref = + try + sc_repo.lookup_by_name("AddVotes")? as AddVotesStateChange + else + AddVotesStateChange(0) + end + + state_change.update(votes) + + (LetterTotal(votes.letter, votes.count + state.count), state_change) + + fun state_change_builders(): + Array[StateChangeBuilder[LetterState]] val + => + recover val + let scbs = Array[StateChangeBuilder[LetterState]] + scbs.push(recover val AddVotesStateChangeBuilder end) + scbs + end + +primitive VotesDecoder is FramedSourceHandler[Votes val] + fun header_length(): USize => + 4 + + fun payload_length(data: Array[U8] iso): USize => + 5 + + fun decode(data: Array[U8] val): Votes val ? => + // Assumption: 1 byte for letter + let letter = String.from_array(data.trim(0, 1)) + let count = Bytes.to_u32(data(1)?, data(2)?, data(3)?, data(4)?) + Votes(letter, count.u64()) + +primitive LetterPartitionFunction + fun apply(votes: Votes val): String => + votes.letter + +class Votes + let letter: String + let count: U64 + + new val create(l: String, c: U64) => + letter = l + count = c + +class LetterTotal + let letter: String + let count: U64 + + new val create(l: String, c: U64) => + letter = l + count = c + +primitive LetterTotalEncoder + fun apply(t: LetterTotal val, wb: Writer = Writer): Array[ByteSeq] val => + wb.u32_be(9) + wb.write(t.letter) // Assumption: letter is 1 byte + wb.u64_be(t.count) + wb.done() diff --git a/examples/pony/talphabet/bundle.json b/examples/pony/talphabet/bundle.json new file mode 100644 index 0000000000..da7ad7d47e --- /dev/null +++ b/examples/pony/talphabet/bundle.json @@ -0,0 +1,7 @@ +{ + "deps": [ + { "type": "local", + "local-path": "../../../lib/" + } + ] +} diff --git a/examples/pony/talphabet/data_gen/.gitignore b/examples/pony/talphabet/data_gen/.gitignore new file mode 100644 index 0000000000..ce2250a79a --- /dev/null +++ b/examples/pony/talphabet/data_gen/.gitignore @@ -0,0 +1,2 @@ +data_gen +votes.msg diff --git a/examples/pony/talphabet/data_gen/Makefile b/examples/pony/talphabet/data_gen/Makefile new file mode 100644 index 0000000000..bf75bd4832 --- /dev/null +++ b/examples/pony/talphabet/data_gen/Makefile @@ -0,0 +1,42 @@ +# include root makefile +ifndef ROOT_MAKEFILE_MK +include ../../../../Makefile +endif + +# prevent rules from being evaluated/included multiple times +ifndef $(abspath $(lastword $(MAKEFILE_LIST)))_MK +$(abspath $(lastword $(MAKEFILE_LIST)))_MK := 1 + + +# The following are control variables that determine what logic from `rules.mk` is enabled + +# `true`/`false` to enable/disable the actual unit test command so it can be overridden (the targets are still created) +# applies to both the pony and elixir test targets +$(abspath $(lastword $(MAKEFILE_LIST)))_UNIT_TEST_COMMAND := false + +# `true`/`false` to enable/disable generate pony related targets (build/test/clean) for pony sources in this directory +# otherwise targets only get created if there are pony sources (*.pony) in this directory. +$(abspath $(lastword $(MAKEFILE_LIST)))_PONY_TARGET := true + +# `true`/`false` to enable/disable generate final file build target using ponyc command for the pony build target so +# it can be overridden manually +$(abspath $(lastword $(MAKEFILE_LIST)))_PONYC_TARGET := true + +# `true`/`false` to enable/disable generate exs related targets (build/test/clean) for elixir sources in this directory +# otherwise targets only get created if there are elixir sources (*.exs) in this directory. +$(abspath $(lastword $(MAKEFILE_LIST)))_EXS_TARGET := true + +# `true`/`false` to enable/disable generate docker related targets (build/push) for a Dockerfile in this directory +# otherwise targets only get created if there is a Dockerfile in this directory +$(abspath $(lastword $(MAKEFILE_LIST)))_DOCKER_TARGET := true + +# `true`/`false` to enable/disable recursing into Makefiles of subdirectories if they exist +# (and by recursion every makefile in the tree that is referenced) +$(abspath $(lastword $(MAKEFILE_LIST)))_RECURSE_SUBMAKEFILES := true + + +# standard rules generation makefile +include $(rules_mk_path) + +# end of prevent rules from being evaluated/included multiple times +endif diff --git a/examples/pony/talphabet/data_gen/bundle.json b/examples/pony/talphabet/data_gen/bundle.json new file mode 100644 index 0000000000..b069679cb3 --- /dev/null +++ b/examples/pony/talphabet/data_gen/bundle.json @@ -0,0 +1,7 @@ +{ + "deps": [ + { "type": "local", + "local-path": "../../../../lib/" + } + ] +} diff --git a/examples/pony/talphabet/data_gen/gen.pony b/examples/pony/talphabet/data_gen/gen.pony new file mode 100644 index 0000000000..e979b76bf9 --- /dev/null +++ b/examples/pony/talphabet/data_gen/gen.pony @@ -0,0 +1,84 @@ +/* + +Copyright 2017 The Wallaroo Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied. See the License for the specific language governing + permissions and limitations under the License. + +*/ + +use "collections" +use "random" +use "time" +use "buffered" +use "files" + +use "wallaroo_labs/options" + +actor Main + let letters: Array[String] = + "a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x,y,z".split(",") + + new create(env: Env) => + let options = Options(env.args) + var file_path = "votes.msg" + var message_count: I32 = 0 + + options + .add("output", "o", StringArgument) + .add("message-count", "m", I64Argument) + + for option in options do + match option + | ("output", let arg: String) => file_path = arg + | ("message-count", let arg: I64) => message_count = arg.i32() + end + end + + try + let auth = env.root as AmbientAuth + + let wb: Writer = Writer + + let rand = MT(Time.nanos()) + + let file = File(FilePath(auth, file_path)?) + + if message_count == 0 then + @printf[I32](("Please specify a message count " + + "(--message-count/-m)\n").cstring()) + error + end + + for idx in Range[I32](0, message_count) do + let next_vote = rand.int(100).u32() + let next_letter = try + letters(rand.int(letters.size().u64()).usize())? + else + "." + end + + file.writev(VoteEncoder(next_letter, next_vote, wb)) + end + + file.dispose() + end + +primitive VoteEncoder + fun apply(letter: String, count: U32, wb: Writer = Writer): Array[ByteSeq] val + => + // Header + wb.u32_be(5) + // Fields + wb.write(letter) + wb.u32_be(count) + wb.done() diff --git a/examples/pony/talphabet/letters.txt b/examples/pony/talphabet/letters.txt new file mode 100644 index 0000000000..d68dd4031d --- /dev/null +++ b/examples/pony/talphabet/letters.txt @@ -0,0 +1,4 @@ +a +b +c +d diff --git a/examples/pony/talphabet/old_letters.txt b/examples/pony/talphabet/old_letters.txt new file mode 100644 index 0000000000..694fe82a28 --- /dev/null +++ b/examples/pony/talphabet/old_letters.txt @@ -0,0 +1,19 @@ +a +b +c +d +e +f +g +h +i +j +k +l +m +n +o +p +q +r +s diff --git a/examples/pony/talphabet/talphabet b/examples/pony/talphabet/talphabet new file mode 100755 index 0000000000..fc6eafef57 Binary files /dev/null and b/examples/pony/talphabet/talphabet differ diff --git a/lib/wallaroo/core/boundary/boundary.pony b/lib/wallaroo/core/boundary/boundary.pony index 958e292b83..ebb3595921 100644 --- a/lib/wallaroo/core/boundary/boundary.pony +++ b/lib/wallaroo/core/boundary/boundary.pony @@ -151,6 +151,9 @@ actor OutgoingBoundary is Consumer var _reconnect_pause: U64 = 10_000_000_000 let _timers: Timers = Timers + //!@ + var _finished_ack_status: Bool = false + new create(auth: AmbientAuth, worker_name: String, metrics_reporter: MetricsReporter iso, host: String, service: String, from: String = "", init_size: USize = 64, max_size: USize = 16384, @@ -446,10 +449,22 @@ actor OutgoingBoundary is Consumer _upstreams.unset(producer) + //!@ + be report_status(code: ReportStatusCode) => + if not _finished_ack_status then + _finished_ack_status = true + _finished_ack_waiter.report_status(code) + try + _writev(ChannelMsgEncoder.report_status(code, _auth)?) + else + Fail() + end + end + be request_finished_ack(upstream_request_id: RequestId, requester_id: StepId, upstream_requester: FinishedAckRequester) => - @printf[I32]("!@ request_finished_ack BOUNDARY %s, requester_id: %s, upstream_request_id: %s\n".cstring(), _step_id.string().cstring(), requester_id.string().cstring(), upstream_request_id.string().cstring()) + // @printf[I32]("!@ request_finished_ack BOUNDARY %s, requester_id: %s, upstream_request_id: %s\n".cstring(), _step_id.string().cstring(), requester_id.string().cstring(), upstream_request_id.string().cstring()) // !@ // if not _finished_ack_waiter.pending_request() then @@ -472,6 +487,7 @@ actor OutgoingBoundary is Consumer producer: FinishedAckRequester) => // @printf[I32]("!@ request_finished_ack_complete BOUNDARY\n".cstring()) + _finished_ack_waiter.clear() try _writev(ChannelMsgEncoder.request_finished_ack_complete(_worker_name, requester_id, _auth)?) @@ -479,11 +495,14 @@ actor OutgoingBoundary is Consumer Fail() end + //!@ + _finished_ack_status = false + be try_finish_request_early(requester_id: StepId) => _finished_ack_waiter.try_finish_request_early(requester_id) be receive_finished_ack(request_id: RequestId) => - // @printf[I32]("!@ receive_finished_ack BOUNDARY\n".cstring()) + // @printf[I32]("!@ receive_finished_ack BOUNDARY %s\n".cstring(), _step_id.string().cstring()) _finished_ack_waiter.unmark_consumer_request(request_id) // diff --git a/lib/wallaroo/core/common/dummy_consumer.pony b/lib/wallaroo/core/common/dummy_consumer.pony index 5d2886bbbb..518962f0ac 100644 --- a/lib/wallaroo/core/common/dummy_consumer.pony +++ b/lib/wallaroo/core/common/dummy_consumer.pony @@ -31,6 +31,10 @@ actor DummyConsumer is Consumer be unregister_producer(producer: Producer) => None + //!@ + be report_status(code: ReportStatusCode) => + None + be request_finished_ack(request_id: RequestId, requester_id: StepId, producer: FinishedAckRequester) => diff --git a/lib/wallaroo/core/common/finished_ack.pony b/lib/wallaroo/core/common/finished_ack.pony index e34ccd480a..a2ead0ace6 100644 --- a/lib/wallaroo/core/common/finished_ack.pony +++ b/lib/wallaroo/core/common/finished_ack.pony @@ -88,7 +88,7 @@ class FinishedAckWaiter match upstream_requester' | let far: FinishedAckRequester => far else - InitialFinishedAckRequester(_step_id) + EmptyFinishedAckRequester end // If _upstream_request_ids contains the requester_id, then we're @@ -147,16 +147,21 @@ class FinishedAckWaiter fun ref unmark_consumer_request(request_id: RequestId) => try + // @printf[I32]("!@ unmark_consumer_request 1\n".cstring()) let requester_id = _downstream_request_ids(request_id)? // @printf[I32]("!@ received ack for request_id %s (associated with requester %s). (reported from %s)\n".cstring(), request_id.string().cstring(), requester_id.string().cstring(), _step_id.string().cstring()) + // @printf[I32]("!@ unmark_consumer_request 2\n".cstring()) let id_set = _pending_acks(requester_id)? ifdef debug then Invariant(id_set.contains(request_id)) end id_set.unset(request_id) + // @printf[I32]("!@ unmark_consumer_request 3\n".cstring()) _downstream_request_ids.remove(request_id)? + // @printf[I32]("!@ unmark_consumer_request COMPLETE\n".cstring()) _check_send_run(requester_id) else + @printf[I32]("!@ About to fail on %s\n".cstring(), _step_id.string().cstring()) Fail() end @@ -165,13 +170,37 @@ class FinishedAckWaiter _check_send_run(requester_id) fun ref clear() => + // @printf[I32]("!@ finished_ack CLEAR on %s\n".cstring(), _step_id.string().cstring()) _pending_acks.clear() + _downstream_request_ids.clear() _upstream_request_ids.clear() _upstream_requesters.clear() + _custom_actions.clear() + + //!@ + fun report_status(code: ReportStatusCode) => + match code + | FinishedAcksStatus => + var pending: USize = 0 + var requester_id: StepId = 0 + for (r_id, pa) in _pending_acks.pairs() do + if pa.size() > 0 then + requester_id = r_id + pending = pending + 1 + end + end + // @printf[I32]("!@ waiting at %s on %s pending ack groups, for requester ids:\n".cstring(), _step_id.string().cstring(), pending.string().cstring()) + // if pending == 1 then + // @printf[I32]("!@ %s waiting for one for requester id %s\n".cstring(), _step_id.string().cstring(), requester_id.string().cstring()) + // end + // for p in _pending_acks.keys() do + // @printf[I32]("!@ %s (from %s)\n".cstring(), p.string().cstring(), _step_id.string().cstring()) + // end + end fun ref _check_send_run(requester_id: StepId) => try - @printf[I32]("!@ _pending_acks size: %s for requester_id %s (reported from %s). Listing pending acks:\n".cstring(), _pending_acks(requester_id)?.size().string().cstring(), requester_id.string().cstring(), _step_id.string().cstring()) + // @printf[I32]("!@ _pending_acks size: %s for requester_id %s (reported from %s). Listing pending acks:\n".cstring(), _pending_acks(requester_id)?.size().string().cstring(), requester_id.string().cstring(), _step_id.string().cstring()) // for pending_ack in _pending_acks(requester_id)?.values() do // @printf[I32]("!@ -- %s\n".cstring(), pending_ack.string().cstring()) // end diff --git a/lib/wallaroo/core/common/producer_consumer.pony b/lib/wallaroo/core/common/producer_consumer.pony index 8c8e9a7be3..c04814a9d2 100644 --- a/lib/wallaroo/core/common/producer_consumer.pony +++ b/lib/wallaroo/core/common/producer_consumer.pony @@ -28,11 +28,15 @@ trait tag FinishedAckRequester be try_finish_request_early(requester_id: StepId) trait tag FinishedAckResponder + //!@ be request_finished_ack(request_id: RequestId, requester_id: StepId, requester: FinishedAckRequester) be request_finished_ack_complete(requester_id: StepId, requester: FinishedAckRequester) +trait tag StatusReporter + be report_status(code: ReportStatusCode) + trait tag Producer is (Muteable & Ackable & AckRequester & FinishedAckRequester) fun ref route_to(c: Consumer): (Route | None) @@ -46,7 +50,7 @@ interface tag BoundaryUpdateable be remove_boundary(worker: String) trait tag Consumer is (Runnable & StateReceiver & AckRequester & - Initializable & FinishedAckResponder) + Initializable & FinishedAckResponder & StatusReporter) be register_producer(producer: Producer) be unregister_producer(producer: Producer) diff --git a/lib/wallaroo/core/common/report_status_codes.pony b/lib/wallaroo/core/common/report_status_codes.pony new file mode 100644 index 0000000000..c056fca2a3 --- /dev/null +++ b/lib/wallaroo/core/common/report_status_codes.pony @@ -0,0 +1,13 @@ + +trait val ReportStatusCode +primitive FinishedAcksStatus is ReportStatusCode +primitive BoundaryCountStatus is ReportStatusCode + +primitive ReportStatusCodeParser + fun apply(s: String): ReportStatusCode ? => + match s + | "finished-acks-status" => FinishedAcksStatus + | "boundary-count-status" => BoundaryCountStatus + else + error + end diff --git a/lib/wallaroo/core/data_channel/data_channel_tcp.pony b/lib/wallaroo/core/data_channel/data_channel_tcp.pony index f102c7016a..6051940b54 100644 --- a/lib/wallaroo/core/data_channel/data_channel_tcp.pony +++ b/lib/wallaroo/core/data_channel/data_channel_tcp.pony @@ -274,6 +274,9 @@ class DataChannelConnectNotifier is DataChannelNotify m.sender.cstring()) end _receiver.request_finished_ack(m.request_id, m.requester_id) + //!@ + | let m: ReportStatusMsg => + _receiver.report_status(m.code) | let m: RequestFinishedAckCompleteMsg => ifdef "trace" then @printf[I32]("Received RequestFinishedAckCompleteMsg from %s\n" @@ -316,6 +319,9 @@ trait _DataReceiverWrapper latest_ts: U64, metrics_id: U16, worker_ingress_ts: U64) fun replay_received(r: ReplayableDeliveryMsg, pipeline_time_spent: U64, seq_id: U64, latest_ts: U64, metrics_id: U16, worker_ingress_ts: U64) + //!@ + fun report_status(code: ReportStatusCode) + fun request_finished_ack(request_id: RequestId, requester_id: StepId) fun request_finished_ack_complete(requester_id: StepId) @@ -336,6 +342,10 @@ class _InitDataReceiver is _DataReceiverWrapper fun upstream_replay_finished() => Fail() + //!@ + fun report_status(code: ReportStatusCode) => + Fail() + fun request_finished_ack(request_id: RequestId, requester_id: StepId) => Fail() @@ -364,6 +374,10 @@ class _DataReceiver is _DataReceiverWrapper data_receiver.replay_received(r, pipeline_time_spent, seq_id, latest_ts, metrics_id, worker_ingress_ts) + //!@ + fun report_status(code: ReportStatusCode) => + data_receiver.report_status(code) + fun request_finished_ack(request_id: RequestId, requester_id: StepId) => data_receiver.request_finished_ack(request_id, requester_id) diff --git a/lib/wallaroo/core/initialization/local_topology.pony b/lib/wallaroo/core/initialization/local_topology.pony index 4b082f33a9..d57c33a65b 100644 --- a/lib/wallaroo/core/initialization/local_topology.pony +++ b/lib/wallaroo/core/initialization/local_topology.pony @@ -281,6 +281,7 @@ actor LocalTopologyInitializer is LayoutInitializer _worker_names_file = worker_names_file _cluster_manager = cluster_manager _is_joining = is_joining + _router_registry.register_local_topology_initializer(this) be update_topology(t: LocalTopology) => _topology = t @@ -439,6 +440,21 @@ actor LocalTopologyInitializer is LayoutInitializer _outgoing_boundary_builders = consume bbs _initializables.set(boundary) + be remove_boundary(leaving_worker: String) => + // Boundaries + let bs = recover trn Map[String, OutgoingBoundary] end + for (w, b) in _outgoing_boundaries.pairs() do + if w != leaving_worker then bs(w) = b end + end + + // Boundary builders + let bbs = recover trn Map[String, OutgoingBoundaryBuilder] end + for (w, b) in _outgoing_boundary_builders.pairs() do + if w != leaving_worker then bbs(w) = b end + end + _outgoing_boundaries = consume bs + _outgoing_boundary_builders = consume bbs + be update_boundaries(bs: Map[String, OutgoingBoundary] val, bbs: Map[String, OutgoingBoundaryBuilder] val) => @@ -1505,12 +1521,8 @@ actor LocalTopologyInitializer is LayoutInitializer let runner_builder = subpartition.runner_builder() let reporter = MetricsReporter(t.name(), t.worker_name(), _metrics_conn) - let step = Step(_auth, runner_builder(where event_log = _event_log, - auth = _auth), consume reporter, msg.step_id(), - runner_builder.route_builder(), _event_log, _recovery_replayer, - _outgoing_boundaries) - step.receive_state(msg.state()) - msg.update_router_registry(_router_registry, step) + _router_registry.receive_immigrant_step(subpartition, runner_builder, + consume reporter, _recovery_replayer, msg) else Fail() end @@ -1549,6 +1561,20 @@ actor LocalTopologyInitializer is LayoutInitializer Fail() end + be source_ids_query(conn: TCPConnection) => + _router_registry.source_ids_query(conn) + + //!@ + be report_status(code: ReportStatusCode) => + match code + | FinishedAcksStatus => + @printf[I32]("!@ LocalTopologyInitializer finished_ack_status\n".cstring()) + | BoundaryCountStatus => + @printf[I32]("LocalTopologyInitializer knows about %s boundaries\n" + .cstring(), _outgoing_boundaries.size().string().cstring()) + end + _router_registry.report_status(code) + be initialize_join_initializables() => _initialize_join_initializables() diff --git a/lib/wallaroo/core/messages/channel_messages.pony b/lib/wallaroo/core/messages/channel_messages.pony index 416d2f0e5c..92a091b73f 100644 --- a/lib/wallaroo/core/messages/channel_messages.pony +++ b/lib/wallaroo/core/messages/channel_messages.pony @@ -313,6 +313,12 @@ primitive ChannelMsgEncoder => _encode(CleanShutdownMsg(msg), auth)? + //!@ + fun report_status(code: ReportStatusCode, auth: AmbientAuth): + Array[ByteSeq] val ? + => + _encode(ReportStatusMsg(code), auth)? + fun request_finished_ack(sender: String, request_id: RequestId, requester_id: StepId, auth: AmbientAuth): Array[ByteSeq] val ? => @@ -465,7 +471,8 @@ trait val StepMigrationMsg is ChannelMsg fun step_id(): U128 fun state(): ByteSeq val fun worker(): String - fun update_router_registry(router_registry: RouterRegistry, target: Consumer) + fun update_router_registry(router_registry: RouterRegistry ref, + target: Consumer) class val KeyedStepMigrationMsg[K: (Hashable val & Equatable[K] val)] is StepMigrationMsg @@ -488,7 +495,8 @@ class val KeyedStepMigrationMsg[K: (Hashable val & Equatable[K] val)] is fun step_id(): U128 => _step_id fun state(): ByteSeq val => _state fun worker(): String => _worker - fun update_router_registry(router_registry: RouterRegistry, target: Consumer) + fun update_router_registry(router_registry: RouterRegistry ref, + target: Consumer) => router_registry.move_proxy_to_stateful_step[K](_step_id, target, _key, _state_name, _worker) @@ -824,3 +832,10 @@ class val RequestFinishedAckCompleteMsg is ChannelMsg => sender = sender' requester_id = requester_id' + +//!@ +class val ReportStatusMsg is ChannelMsg + let code: ReportStatusCode + + new val create(c: ReportStatusCode) => + code = c diff --git a/lib/wallaroo/core/routing/boundary_route.pony b/lib/wallaroo/core/routing/boundary_route.pony index 88a56e337c..483e2529c0 100644 --- a/lib/wallaroo/core/routing/boundary_route.pony +++ b/lib/wallaroo/core/routing/boundary_route.pony @@ -113,6 +113,10 @@ class BoundaryRoute is Route fun ref request_ack() => _consumer.request_ack() + //!@ + fun ref report_status(code: ReportStatusCode) => + _consumer.report_status(code) + fun ref request_finished_ack(request_id: RequestId, requester_id: StepId, producer: FinishedAckRequester) => diff --git a/lib/wallaroo/core/routing/route.pony b/lib/wallaroo/core/routing/route.pony index e78e7e896f..2b2f59e133 100644 --- a/lib/wallaroo/core/routing/route.pony +++ b/lib/wallaroo/core/routing/route.pony @@ -41,6 +41,9 @@ trait Route metric_name: String, worker_ingress_ts: U64) fun ref request_ack() + + //!@ + fun ref report_status(code: ReportStatusCode) fun ref request_finished_ack(request_id: RequestId, requester_id: StepId, requester: FinishedAckRequester) fun ref request_finished_ack_complete(requester_id: StepId, @@ -115,6 +118,10 @@ class EmptyRoute is Route Fail() true + //!@ + fun ref report_status(code: ReportStatusCode) => + Fail() + fun ref request_finished_ack(request_id: RequestId, requester_id: StepId, producer: FinishedAckRequester) => diff --git a/lib/wallaroo/core/routing/typed_route.pony b/lib/wallaroo/core/routing/typed_route.pony index dc0cc99707..7f7ae95085 100644 --- a/lib/wallaroo/core/routing/typed_route.pony +++ b/lib/wallaroo/core/routing/typed_route.pony @@ -138,6 +138,10 @@ class TypedRoute[In: Any val] is Route fun ref request_ack() => _consumer.request_ack() + //!@ + fun ref report_status(code: ReportStatusCode) => + _consumer.report_status(code) + fun ref request_finished_ack(request_id: RequestId, requester_id: StepId, requester: FinishedAckRequester) => diff --git a/lib/wallaroo/core/sink/empty_sink.pony b/lib/wallaroo/core/sink/empty_sink.pony index 3c25fcd424..24520091a4 100644 --- a/lib/wallaroo/core/sink/empty_sink.pony +++ b/lib/wallaroo/core/sink/empty_sink.pony @@ -63,6 +63,10 @@ actor EmptySink is Consumer be unregister_producer(producer: Producer) => None + //!@ + be report_status(code: ReportStatusCode) => + None + be request_finished_ack(request_id: RequestId, requester_id: StepId, producer: FinishedAckRequester) => diff --git a/lib/wallaroo/core/sink/kafka_sink/kafka_sink.pony b/lib/wallaroo/core/sink/kafka_sink/kafka_sink.pony index d910af90d9..766556369b 100644 --- a/lib/wallaroo/core/sink/kafka_sink/kafka_sink.pony +++ b/lib/wallaroo/core/sink/kafka_sink/kafka_sink.pony @@ -242,6 +242,13 @@ actor KafkaSink is (Consumer & KafkaClientManager & KafkaProducer) _upstreams.unset(producer) + //!@ + be report_status(code: ReportStatusCode) => + match code + | FinishedAcksStatus => + @printf[I32]("!@ Kafka sink finished ack status\n".cstring()) + end + be request_finished_ack(request_id: RequestId, requester_id: StepId, requester: FinishedAckRequester) => diff --git a/lib/wallaroo/core/sink/tcp_sink/tcp_sink.pony b/lib/wallaroo/core/sink/tcp_sink/tcp_sink.pony index de2460caf3..4197cbd0e9 100644 --- a/lib/wallaroo/core/sink/tcp_sink/tcp_sink.pony +++ b/lib/wallaroo/core/sink/tcp_sink/tcp_sink.pony @@ -299,10 +299,17 @@ actor TCPSink is Consumer _upstreams.unset(producer) + //!@ + be report_status(code: ReportStatusCode) => + match code + | FinishedAcksStatus => + @printf[I32]("!@ tcp sink finished_ack_status\n".cstring()) + end + be request_finished_ack(request_id: RequestId, requester_id: StepId, requester: FinishedAckRequester) => - @printf[I32]("!@ request_finished_ack TCPSink, upstream_request_id: %s, requester_id: %s\n".cstring(), request_id.string().cstring(), requester_id.string().cstring()) + // @printf[I32]("!@ request_finished_ack TCPSink, upstream_request_id: %s, requester_id: %s\n".cstring(), request_id.string().cstring(), requester_id.string().cstring()) requester.receive_finished_ack(request_id) be request_finished_ack_complete(requester_id: StepId, diff --git a/lib/wallaroo/core/source/kafka_source/kafka_source.pony b/lib/wallaroo/core/source/kafka_source/kafka_source.pony index dc84607724..ab0256de91 100644 --- a/lib/wallaroo/core/source/kafka_source/kafka_source.pony +++ b/lib/wallaroo/core/source/kafka_source/kafka_source.pony @@ -30,7 +30,7 @@ use "wallaroo/core/routing" use "wallaroo/core/topology" actor KafkaSource[In: Any val] is (Producer & FinishedAckResponder & - KafkaConsumer) + StatusReporter & KafkaConsumer) let _source_id: StepId let _step_id_gen: StepIdGenerator = StepIdGenerator let _routes: MapIs[Consumer, Route] = _routes.create() @@ -58,7 +58,7 @@ actor KafkaSource[In: Any val] is (Producer & FinishedAckResponder & let _partition_id: I32 let _kc: KafkaClient tag - new create(listen: KafkaSourceListener[In], + new create(source_id: StepId, listen: KafkaSourceListener[In], notify: KafkaSourceNotify[In] iso, routes: Array[Consumer] val, route_builder: RouteBuilder, outgoing_boundary_builders: Map[String, OutgoingBoundaryBuilder] val, @@ -67,6 +67,7 @@ actor KafkaSource[In: Any val] is (Producer & FinishedAckResponder & topic: String, partition_id: I32, kafka_client: KafkaClient tag, router_registry: RouterRegistry) => + _source_id = source_id _topic = topic _partition_id = partition_id _kc = kafka_client @@ -80,7 +81,6 @@ actor KafkaSource[In: Any val] is (Producer & FinishedAckResponder & _route_builder = route_builder - _source_id = _step_id_gen() _finished_ack_waiter = FinishedAckWaiter(_source_id) for (target_worker_name, builder) in outgoing_boundary_builders.pairs() do @@ -223,6 +223,16 @@ actor KafkaSource[In: Any val] is (Producer & FinishedAckResponder & fun ref current_sequence_id(): SeqId => _seq_id + //!@ + be report_status(code: ReportStatusCode) => + match code + | FinishedAcksStatus => + _finished_ack_waiter.report_status(code) + end + for route in _routes.values() do + route.report_status(code) + end + be request_finished_ack(upstream_request_id: RequestId, requester_id: StepId, requester: FinishedAckRequester) => diff --git a/lib/wallaroo/core/source/kafka_source/kafka_source_listener.pony b/lib/wallaroo/core/source/kafka_source/kafka_source_listener.pony index 8c2f23229f..09020d8b5a 100644 --- a/lib/wallaroo/core/source/kafka_source/kafka_source_listener.pony +++ b/lib/wallaroo/core/source/kafka_source/kafka_source_listener.pony @@ -53,6 +53,7 @@ class val KafkaSourceListenerBuilderBuilder[In: Any val] _auth) class val KafkaSourceListenerBuilder[In: Any val] + let _step_id_gen: StepIdGenerator = StepIdGenerator let _source_builder: SourceBuilder let _router: Router let _router_registry: RouterRegistry @@ -205,12 +206,14 @@ actor KafkaSourceListener[In: Any val] is (SourceListener & KafkaClientManager) try match _kc | let kc: KafkaClient tag => - let source = KafkaSource[In](this, _notify.build_source(_env)?, - _router.routes(), _route_builder, _outgoing_boundary_builders, - _layout_initializer, _metrics_reporter.clone(), topic, part_id, - kc, _router_registry) + let source_id = _step_id_gen() + let source = KafkaSource[In](source_id, this, + _notify.build_source(_env)?, _router.routes(), _route_builder, + _outgoing_boundary_builders, _layout_initializer, + _metrics_reporter.clone(), topic, part_id, kc, + _router_registry) partitions_sources(part_id) = source - _router_registry.register_source(source) + _router_registry.register_source(source, source_id) match _router | let pr: PartitionRouter => _router_registry.register_partition_router_subscriber(pr.state_name(), source) diff --git a/lib/wallaroo/core/source/source.pony b/lib/wallaroo/core/source/source.pony index 8968dc11ff..ffbb61036d 100644 --- a/lib/wallaroo/core/source/source.pony +++ b/lib/wallaroo/core/source/source.pony @@ -93,7 +93,7 @@ interface val SourceConfig[In: Any val] SourceBuilderBuilder interface tag Source is (DisposableActor & BoundaryUpdateable & - FinishedAckResponder) + FinishedAckResponder & StatusReporter) be update_router(router: PartitionRouter) be add_boundary_builders( boundary_builders: Map[String, OutgoingBoundaryBuilder] val) diff --git a/lib/wallaroo/core/source/tcp_source/tcp_source.pony b/lib/wallaroo/core/source/tcp_source/tcp_source.pony index 9407c2a5c5..b8d2679587 100644 --- a/lib/wallaroo/core/source/tcp_source/tcp_source.pony +++ b/lib/wallaroo/core/source/tcp_source/tcp_source.pony @@ -51,7 +51,7 @@ use @pony_asio_event_resubscribe_read[None](event: AsioEventID) use @pony_asio_event_resubscribe_write[None](event: AsioEventID) use @pony_asio_event_destroy[None](event: AsioEventID) -actor TCPSource is (Producer & FinishedAckResponder) +actor TCPSource is (Producer & FinishedAckResponder & StatusReporter) """ # TCPSource @@ -97,8 +97,9 @@ actor TCPSource is (Producer & FinishedAckResponder) var _seq_id: SeqId = 1 // 0 is reserved for "not seen yet" var _finished_ack_waiter: FinishedAckWaiter - new _accept(listen: TCPSourceListener, notify: TCPSourceNotify iso, - routes: Array[Consumer] val, route_builder: RouteBuilder, + new _accept(source_id: StepId, listen: TCPSourceListener, + notify: TCPSourceNotify iso, routes: Array[Consumer] val, + route_builder: RouteBuilder, outgoing_boundary_builders: Map[String, OutgoingBoundaryBuilder] val, layout_initializer: LayoutInitializer, fd: U32, init_size: USize = 64, max_size: USize = 16384, @@ -107,7 +108,7 @@ actor TCPSource is (Producer & FinishedAckResponder) """ A new connection accepted on a server. """ - _source_id = _step_id_gen() + _source_id = source_id _finished_ack_waiter = FinishedAckWaiter(_source_id) _metrics_reporter = consume metrics_reporter _listen = listen @@ -284,23 +285,46 @@ actor TCPSource is (Producer & FinishedAckResponder) fun ref current_sequence_id(): SeqId => _seq_id + //!@ + be report_status(code: ReportStatusCode) => + @printf[I32]("!@ Source finished_ack_status\n".cstring()) + match code + | FinishedAcksStatus => + _finished_ack_waiter.report_status(code) + | BoundaryCountStatus => + var b_count: USize = 0 + for r in _routes.values() do + match r + | let br: BoundaryRoute => b_count = b_count + 1 + end + end + @printf[I32]("!@ Source %s has %s boundaries.\n".cstring(), _source_id.string().cstring(), b_count.string().cstring()) + end + for route in _routes.values() do + route.report_status(code) + end + be request_finished_ack(upstream_request_id: RequestId, requester_id: StepId, - requester: FinishedAckRequester) + upstream_requester: FinishedAckRequester) => @printf[I32]("!@ Source stopping world (%s)\n".cstring(), _source_id.string().cstring()) - _finished_ack_waiter.add_new_request(requester_id, upstream_request_id, - requester) - - if _routes.size() > 0 then - for route in _routes.values() do - @printf[I32]("!@ ---*****---- Add consumer request at Source\n".cstring()) - let request_id = _finished_ack_waiter.add_consumer_request( - requester_id) - route.request_finished_ack(request_id, _source_id, this) + + if not _finished_ack_waiter.already_added_request(requester_id) then + _finished_ack_waiter.add_new_request(requester_id, upstream_request_id, + upstream_requester) + if _routes.size() > 0 then + for route in _routes.values() do + @printf[I32]("!@ ---*****---- Add consumer request at Source\n".cstring()) + let request_id = _finished_ack_waiter.add_consumer_request( + requester_id) + route.request_finished_ack(request_id, _source_id, this) + end + else + upstream_requester.try_finish_request_early(requester_id) end else - requester.try_finish_request_early(requester_id) + upstream_requester.receive_finished_ack(upstream_request_id) end be request_finished_ack_complete(requester_id: StepId, @@ -317,7 +341,7 @@ actor TCPSource is (Producer & FinishedAckResponder) _finished_ack_waiter.try_finish_request_early(requester_id) be receive_finished_ack(request_id: RequestId) => - // @printf[I32]("!@ receive_finished_ack RECEIVE TCPSource\n".cstring()) + // @printf[I32]("!@ receive_finished_ack RECEIVE TCPSource %s\n".cstring(), _source_id.string().cstring()) _finished_ack_waiter.unmark_consumer_request(request_id) // diff --git a/lib/wallaroo/core/source/tcp_source/tcp_source_listener.pony b/lib/wallaroo/core/source/tcp_source/tcp_source_listener.pony index be39dd6369..b53c297a1f 100644 --- a/lib/wallaroo/core/source/tcp_source/tcp_source_listener.pony +++ b/lib/wallaroo/core/source/tcp_source/tcp_source_listener.pony @@ -45,7 +45,7 @@ actor TCPSourceListener is SourceListener """ # TCPSourceListener """ - + let _step_id_gen: StepIdGenerator = StepIdGenerator let _env: Env var _router: Router let _router_registry: RouterRegistry @@ -206,14 +206,15 @@ actor TCPSourceListener is SourceListener Spawn a new connection. """ try - let source = TCPSource._accept(this, _notify_connected()?, + let source_id = _step_id_gen() + let source = TCPSource._accept(source_id, this, _notify_connected()?, _router.routes(), _route_builder, _outgoing_boundary_builders, _layout_initializer, ns, _init_size, _max_size, _metrics_reporter.clone(), _router_registry) // TODO: We need to figure out how to unregister this when the // connection dies // @printf[I32]("!@ About to register source from listener!\n".cstring()) - _router_registry.register_source(source) + _router_registry.register_source(source, source_id) match _router | let pr: PartitionRouter => _router_registry.register_partition_router_subscriber(pr.state_name(), diff --git a/lib/wallaroo/core/topology/router.pony b/lib/wallaroo/core/topology/router.pony index 45975d1194..1f2bb320a3 100644 --- a/lib/wallaroo/core/topology/router.pony +++ b/lib/wallaroo/core/topology/router.pony @@ -306,6 +306,7 @@ trait val OmniRouter is Equatable[OmniRouter] latest_ts: U64, metrics_id: U16, worker_ingress_ts: U64): (Bool, U64) fun val add_boundary(w: String, boundary: OutgoingBoundary): OmniRouter + fun val remove_boundary(w: String): OmniRouter fun val update_route_to_proxy(id: U128, pa: ProxyAddress): OmniRouter @@ -340,6 +341,9 @@ class val EmptyOmniRouter is OmniRouter => this + fun val remove_boundary(w: String): OmniRouter => + this + fun val update_route_to_proxy(id: U128, pa: ProxyAddress): OmniRouter => @@ -545,6 +549,16 @@ class val StepIdRouter is OmniRouter StepIdRouter(_worker_name, _data_routes, _step_map, consume new_outgoing_boundaries, _stateless_partitions) + fun val remove_boundary(w: String): OmniRouter => + // TODO: Using persistent maps for our fields would make this more + // efficient + let new_outgoing_boundaries = recover trn Map[String, OutgoingBoundary] end + for (k, v) in _outgoing_boundaries.pairs() do + if k != w then new_outgoing_boundaries(k) = v end + end + StepIdRouter(_worker_name, _data_routes, _step_map, + consume new_outgoing_boundaries, _stateless_partitions) + fun val update_route_to_proxy(id: U128, pa: ProxyAddress): OmniRouter => // TODO: Using persistent maps for our fields would make this more // efficient @@ -881,6 +895,12 @@ class val DataRouter is Equatable[DataRouter] Fail() end + //!@ + fun report_status(code: ReportStatusCode) => + for consumer in _data_routes.values() do + consumer.report_status(code) + end + fun request_finished_ack(requester_id: StepId, requester: DataReceiver, finished_ack_waiter: FinishedAckWaiter) => diff --git a/lib/wallaroo/core/topology/steps.pony b/lib/wallaroo/core/topology/steps.pony index f16b1291cb..512ae7b26b 100644 --- a/lib/wallaroo/core/topology/steps.pony +++ b/lib/wallaroo/core/topology/steps.pony @@ -93,13 +93,34 @@ actor Step is (Producer & Consumer) _id = id _finished_ack_waiter = FinishedAckWaiter(_id) - for (state_name, boundary) in _outgoing_boundaries.pairs() do - _outgoing_boundaries(state_name) = boundary + for (worker, boundary) in _outgoing_boundaries.pairs() do + _outgoing_boundaries(worker) = boundary end _event_log.register_producer(this, id) let initial_router = _runner.clone_router_and_set_input_type(router) _update_router(initial_router) + + for consumer in _router.routes().values() do + if not _routes.contains(consumer) then + _routes(consumer) = + _route_builder(this, consumer, _metrics_reporter) + end + end + + for boundary in _outgoing_boundaries.values() do + if not _routes.contains(boundary) then + _routes(boundary) = + _route_builder(this, boundary, _metrics_reporter) + end + end + + for r in _routes.values() do + ifdef "resilience" then + _acker_x.add_route(r) + end + end + _step_message_processor = NormalStepMessageProcessor(this) // @@ -120,8 +141,10 @@ actor Step is (Producer & Consumer) end for boundary in _outgoing_boundaries.values() do - _routes(boundary) = - _route_builder(this, boundary, _metrics_reporter) + if not _routes.contains(boundary) then + _routes(boundary) = + _route_builder(this, boundary, _metrics_reporter) + end end for r in _routes.values() do @@ -131,6 +154,8 @@ actor Step is (Producer & Consumer) end end + // @printf[I32]("!@ step application_created routes: %s\n".cstring(), _routes.size().string().cstring()) + _omni_router = omni_router _initialized = true @@ -189,6 +214,8 @@ actor Step is (Producer & Consumer) end end + // @printf[I32]("!@ step register_routes routes: %s\n".cstring(), _routes.size().string().cstring()) + be update_router(router: Router) => _update_router(router) @@ -211,6 +238,8 @@ actor Step is (Producer & Consumer) Fail() end + // @printf[I32]("!@ step _update_router routes: %s\n".cstring(), _routes.size().string().cstring()) + be update_omni_router(omni_router: OmniRouter) => let old_router = _omni_router _omni_router = omni_router @@ -235,6 +264,7 @@ actor Step is (Producer & Consumer) _routes(boundary) = new_route end end + // @printf[I32]("!@ step add_boundaries routes: %s\n".cstring(), _routes.size().string().cstring()) be remove_boundary(worker: String) => if _outgoing_boundaries.contains(worker) then @@ -252,6 +282,8 @@ actor Step is (Producer & Consumer) None end + // @printf[I32]("!@ step remove_boundary routes: %s\n".cstring(), _routes.size().string().cstring()) + be remove_route_for(step: Consumer) => try _routes.remove(step)? @@ -260,6 +292,8 @@ actor Step is (Producer & Consumer) "to remove\n").cstring()) end + // @printf[I32]("!@ step remove_route_for routes: %s\n".cstring(), _routes.size().string().cstring()) + be run[D: Any val](metric_name: String, pipeline_time_spent: U64, data: D, i_producer: Producer, msg_uid: MsgId, frac_ids: FractionalMessageId, i_seq_id: SeqId, i_route_id: RouteId, @@ -422,6 +456,25 @@ actor Step is (Producer & Consumer) // end _upstreams.unset(producer) + //!@ + be report_status(code: ReportStatusCode) => + match code + | FinishedAcksStatus => + _finished_ack_waiter.report_status(code) + //!@ + | BoundaryCountStatus => + var b_count: USize = 0 + for r in _routes.values() do + match r + | let br: BoundaryRoute => b_count = b_count + 1 + end + end + @printf[I32]("!@ Step %s has %s boundaries.\n".cstring(), _id.string().cstring(), b_count.string().cstring()) + end + for r in _routes.values() do + r.report_status(code) + end + be request_finished_ack(upstream_request_id: RequestId, requester_id: StepId, requester: FinishedAckRequester) => @@ -429,7 +482,7 @@ actor Step is (Producer & Consumer) | let nmp: NormalStepMessageProcessor => _step_message_processor = QueueingStepMessageProcessor(this) end - @printf[I32]("!@ request_finished_ack STEP %s, upstream_request_id: %s, requester_id: %s\n".cstring(), _id.string().cstring(), upstream_request_id.string().cstring(), requester_id.string().cstring()) + // @printf[I32]("!@ request_finished_ack STEP %s, upstream_request_id: %s, requester_id: %s\n".cstring(), _id.string().cstring(), upstream_request_id.string().cstring(), requester_id.string().cstring()) if not _finished_ack_waiter.already_added_request(requester_id) then _finished_ack_waiter.add_new_request(requester_id, upstream_request_id, requester) @@ -466,6 +519,7 @@ actor Step is (Producer & Consumer) _finished_ack_waiter.try_finish_request_early(requester_id) be receive_finished_ack(request_id: RequestId) => + // @printf[I32]("!@ receive_finished_ack STEP %s\n".cstring(), _id.string().cstring()) _finished_ack_waiter.unmark_consumer_request(request_id) be mute(c: Consumer) => diff --git a/lib/wallaroo/ent/data_receiver/data_receiver.pony b/lib/wallaroo/ent/data_receiver/data_receiver.pony index 60b507c267..fb690f4bf5 100644 --- a/lib/wallaroo/ent/data_receiver/data_receiver.pony +++ b/lib/wallaroo/ent/data_receiver/data_receiver.pony @@ -142,9 +142,17 @@ actor DataReceiver is Producer """This is not a real Producer, so it doesn't write any State""" None + //!@ + be report_status(code: ReportStatusCode) => + match code + | FinishedAcksStatus => + _finished_ack_waiter.report_status(code) + end + _router.report_status(code) + be request_finished_ack(upstream_request_id: RequestId, requester_id: StepId) => - @printf[I32]("!@ request_finished_ack DATA RECEIVER upstream_request_id: %s, requester_id: %s\n".cstring(), upstream_request_id.string().cstring(), requester_id.string().cstring()) + // @printf[I32]("!@ request_finished_ack DATA RECEIVER upstream_request_id: %s, requester_id: %s\n".cstring(), upstream_request_id.string().cstring(), requester_id.string().cstring()) if not _finished_ack_waiter.already_added_request(requester_id) then _finished_ack_waiter.add_new_request(requester_id, upstream_request_id, EmptyFinishedAckRequester, _WriteFinishedAck(this, diff --git a/lib/wallaroo/ent/network/control_channel_tcp.pony b/lib/wallaroo/ent/network/control_channel_tcp.pony index 6dadb3660b..60ccf1f56d 100644 --- a/lib/wallaroo/ent/network/control_channel_tcp.pony +++ b/lib/wallaroo/ent/network/control_channel_tcp.pony @@ -359,9 +359,11 @@ class ControlChannelConnectNotifier is TCPConnectionNotify _router_registry.remote_request_finished_ack(m.sender, m.request_id, m.requester_id) | let m: RequestFinishedAckCompleteMsg => + @printf[I32]("!@ RequestFinishedAckCompleteMsg COMPLETE from %s\n".cstring(), m.sender.cstring()) _router_registry.remote_request_finished_ack_complete(m.sender, m.requester_id) | let m: FinishedAckMsg => + @printf[I32]("!@ FINISHEDACKMSG\n".cstring()) ifdef "trace" then @printf[I32]("Received FinishedAckMsg from %s\n".cstring(), m.sender.cstring()) diff --git a/lib/wallaroo/ent/network/external_channel_tcp.pony b/lib/wallaroo/ent/network/external_channel_tcp.pony index 08bf0364e3..acda809487 100644 --- a/lib/wallaroo/ent/network/external_channel_tcp.pony +++ b/lib/wallaroo/ent/network/external_channel_tcp.pony @@ -165,6 +165,22 @@ class ExternalChannelConnectNotifier is TCPConnectionNotify " External Channel\n").cstring()) end _local_topology_initializer.cluster_status_query(conn) + | let m: ExternalSourceIdsQueryMsg => + ifdef "trace" then + @printf[I32](("Received ExternalSourceIdsQueryMsg on " + + " External Channel\n").cstring()) + end + _local_topology_initializer.source_ids_query(conn) + //!@ + | let m: ExternalReportStatusMsg => + //!@ + @printf[I32]("!@ Recvd ExternalReportStatusMsg\n".cstring()) + try + let code = ReportStatusCodeParser(m.code)? + _local_topology_initializer.report_status(code) + else + @printf[I32]("Failed to parse ReportStatusCode\n".cstring()) + end else @printf[I32](("Incoming External Message type not handled by " + "external channel.\n").cstring()) diff --git a/lib/wallaroo/ent/router_registry/router_registry.pony b/lib/wallaroo/ent/router_registry/router_registry.pony index 38d9eff00d..6cef6c6c8b 100644 --- a/lib/wallaroo/ent/router_registry/router_registry.pony +++ b/lib/wallaroo/ent/router_registry/router_registry.pony @@ -20,6 +20,7 @@ use "wallaroo/core/data_channel" use "wallaroo/core/initialization" use "wallaroo/core/invariant" use "wallaroo/core/messages" +use "wallaroo/core/metrics" use "wallaroo/core/routing" use "wallaroo/core/source" use "wallaroo/core/topology" @@ -47,6 +48,8 @@ actor RouterRegistry is FinishedAckRequester let _stateless_partition_routers: Map[U128, StatelessPartitionRouter] = _stateless_partition_routers.create() + var _local_topology_initializer: (LocalTopologyInitializer | None) = None + var _omni_router: (OmniRouter | None) = None var _application_ready_to_work: Bool = false @@ -72,6 +75,8 @@ actor RouterRegistry is FinishedAckRequester let _sources: SetIs[Source] = _sources.create() let _source_listeners: SetIs[SourceListener] = _source_listeners.create() + // Map from Source digestof value to source id + let _source_ids: Map[U64, StepId] = _source_ids.create() let _data_channel_listeners: SetIs[DataChannelListener] = _data_channel_listeners.create() let _control_channel_listeners: SetIs[TCPListener] = @@ -161,14 +166,18 @@ actor RouterRegistry is FinishedAckRequester be set_event_log(e: EventLog) => _event_log = e + be register_local_topology_initializer(lt: LocalTopologyInitializer) => + _local_topology_initializer = lt + // TODO: We need a new approach to registering all disposable actors. // This is a stopgap to register boundaries generated by a Source. // See issue #1411. be register_disposable(d: DisposableActor) => _connections.register_disposable(d) - be register_source(source: Source) => + be register_source(source: Source, source_id: StepId) => _sources.set(source) + _source_ids(digestof source) = source_id if not _stop_the_world_in_process and _application_ready_to_work then source.unmute(_dummy_consumer) end @@ -273,8 +282,11 @@ actor RouterRegistry is FinishedAckRequester for (worker, boundary) in bs.pairs() do if not _outgoing_boundaries.contains(worker) then _outgoing_boundaries(worker) = boundary + //!@ new_boundaries(worker) = boundary end + //!@ + // new_boundaries(worker) = boundary end let new_boundaries_sendable: Map[String, OutgoingBoundary] val = consume new_boundaries @@ -386,7 +398,14 @@ actor RouterRegistry is FinishedAckRequester end _distribute_omni_router() - fun _distribute_boundary_removal(worker: String) => + fun ref _distribute_boundary_removal(worker: String) => + match _omni_router + | let omnr: OmniRouter => + _omni_router = omnr.remove_boundary(worker) + end + + _distribute_omni_router() + for subs in _partition_router_subs.values() do for sub in subs.values() do match sub @@ -414,6 +433,13 @@ actor RouterRegistry is FinishedAckRequester source_listener.remove_boundary(worker) end + match _local_topology_initializer + | let lt: LocalTopologyInitializer => + lt.remove_boundary(worker) + else + Fail() + end + fun _distribute_boundary_builders() => let boundary_builders = recover trn Map[String, OutgoingBoundaryBuilder] end @@ -567,6 +593,15 @@ actor RouterRegistry is FinishedAckRequester worker_names.size(), worker_names, _stop_the_world_in_process) conn.writev(msg) + be source_ids_query(conn: TCPConnection) => + let ids = recover iso Array[String] end + for s_id in _source_ids.values() do + ids.push(s_id.string()) + end + let msg = ExternalMsgEncoder.source_ids_query_response( + consume ids) + conn.writev(msg) + ////////////// // LOG ROTATION ////////////// @@ -613,6 +648,7 @@ actor RouterRegistry is FinishedAckRequester Called when rotation has completed and we should resume processing """ _connections.request_cluster_unmute() + @printf[I32]("!@ rotation_complete _resume_the_world\n".cstring()) _resume_the_world() _unmute_request(_worker_name) @@ -701,6 +737,7 @@ actor RouterRegistry is FinishedAckRequester """ Migration is complete and we're ready to resume message processing """ + @printf[I32]("!@ _resume_the_world COMPLETE\n".cstring()) _finished_ack_waiter.clear() for source in _sources.values() do source.request_finished_ack_complete(_id, this) @@ -814,41 +851,70 @@ actor RouterRegistry is FinishedAckRequester _unmute_request(originating_worker) fun ref _unmute_request(originating_worker: String) => - _stopped_worker_waiting_list.unset(originating_worker) - if (_stopped_worker_waiting_list.size() == 0) then - if (_migration_target_ack_list.size() == 0) and - (_leaving_workers.size() == 0) - then - _resume_the_world() - else - // We should only unmute ourselves once _migration_target_ack_list is - // empty for grow and _leaving_workers is empty for shrink - Fail() + if _stopped_worker_waiting_list.size() > 0 then + _stopped_worker_waiting_list.unset(originating_worker) + if (_stopped_worker_waiting_list.size() == 0) then + if (_migration_target_ack_list.size() == 0) and + (_leaving_workers.size() == 0) + then + @printf[I32]("!@ _unmute_request _resume_the_world\n".cstring()) + _resume_the_world() + else + // We should only unmute ourselves once _migration_target_ack_list is + // empty for grow and _leaving_workers is empty for shrink + Fail() + end end end + //!@ + be report_status(code: ReportStatusCode) => + match code + | FinishedAcksStatus => + @printf[I32]("!@ RouterRegistry finished_ack_status\n".cstring()) + | BoundaryCountStatus => + @printf[I32]("RouterRegistry knows about %s boundaries\n" + .cstring(), _outgoing_boundaries.size().string().cstring()) + end + for source in _sources.values() do + source.report_status(code) + end + be remote_request_finished_ack(originating_worker: String, upstream_request_id: RequestId, upstream_requester_id: StepId) => @printf[I32]("!@ remote_request_finished_ack REGISTRY %s\n".cstring(), _id.string().cstring()) - _finished_ack_waiter.add_new_request(upstream_requester_id, - upstream_request_id where custom_action = AckFinishedAction(_auth, - _worker_name, originating_worker, upstream_request_id, _connections)) - - if _sources.size() > 0 then - for source in _sources.values() do - // @printf[I32]("!@ -- Stopping world for source %s\n".cstring(), (digestof source).string().cstring()) - let request_id = - _finished_ack_waiter.add_consumer_request(upstream_requester_id) - source.request_finished_ack(request_id, _id, this) + if not _finished_ack_waiter.already_added_request(upstream_requester_id) + then + _finished_ack_waiter.add_new_request(upstream_requester_id, + upstream_request_id where custom_action = AckFinishedAction(_auth, + _worker_name, originating_worker, upstream_request_id, _connections)) + + if _sources.size() > 0 then + for source in _sources.values() do + // @printf[I32]("!@ -- Stopping world for source %s\n".cstring(), (digestof source).string().cstring()) + let request_id = + _finished_ack_waiter.add_consumer_request(upstream_requester_id) + source.request_finished_ack(request_id, _id, this) + end + else + _finished_ack_waiter.try_finish_request_early(upstream_requester_id) end else - _finished_ack_waiter.try_finish_request_early(upstream_requester_id) + try + let finished_ack_msg = + ChannelMsgEncoder.finished_ack(_worker_name, upstream_request_id, + _auth)? + _connections.send_control(originating_worker, finished_ack_msg) + else + Fail() + end end be remote_request_finished_ack_complete(originating_worker: String, upstream_requester_id: StepId) => + @printf[I32]("!@ remote_request_finished_ack_complete from %s !!-!-!!\n".cstring(), originating_worker.cstring()) for source in _sources.values() do source.request_finished_ack_complete(_id, this) end @@ -1208,7 +1274,28 @@ actor RouterRegistry is FinishedAckRequester ///// // Step moved onto this worker ///// - be move_proxy_to_stateful_step[K: (Hashable val & Equatable[K] val)]( + be receive_immigrant_step(subpartition: StateSubpartition, + runner_builder: RunnerBuilder, reporter: MetricsReporter iso, + recovery_replayer: RecoveryReplayer, msg: StepMigrationMsg) + => + let outgoing_boundaries = recover iso Map[String, OutgoingBoundary] end + for (k, v) in _outgoing_boundaries.pairs() do + outgoing_boundaries(k) = v + end + + match _event_log + | let event_log: EventLog => + let step = Step(_auth, runner_builder(where event_log = event_log, + auth = _auth), consume reporter, msg.step_id(), + runner_builder.route_builder(), event_log, recovery_replayer, + consume outgoing_boundaries) + step.receive_state(msg.state()) + msg.update_router_registry(this, step) + else + Fail() + end + + fun ref move_proxy_to_stateful_step[K: (Hashable val & Equatable[K] val)]( id: U128, target: Consumer, key: K, state_name: String, source_worker: String) => @@ -1219,9 +1306,9 @@ actor RouterRegistry is FinishedAckRequester try match target | let step: Step => + _register_omni_router_step(step) _data_router = _data_router.add_route(id, step) _distribute_data_router() - _register_omni_router_step(step) _distribute_omni_router() let partition_router = _partition_routers(state_name)?.update_route[K](key, step)? diff --git a/lib/wallaroo_labs/messages/external_messages.pony b/lib/wallaroo_labs/messages/external_messages.pony index b99ef341f4..c68f1c7ad7 100644 --- a/lib/wallaroo_labs/messages/external_messages.pony +++ b/lib/wallaroo_labs/messages/external_messages.pony @@ -20,6 +20,7 @@ use "buffered" use "collections" use "net" use "../query" +use "../../wallaroo/core/common" use "../../wallaroo/core/topology" primitive _Print fun apply(): U16 => 1 @@ -34,6 +35,10 @@ primitive _ClusterStatusQuery fun apply(): U16 => 9 primitive _ClusterStatusQueryResponse fun apply(): U16 => 10 primitive _PartitionCountQuery fun apply(): U16 => 11 primitive _PartitionCountQueryResponse fun apply(): U16 => 12 +primitive _SourceIdsQuery fun apply(): U16 => 13 +primitive _SourceIdsQueryResponse fun apply(): U16 => 14 +//!@ +primitive _ReportStatus fun apply(): U16 => 9999 primitive ExternalMsgEncoder fun _encode(id: U16, s: String, wb: Writer): Array[ByteSeq] val => @@ -124,6 +129,18 @@ primitive ExternalMsgEncoder let pqr = PartitionQueryEncoder.state_and_stateless_by_count(digest_map) _encode(_PartitionCountQueryResponse(), pqr, wb) + fun source_ids_query(wb: Writer = Writer): Array[ByteSeq] val => + """ + A message requesting the ids of all sources in the cluster + """ + _encode(_SourceIdsQuery(), "", wb) + + fun source_ids_query_response(source_ids: Array[String] val, + wb: Writer = Writer): Array[ByteSeq] val + => + let sis = SourceIdsQueryEncoder.response(source_ids) + _encode(_SourceIdsQueryResponse(), sis, wb) + fun _partition_digest(state_routers: Map[String, PartitionRouter], stateless_routers: Map[U128, StatelessPartitionRouter]): Map[String, Map[String, Map[String, Array[String] val] val] val] @@ -146,6 +163,10 @@ primitive ExternalMsgEncoder digest_map("stateless_partitions") = consume stateless_ps digest_map + //!@ + fun report_status(code: String, wb: Writer = Writer): Array[ByteSeq] val => + _encode(_ReportStatus(), code, wb) + primitive ExternalMsgDecoder fun apply(data: Array[U8] val): ExternalMsg val ? => """ @@ -183,6 +204,13 @@ primitive ExternalMsgDecoder ExternalPartitionCountQueryMsg | (_PartitionCountQueryResponse(), let s: String) => ExternalPartitionCountQueryResponseMsg(s) + | (_SourceIdsQuery(), let s: String) => + ExternalSourceIdsQueryMsg + | (_SourceIdsQueryResponse(), let s: String) => + SourceIdsQueryJsonDecoder.response(s) + //!@ + | (_ReportStatus(), let s: String) => + ExternalReportStatusMsg(s) else error end @@ -354,3 +382,19 @@ class val ExternalPartitionCountQueryResponseMsg is ExternalMsg new val create(m: String) => msg = m + +primitive ExternalSourceIdsQueryMsg is ExternalMsg + +class val ExternalSourceIdsQueryResponseMsg is ExternalMsg + let source_ids: Array[String] val + + new val create(m: Array[String] val) => + source_ids = m + +//!@ +class val ExternalReportStatusMsg is ExternalMsg + let code: String + + new val create(c: String) => + code = c + diff --git a/lib/wallaroo_labs/query/query_json.pony b/lib/wallaroo_labs/query/query_json.pony index 5581977293..639baca6f4 100644 --- a/lib/wallaroo_labs/query/query_json.pony +++ b/lib/wallaroo_labs/query/query_json.pony @@ -20,6 +20,7 @@ use "collections" use "itertools" use "../messages" use "../mort" +use "../../wallaroo/core/common" type _JsonDelimiters is (_JsonString | _JsonArray | _JsonMap) @@ -150,6 +151,10 @@ primitive ClusterStatusQueryJsonEncoder entries.push(_Quoted("worker_count") + ":" + worker_count.string()) _JsonEncoder(consume entries, _JsonMap) +primitive SourceIdsQueryEncoder + fun response(source_ids: Array[String] val): String => + _JsonEncoder(source_ids, _JsonArray) + primitive JsonDecoder fun string_array(s: String): Array[String] val => let items = recover iso Array[String] end @@ -394,4 +399,13 @@ primitive ClusterStatusQueryJsonDecoder let worker_count: U64 = p_map("worker_count")?.u64()? ExternalClusterStatusQueryResponseMsg(worker_count, worker_names, +<<<<<<< HEAD is_processing, json) +======= + is_processing) + +primitive SourceIdsQueryJsonDecoder + fun response(json: String): ExternalSourceIdsQueryResponseMsg => + let source_ids = JsonDecoder.string_array(json) + ExternalSourceIdsQueryResponseMsg(source_ids) +>>>>>>> 34add402... .. diff --git a/testing/correctness/apps/alphabet/alphabet.pony b/testing/correctness/apps/alphabet/alphabet.pony index 5db5d140cb..ab2cadf8d1 100644 --- a/testing/correctness/apps/alphabet/alphabet.pony +++ b/testing/correctness/apps/alphabet/alphabet.pony @@ -32,12 +32,19 @@ use "wallaroo/core/topology" actor Main new create(env: Env) => try + //!@ + var count: USize = 0 let parts: Array[String] val = recover let s = "abcdefghijklmnopqrstuvwxyz" let a = Array[String] for b in s.values() do for c in s.values() do - a.push(String.from_array([b ; c])) + //!@ + if count < 10000 then + a.push(String.from_array([b ; c])) + //!@ + count = count + 1 + end end end a.push("!!") diff --git a/testing/tools/external_sender/external_sender.pony b/testing/tools/external_sender/external_sender.pony index 02c5140d4c..7bfc374474 100644 --- a/testing/tools/external_sender/external_sender.pony +++ b/testing/tools/external_sender/external_sender.pony @@ -59,7 +59,8 @@ actor Main --external/-e [Specifies address to send message to] --type/-t [Specifies message type] clean-shutdown | rotate-log | partition-query | - partition-count-query | cluster-status-query | print + partition-count-query | cluster-status-query | + source-ids-query | print --message/-m [Specifies message contents to send] rotate-log Node name to rotate log files @@ -87,6 +88,14 @@ actor Main | "cluster-status-query" => await_response = true ExternalMsgEncoder.cluster_status_query() + | "source-ids-query" => + await_response = true + ExternalMsgEncoder.source_ids_query() + //!@ + | "finished-ack-status" => + ExternalMsgEncoder.report_status("finished-acks-status") + | "boundary-count-status" => + ExternalMsgEncoder.report_status("boundary-count-status") else // default to print ExternalMsgEncoder.print_message(message) end @@ -161,6 +170,12 @@ class ExternalSenderConnectNotifier is TCPConnectionNotify end _env.out.print(m.msg) conn.dispose() + | let m: ExternalSourceIdsQueryResponseMsg => + _env.out.print("Source Ids:") + for s_id in m.source_ids.values() do + _env.out.print(". " + s_id.string()) + end + conn.dispose() else _env.err.print("Received unhandled external message type") _env.exitcode(1)