Skip to content

Commit 0daa87b

Browse files
authored
src/sync: Add Client::publish_success (#5)
Enables test instances to signal success both via a publish to the sync service and to Testground daemon via stdout.
1 parent d583657 commit 0daa87b

File tree

3 files changed

+119
-9
lines changed

3 files changed

+119
-9
lines changed

CHANGELOG.md

+5-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@ All notable changes to this project will be documented in this file.
44
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66

7-
## [Unreleased]
7+
## [0.1.1]
8+
### Added
9+
- Add `Client::publish_success` to signal instance success to daemon and sync service. See [PR 5].
10+
11+
[PR 5]: https://github.com/testground/sdk-rust/pull/5
812

913
## [0.1.0] - 2022-01-24
1014
### Added

Cargo.toml

+3-2
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@ edition = "2021"
66
license = "Apache-2.0 OR MIT"
77
name = "testground"
88
repository = "https://github.com/testground/sdk-rust/"
9-
version = "0.1.0"
9+
version = "0.1.1"
1010

1111
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
1212

1313
[dependencies]
14-
soketto = "0.7.1"
1514
async-std = { version = "1.10", features = [ "attributes" ] }
15+
log = "0.4"
1616
serde = { version = "1", features = [ "derive" ] }
1717
serde_json = "1"
18+
soketto = "0.7.1"
1819
thiserror = "1"

src/sync.rs

+111-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use serde::{Deserialize, Serialize};
22
use soketto::connection::{Receiver, Sender};
33
use soketto::handshake::{self, ServerResponse};
4+
use std::time::{SystemTime, UNIX_EPOCH};
45
use thiserror::Error;
56

67
/// Basic synchronization client enabling one to send signals and await barriers.
@@ -57,6 +58,7 @@ impl Client {
5758
state: contextualized_state,
5859
}),
5960
barrier: None,
61+
publish: None,
6062
};
6163

6264
self.send(request).await?;
@@ -83,6 +85,7 @@ impl Client {
8385
let request = Request {
8486
id: id.clone(),
8587
signal_entry: None,
88+
publish: None,
8689
barrier: Some(BarrierRequest {
8790
state: contextualized_state,
8891
target,
@@ -100,13 +103,62 @@ impl Client {
100103
Ok(())
101104
}
102105

106+
pub async fn publish_success(&mut self) -> Result<u64, PublishSuccessError> {
107+
let id = self.next_id().to_string();
108+
109+
let event = Event {
110+
success_event: SuccessEvent {
111+
group: std::env::var("TEST_GROUP_ID").unwrap(),
112+
},
113+
};
114+
115+
let request = Request {
116+
id: id.clone(),
117+
signal_entry: None,
118+
barrier: None,
119+
publish: Some(PublishRequest {
120+
topic: topic(),
121+
payload: event.clone(),
122+
}),
123+
};
124+
125+
self.send(request).await?;
126+
let resp = self.receive().await?;
127+
if resp.id != id {
128+
return Err(PublishSuccessError::UnexpectedId(resp.id));
129+
}
130+
if !resp.error.is_empty() {
131+
return Err(PublishSuccessError::Remote(resp.error));
132+
}
133+
let seq = resp
134+
.publish
135+
.ok_or(PublishSuccessError::ExpectedPublishInResponse)
136+
.map(|resp| resp.seq)?;
137+
138+
// The Testground daemon determines the success or failure of a test
139+
// instance by parsing stdout for runtime events.
140+
println!(
141+
"{}",
142+
serde_json::to_string(&LogLine {
143+
ts: SystemTime::now()
144+
.duration_since(UNIX_EPOCH)
145+
.unwrap()
146+
.as_nanos(),
147+
event,
148+
})?
149+
);
150+
151+
Ok(seq)
152+
}
153+
103154
fn next_id(&mut self) -> u64 {
104155
let next_id = self.next_id;
105156
self.next_id += 1;
106157
next_id
107158
}
108159

109160
async fn send(&mut self, req: Request) -> Result<(), SendError> {
161+
log::debug!("Sending request: {:?}", req);
110162
self.sender.send_text(serde_json::to_string(&req)?).await?;
111163
self.sender.flush().await?;
112164
Ok(())
@@ -116,20 +168,28 @@ impl Client {
116168
let mut data = Vec::new();
117169
self.receiver.receive_data(&mut data).await?;
118170
let resp = serde_json::from_str(&String::from_utf8(data)?)?;
171+
log::debug!("Received response: {:?}", resp);
119172
Ok(resp)
120173
}
121174
}
122175

123-
fn contextualize_state(state: String) -> String {
176+
fn context_from_env() -> String {
124177
format!(
125-
"run:{}:plan:{}:case:{}:states:{}",
178+
"run:{}:plan:{}:case:{}",
126179
std::env::var("TEST_RUN").unwrap(),
127180
std::env::var("TEST_PLAN").unwrap(),
128181
std::env::var("TEST_CASE").unwrap(),
129-
state
130182
)
131183
}
132184

185+
fn contextualize_state(state: String) -> String {
186+
format!("{}:states:{}", context_from_env(), state,)
187+
}
188+
189+
fn topic() -> String {
190+
format!("{}:run_events", context_from_env(),)
191+
}
192+
133193
#[derive(Error, Debug)]
134194
pub enum SignalError {
135195
#[error("Remote returned error {0}.")]
@@ -156,6 +216,22 @@ pub enum BarrierError {
156216
Receive(#[from] ReceiveError),
157217
}
158218

219+
#[derive(Error, Debug)]
220+
pub enum PublishSuccessError {
221+
#[error("Serde: {0}")]
222+
Serde(#[from] serde_json::error::Error),
223+
#[error("Remote returned error {0}.")]
224+
Remote(String),
225+
#[error("Remote returned response with unexpected ID {0}.")]
226+
UnexpectedId(String),
227+
#[error("Expected remote to return publish entry in response.")]
228+
ExpectedPublishInResponse,
229+
#[error("Error sending request {0}")]
230+
Send(#[from] SendError),
231+
#[error("Error receiving response: {0}")]
232+
Receive(#[from] ReceiveError),
233+
}
234+
159235
#[derive(Error, Debug)]
160236
pub enum SendError {
161237
#[error("Soketto: {0}")]
@@ -174,32 +250,61 @@ pub enum ReceiveError {
174250
FromUtf8(#[from] std::string::FromUtf8Error),
175251
}
176252

177-
#[derive(Serialize)]
253+
#[derive(Debug, Serialize)]
178254
struct Request {
179255
id: String,
180256
signal_entry: Option<SignalEntryRequest>,
181257
barrier: Option<BarrierRequest>,
258+
publish: Option<PublishRequest>,
182259
}
183260

184-
#[derive(Serialize)]
261+
#[derive(Debug, Serialize)]
185262
struct SignalEntryRequest {
186263
state: String,
187264
}
188265

189-
#[derive(Serialize)]
266+
#[derive(Debug, Serialize)]
190267
struct BarrierRequest {
191268
state: String,
192269
target: u64,
193270
}
194271

272+
#[derive(Debug, Serialize)]
273+
struct PublishRequest {
274+
topic: String,
275+
payload: Event,
276+
}
277+
278+
#[derive(Debug, Serialize)]
279+
struct LogLine {
280+
ts: u128,
281+
event: Event,
282+
}
283+
284+
#[derive(Clone, Debug, Serialize)]
285+
struct Event {
286+
success_event: SuccessEvent,
287+
}
288+
289+
#[derive(Clone, Debug, Serialize)]
290+
struct SuccessEvent {
291+
group: String,
292+
}
293+
195294
#[derive(Deserialize, Debug)]
196295
struct Response {
197296
id: String,
198297
signal_entry: Option<SignalEntryResponse>,
199298
error: String,
299+
publish: Option<PublishResponse>,
200300
}
201301

202302
#[derive(Deserialize, Debug)]
203303
struct SignalEntryResponse {
204304
seq: u64,
205305
}
306+
307+
#[derive(Deserialize, Debug)]
308+
struct PublishResponse {
309+
seq: u64,
310+
}

0 commit comments

Comments
 (0)