-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmain.rs
180 lines (161 loc) · 5.62 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
mod engine;
use crate::engine::config::{load_config, DataSource};
use crate::engine::ratelimit::RateLimit;
use crate::engine::validation;
use crate::engine::validation::{
validate_event, BlockedType, JsonDataSource, ValidationDataSource,
};
use nostr_sdk::Event;
use serde::{Deserialize, Serialize};
use std::error::Error;
use std::process;
use std::time::Duration;
use tokio::io::{stdin, stdout, AsyncWriteExt};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio_postgres::{Error as PGError, NoTls};
/// Represents a request from the relay
#[derive(Deserialize)]
struct Request {
#[serde(rename = "type")]
type_field: String,
event: Event,
#[serde(rename = "receivedAt")]
_received_at: u64,
#[serde(rename = "sourceType")]
_source_type: String,
#[serde(rename = "sourceInfo")]
source_info: String,
}
/// Represents the response we provide back to the relay
#[derive(Serialize)]
struct Response {
id: String,
action: String,
#[serde(skip_serializing_if = "Option::is_none")]
msg: Option<String>,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// Load config
let config = match load_config("/etc/chief/config.toml") {
Ok(cfg) => cfg,
Err(e) => {
eprintln!("Failed to load config: {:?}", e);
process::exit(1);
}
};
// Select either JSON file or DB datasource, configured in the config.toml file
let data_source = if config.datasource_mode == DataSource::Db {
// Set up a database as the datasource
let (client, connection) = tokio_postgres::connect(
format!(
"host={} port={} user={} password={} dbname={}",
config.database.host,
config.database.port,
config.database.user,
config.database.password,
config.database.dbname
)
.as_str(),
NoTls,
)
.await?;
// The connection object performs the actual communication with the database,
// so spawn it off to run on its own.
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
Box::new(client) as Box<dyn validation::ValidationDataSource>
} else {
// Set up a JSON file as the datasource
let json_data_source = JsonDataSource::new_from_file(config.json.file_path.as_str())?;
Box::new(json_data_source) as Box<dyn ValidationDataSource>
};
let rate_limit_engine = RateLimit::new(
config.filters.rate_limit.max_events,
Duration::from_secs(config.filters.rate_limit.time_window as u64),
);
// Set up stdin and stdout handles
let mut reader = BufReader::new(stdin()).lines();
let mut writer = stdout();
while let Some(line) = reader.next_line().await.unwrap() {
// Deserialize request from strfry
let req: Request = serde_json::from_str(&line)?;
// Type is currently always "new", anything else is an error as per Strfry documentation
if req.type_field != "new" {
eprintln!("unexpected request type {}", req.type_field);
continue;
}
// Build default response
let mut res = Response {
id: req.event.id.to_hex(),
action: String::from("reject"),
msg: Some(String::from("blocked")),
};
// Validates if the event should be persisted or not against a set of filters and modifies the response thereafter
match validate_event(
&*data_source,
&req.event,
&config.filters,
&rate_limit_engine,
)
.await
{
Ok(Some(BlockedType::RateLimit)) => {
res.msg = Some(String::from("rate limited"));
print_blocked_message(&req, "rate-limited");
}
Ok(Some(BlockedType::Pubkey)) => {
res.msg = Some(String::from(
"public key does not have permission to write to relay",
));
print_blocked_message(&req, "not allowed to write");
}
Ok(Some(BlockedType::Kind)) => {
res.msg = Some(String::from("event kind blocked by relay"));
print_blocked_message(
&req,
format!("kind not accepted ({})", req.event.kind.as_u64()).as_str(),
);
}
Ok(Some(BlockedType::Word)) => {
res.msg = Some(String::from("blocked content"));
print_blocked_message(&req, "blocked content");
}
Ok(None) => {
res.action = String::from("accept");
res.msg = None;
}
Err(err) => {
eprintln!("error validating event: {}", err)
}
}
// Output result of event validation, this is picked up by strfry for further processing
writer
.write_all(serde_json::to_string(&res)?.as_bytes())
.await
.unwrap();
writer.write_all(b"\n").await.unwrap();
}
Ok(())
}
fn print_blocked_message(req: &Request, reason: &str) {
println!(
"[BLOCKED] public key {} from {}: {}",
req.event.pubkey, req.source_info, reason
);
}
// Handling the error in case the database query fails
struct MyPGError(PGError);
impl From<PGError> for MyPGError {
fn from(error: PGError) -> Self {
MyPGError(error)
}
}
impl From<MyPGError> for Box<dyn Error> {
fn from(error: MyPGError) -> Box<dyn Error> {
Box::new(error.0)
}
}