Skip to content

Commit 7c39d06

Browse files
committed
add basic discrete event simulator
1 parent 69f8681 commit 7c39d06

File tree

3 files changed

+332
-0
lines changed

3 files changed

+332
-0
lines changed

.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ __pycache__
44
# Test build files
55
/test/autogen
66
/test/build
7+
/sim/autogen
8+
/sim/build
79

810
# Tup database
911
/.tup

sim/Tupfile.lua

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
2+
fibre_cpp_dir = '../cpp'
3+
tup.include(fibre_cpp_dir..'/package.lua')
4+
5+
CXX='clang++'
6+
LINKER='clang++'
7+
CFLAGS={'-g', '-I.'}
8+
LDFLAGS={}
9+
object_files = {}
10+
11+
12+
fibre_pkg = get_fibre_package({
13+
enable_server=true,
14+
enable_client=true,
15+
enable_event_loop=false,
16+
allow_heap=true,
17+
enable_libusb_backend=false,
18+
enable_tcp_client_backend=false,
19+
enable_tcp_server_backend=false,
20+
})
21+
22+
CFLAGS += fibre_pkg.cflags
23+
LDFLAGS += fibre_pkg.ldflags
24+
25+
for _, inc in pairs(fibre_pkg.include_dirs) do
26+
CFLAGS += '-I'..fibre_cpp_dir..'/'..inc
27+
end
28+
29+
for _, src in pairs(fibre_pkg.code_files) do
30+
object_files += compile(fibre_cpp_dir..'/'..src)
31+
end
32+
33+
34+
autogen_pkg = fibre_autogen('../test/test-interface.yaml')
35+
36+
object_files += compile('sim_main.cpp')
37+
object_files += compile('../test/test_node.cpp', autogen_pkg.autogen_headers)
38+
39+
-- TODO: move up
40+
for _, src in pairs(autogen_pkg.code_files) do
41+
object_files += compile(src, autogen_pkg.autogen_headers)
42+
end
43+
44+
45+
compile_outname='build/fibre_sim'
46+
47+
tup.frule{
48+
inputs=object_files,
49+
command='^c^ '..LINKER..' %f '..tostring(CFLAGS)..' '..tostring(LDFLAGS)..' -o %o',
50+
outputs={compile_outname}
51+
}

sim/sim_main.cpp

+279
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
2+
#include "../test/test_node.hpp"
3+
#include <fibre/fibre.hpp>
4+
#include <fibre/logging.hpp>
5+
#include <unordered_map>
6+
#include <algorithm>
7+
#include <variant>
8+
9+
namespace fibre {
10+
11+
namespace simulator {
12+
13+
struct Port;
14+
15+
struct Node {
16+
std::string name;
17+
std::unordered_map<std::string, Port*> ports;
18+
};
19+
20+
struct Port {
21+
Node* node;
22+
std::string name;
23+
};
24+
25+
class Simulator final : public EventLoop {
26+
public:
27+
void send(Port* from, Port* to, float duration, Callback<void> on_delivery);
28+
29+
void run();
30+
31+
RichStatus post(Callback<void> callback) final;
32+
RichStatus register_event(int fd, uint32_t events,
33+
Callback<void, uint32_t> callback) final;
34+
RichStatus deregister_event(int fd) final;
35+
RichStatus call_later(float delay, Callback<void> callback,
36+
EventLoopTimer** p_timer) final;
37+
RichStatus cancel_timer(EventLoopTimer* timer) final;
38+
39+
uint64_t t_ns = 0;
40+
41+
private:
42+
struct Event {
43+
uint64_t t_ns;
44+
Callback<void> trigger;
45+
Port* from; // optional
46+
Port* to; // optional
47+
};
48+
49+
void add_event(Event evt);
50+
51+
std::vector<Event> backlog;
52+
};
53+
54+
} // namespace simulator
55+
56+
struct FibreNode {
57+
FibreNode(simulator::Simulator* simulator, std::string name)
58+
: simulator_{simulator}, sim_node_{name} {}
59+
60+
void start(bool enable_server, bool enable_client);
61+
62+
void log(const char* file, unsigned line, int level, uintptr_t info0,
63+
uintptr_t info1, const char* text);
64+
65+
simulator::Simulator* simulator_;
66+
simulator::Node sim_node_;
67+
TestNode impl_;
68+
};
69+
70+
struct CanBus;
71+
72+
struct CanChannel final : AsyncStreamSource, AsyncStreamSink {
73+
void start_read(bufptr_t buffer, TransferHandle* handle,
74+
Callback<void, ReadResult> completer) final;
75+
void cancel_read(TransferHandle transfer_handle) final;
76+
void start_write(cbufptr_t buffer, TransferHandle* handle,
77+
Callback<void, WriteResult> completer) final;
78+
void cancel_write(TransferHandle transfer_handle) final;
79+
void on_delivery();
80+
81+
CanBus* bus_;
82+
simulator::Port* port_;
83+
Callback<void, ReadResult> rx_completer_;
84+
Callback<void, WriteResult> tx_completer_;
85+
cbufptr_t tx_buf_;
86+
bufptr_t rx_buf_;
87+
};
88+
89+
struct CanBus {
90+
CanBus(simulator::Simulator* simulator) : simulator_{simulator} {}
91+
92+
void send(cbufptr_t buffer, CanChannel* from);
93+
void send_next();
94+
void on_sent();
95+
96+
void join(FibreNode* node, std::string port);
97+
void leave(FibreNode* node, std::string port);
98+
99+
std::vector<CanChannel*> tx_queue_;
100+
std::vector<std::shared_ptr<CanChannel>> connections_;
101+
simulator::Simulator* simulator_;
102+
103+
bool busy = false;
104+
105+
size_t mtu = 64;
106+
uint32_t bps = 1000000;
107+
};
108+
109+
} // namespace fibre
110+
111+
using namespace fibre;
112+
using namespace fibre::simulator;
113+
114+
void Simulator::send(Port* from, Port* to, float duration,
115+
Callback<void> on_delivery) {
116+
uint64_t duration_ns = duration * (float)1e9;
117+
add_event(Event{t_ns + duration_ns, on_delivery, from, to});
118+
}
119+
120+
void Simulator::add_event(Event new_evt) {
121+
auto it = std::find_if(backlog.begin(), backlog.end(), [&](Event& evt) {
122+
return (evt.t_ns - t_ns) > (new_evt.t_ns - t_ns);
123+
});
124+
backlog.insert(it, new_evt);
125+
}
126+
127+
void Simulator::run() {
128+
while (backlog.size()) {
129+
Event evt = backlog.front();
130+
backlog.erase(backlog.begin());
131+
t_ns = evt.t_ns;
132+
evt.trigger.invoke();
133+
}
134+
}
135+
136+
RichStatus Simulator::post(Callback<void> callback) {
137+
return F_MAKE_ERR("not implemented");
138+
}
139+
RichStatus Simulator::register_event(int fd, uint32_t events,
140+
Callback<void, uint32_t> callback) {
141+
return F_MAKE_ERR("not implemented");
142+
}
143+
RichStatus Simulator::deregister_event(int fd) {
144+
return F_MAKE_ERR("not implemented");
145+
}
146+
RichStatus Simulator::call_later(float delay, Callback<void> callback,
147+
EventLoopTimer** p_timer) {
148+
uint64_t delay_ns = delay * (float)1e9;
149+
add_event({t_ns + delay_ns, callback, nullptr, nullptr});
150+
return F_MAKE_ERR("not implemented");
151+
}
152+
RichStatus Simulator::cancel_timer(EventLoopTimer* timer) {
153+
return F_MAKE_ERR("not implemented");
154+
}
155+
156+
void FibreNode::start(bool enable_server, bool enable_client) {
157+
Logger logger{MEMBER_CB(this, log), get_log_verbosity()};
158+
impl_.start(simulator_, "", enable_server, enable_client, logger);
159+
}
160+
161+
void FibreNode::log(const char* file, unsigned line, int level, uintptr_t info0,
162+
uintptr_t info1, const char* text) {
163+
switch ((LogLevel)level) {
164+
case LogLevel::kDebug:
165+
// std::cerr << "\x1b[93;1m"; // yellow
166+
break;
167+
case LogLevel::kError:
168+
std::cerr << "\x1b[91;1m"; // red
169+
break;
170+
default:
171+
break;
172+
}
173+
174+
float sim_time = (float)simulator_->t_ns / 1e6;
175+
std::cerr << "t=" << sim_time << "ms " << sim_node_.name << " [" << file
176+
<< ":" << line << "] " << text << "\x1b[0m" << std::endl;
177+
}
178+
179+
void CanBus::join(FibreNode* node, std::string port_name) {
180+
// node->sim_node_.name = ;
181+
182+
// TODO: check if exists
183+
Port* port = new Port{&node->sim_node_, port_name};
184+
node->sim_node_.ports[port_name] = port;
185+
186+
auto channel = std::make_shared<CanChannel>();
187+
channel->bus_ = this;
188+
channel->port_ = port;
189+
190+
connections_.push_back(channel);
191+
192+
node->impl_.domain_->add_channels(
193+
{fibre::Status::kFibreOk, channel.get(), channel.get(), mtu, true});
194+
}
195+
196+
void CanBus::send_next() {
197+
CanChannel* tx_channel = tx_queue_.front();
198+
busy = true;
199+
200+
float duration = (float)(tx_channel->tx_buf_.size() * 8) / (float)bps;
201+
202+
for (auto& rx_channel : connections_) {
203+
if (rx_channel.get() != tx_channel && rx_channel->rx_completer_) {
204+
size_t n_copy = std::min(mtu, std::min(tx_channel->tx_buf_.size(),
205+
rx_channel->rx_buf_.size()));
206+
std::copy_n(tx_channel->tx_buf_.begin(), n_copy,
207+
rx_channel->rx_buf_.begin());
208+
rx_channel->rx_buf_ = rx_channel->rx_buf_.skip(n_copy);
209+
210+
simulator_->send(tx_channel->port_, rx_channel->port_, duration,
211+
MEMBER_CB(rx_channel.get(), on_delivery));
212+
}
213+
}
214+
215+
simulator_->call_later(duration, MEMBER_CB(this, on_sent), nullptr);
216+
}
217+
218+
void CanBus::on_sent() {
219+
CanChannel* channel = tx_queue_.front();
220+
221+
size_t n_sent = std::min(channel->tx_buf_.size(), mtu);
222+
channel->tx_completer_.invoke_and_clear(
223+
{kStreamOk, channel->tx_buf_.begin() + n_sent});
224+
225+
tx_queue_.erase(tx_queue_.begin());
226+
busy = false;
227+
228+
if (tx_queue_.size()) {
229+
send_next();
230+
}
231+
}
232+
233+
void CanChannel::start_read(bufptr_t buffer, TransferHandle* handle,
234+
Callback<void, ReadResult> completer) {
235+
rx_buf_ = buffer;
236+
rx_completer_ = completer;
237+
}
238+
void CanChannel::cancel_read(TransferHandle transfer_handle) {}
239+
240+
void CanChannel::on_delivery() {
241+
rx_completer_.invoke_and_clear({kStreamOk, rx_buf_.begin()});
242+
}
243+
244+
void CanChannel::start_write(cbufptr_t buffer, TransferHandle* handle,
245+
Callback<void, WriteResult> completer) {
246+
if (!tx_completer_) {
247+
tx_buf_ = buffer;
248+
tx_completer_ = completer;
249+
250+
bus_->tx_queue_.push_back(this);
251+
252+
if (!bus_->busy) {
253+
bus_->send_next();
254+
}
255+
}
256+
}
257+
258+
void CanChannel::cancel_write(TransferHandle transfer_handle) {}
259+
260+
int main() {
261+
printf("Starting Fibre server...\n");
262+
263+
Simulator simulator;
264+
265+
FibreNode server{&simulator, "server"};
266+
FibreNode client{&simulator, "client"};
267+
268+
CanBus busA{&simulator};
269+
270+
server.start(true, false);
271+
client.start(false, true);
272+
273+
busA.join(&server, "can0");
274+
busA.join(&client, "can0");
275+
276+
simulator.run();
277+
278+
printf("No more events in queue. Simulation terminated.\n");
279+
}

0 commit comments

Comments
 (0)