Skip to content

Commit

Permalink
Improve session timeout logic
Browse files Browse the repository at this point in the history
  • Loading branch information
einarmo committed Jul 29, 2024
1 parent d62d257 commit 8d202e4
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 76 deletions.
11 changes: 11 additions & 0 deletions lib/src/server/config/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@ pub struct ServerConfig {
/// we will just instantly time out.
#[serde(default = "defaults::max_secure_channel_token_lifetime_ms")]
pub max_secure_channel_token_lifetime_ms: u32,
/// Maximum time before a session will be timed out. The client will request
/// a number, this just sets the upper limit on that value.
/// Note that there is no lower limit, if a client sets an expiry of 0
/// we will instantly time out.
#[serde(default = "defaults::max_session_timeout_ms")]
pub max_session_timeout_ms: f64,
}

mod defaults {
Expand All @@ -228,6 +234,10 @@ mod defaults {
pub fn max_secure_channel_token_lifetime_ms() -> u32 {
300_000
}

pub fn max_session_timeout_ms() -> f64 {
constants::MAX_SESSION_TIMEOUT
}
}

impl Config for ServerConfig {
Expand Down Expand Up @@ -345,6 +355,7 @@ impl Default for ServerConfig {
publish_timeout_default_ms: defaults::publish_timeout_default_ms(),
max_timeout_ms: defaults::max_timeout_ms(),
max_secure_channel_token_lifetime_ms: defaults::max_secure_channel_token_lifetime_ms(),
max_session_timeout_ms: defaults::max_session_timeout_ms(),
}
}
}
Expand Down
25 changes: 19 additions & 6 deletions lib/src/server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,12 +284,9 @@ impl Server {
Err(e) => error!("Connection panic! {e}")
}
}
_ = Self::run_subscription_ticks(self.config.subscription_poll_interval_ms, &context) => {
unreachable!()
}
_ = Self::run_discovery_server_registration(self.info.clone()) => {
unreachable!()
}
_ = Self::run_subscription_ticks(self.config.subscription_poll_interval_ms, &context) => {}
_ = Self::run_discovery_server_registration(self.info.clone()) => {}
_ = Self::run_session_expiry(&self.session_manager) => {}
rs = listener.accept() => {
match rs {
Ok((socket, addr)) => {
Expand Down Expand Up @@ -362,6 +359,22 @@ impl Server {
}
}

async fn run_session_expiry(sessions: &RwLock<SessionManager>) -> Never {
loop {
let (expiry, expired) = {
let session_lck = trace_read_lock!(sessions);
session_lck.check_session_expiry()
};
if !expired.is_empty() {
let mut session_lck = trace_write_lock!(sessions);
for id in expired {
session_lck.expire_session(&id);
}
}
tokio::time::sleep_until(expiry.into()).await;
}
}

/// Log information about the endpoints on this server
fn log_endpoint_info(&self) {
info!("OPC UA Server: {}", self.info.application_name);
Expand Down
88 changes: 19 additions & 69 deletions lib/src/server/session/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,31 +62,6 @@ pub(crate) enum ControllerCommand {
Close,
}

#[derive(Debug)]
enum ControllerTimeout {
/// Controller is waiting for a client to establish a secure channel
WaitingForChannel(Instant),
/// Controller has an open secure channel, but no session
OpenChannel(Instant),
/// Controller has an open session. The deadline in this case is the smallest of the secure channel expiry and the
/// session timeout.
OpenSession(Instant, Instant),
}

impl ControllerTimeout {
pub async fn timeout(&self) {
match self {
ControllerTimeout::WaitingForChannel(deadline)
| ControllerTimeout::OpenChannel(deadline) => {
tokio::time::sleep_until((*deadline).into()).await
}
ControllerTimeout::OpenSession(channel, session) => {
tokio::time::sleep_until((*channel.min(session)).into()).await
}
}
}
}

/// Master type managing a single connection.
pub(crate) struct SessionController {
channel: SecureChannel,
Expand All @@ -99,7 +74,7 @@ pub(crate) struct SessionController {
Pin<Box<dyn Future<Output = Result<Response, String>> + Send + Sync + 'static>>,
>,
info: Arc<ServerInfo>,
deadline: ControllerTimeout,
deadline: Instant,
}

enum RequestProcessResult {
Expand Down Expand Up @@ -140,9 +115,8 @@ impl SessionController {
session_manager,
certificate_store,
message_handler: MessageHandler::new(info.clone(), node_managers, subscriptions),
deadline: ControllerTimeout::WaitingForChannel(
Instant::now() + Duration::from_secs(info.config.tcp_config.hello_timeout as u64),
),
deadline: Instant::now()
+ Duration::from_secs(info.config.tcp_config.hello_timeout as u64),
info,
pending_messages: FuturesUnordered::new(),
}
Expand All @@ -157,7 +131,7 @@ impl SessionController {
};

tokio::select! {
_ = self.deadline.timeout() => {
_ = tokio::time::sleep_until(self.deadline.into()) => {
if !self.transport.is_closing() {
warn!("Connection timed out, closing");
self.transport.enqueue_error(ErrorMessage::new(StatusCode::BadTimeout, "Connection timeout"));
Expand Down Expand Up @@ -227,16 +201,7 @@ impl SessionController {
&r,
);
if res.is_ok() {
match &mut self.deadline {
ControllerTimeout::OpenSession(chan, _) => {
*chan = self.channel.token_renewal_deadline()
}
s => {
*s = ControllerTimeout::OpenChannel(
self.channel.token_renewal_deadline(),
)
}
}
self.deadline = self.channel.token_renewal_deadline();
}
match res {
Ok(r) => match self
Expand Down Expand Up @@ -339,26 +304,22 @@ impl SessionController {
let mgr = trace_read_lock!(self.session_manager);
let session = mgr.find_by_token(&message.request_header().authentication_token);

let (session_id, session, user_token) = match Self::validate_request(
&message,
session,
&self.channel,
&mut self.deadline,
) {
Ok(s) => s,
Err(e) => {
match self
.transport
.enqueue_message_for_send(&mut self.channel, e, id)
{
Ok(_) => return RequestProcessResult::Ok,
Err(e) => {
error!("Failed to send request response: {e}");
return RequestProcessResult::Close;
let (session_id, session, user_token) =
match Self::validate_request(&message, session, &self.channel) {
Ok(s) => s,
Err(e) => {
match self
.transport
.enqueue_message_for_send(&mut self.channel, e, id)
{
Ok(_) => return RequestProcessResult::Ok,
Err(e) => {
error!("Failed to send request response: {e}");
return RequestProcessResult::Close;
}
}
}
}
};
};
let deadline = {
let timeout = message.request_header().timeout_hint;
let max_timeout = self.info.config.max_timeout_ms;
Expand Down Expand Up @@ -445,7 +406,6 @@ impl SessionController {
message: &SupportedMessage,
session: Option<Arc<RwLock<Session>>>,
channel: &SecureChannel,
timeout: &mut ControllerTimeout,
) -> Result<(u32, Arc<RwLock<Session>>, UserToken), SupportedMessage> {
let header = message.request_header();

Expand All @@ -460,16 +420,6 @@ impl SessionController {
let token = session_lock.validate_activated()?;
session_lock.validate_secure_channel_id(channel.secure_channel_id())?;
session_lock.validate_timed_out()?;
match timeout {
ControllerTimeout::OpenSession(_, sess) => *sess = session_lock.deadline(),
// Should be unreachable.
r => {
*r = ControllerTimeout::OpenSession(
channel.token_renewal_deadline(),
session_lock.deadline(),
)
}
}
Ok(token.clone())
})()
.map_err(|e| ServiceFault::new(header, e).into())?;
Expand Down
15 changes: 14 additions & 1 deletion lib/src/server/session/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ pub struct Session {
query_continuation_points: HashMap<ByteString, QueryContinuationPoint>,
/// User token.
user_token: Option<UserToken>,
/// Whether the session has been closed.
is_closed: bool,
}

impl Session {
Expand Down Expand Up @@ -116,6 +118,7 @@ impl Session {
user_token: None,
application_description,
message_security_mode,
is_closed: false,
}
}

Expand All @@ -141,6 +144,12 @@ impl Session {

/// Check whether this session is validated and return the appropriate error if not.
pub(crate) fn validate_activated(&self) -> Result<&UserToken, StatusCode> {
// Unlikely, but this protects against race conditions where the
// session is removed from the session cache after it has been retrieved for a service call,
// but before it has been locked.
if self.is_closed {
return Err(StatusCode::BadSessionClosed);
}
if let Some(token) = &self.user_token {
Ok(token)
} else {
Expand Down Expand Up @@ -177,6 +186,10 @@ impl Session {
self.locale_ids = locale_ids;
}

pub(crate) fn close(&mut self) {
self.is_closed = true;
}

/// Get the session ID of this session, this is known to the client, and is what they
/// use to refer to this session.
///
Expand All @@ -203,7 +216,7 @@ impl Session {

/// Whether this session is activated.
pub fn is_activated(&self) -> bool {
self.user_token.is_some()
self.user_token.is_some() && !self.is_closed
}

/// Get the secure channel ID of this session.
Expand Down
34 changes: 34 additions & 0 deletions lib/src/server/session/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
atomic::{AtomicU32, Ordering},
Arc,
},
time::{Duration, Instant},
};

use crypto::{random, security_policy::SecurityPolicy};
Expand Down Expand Up @@ -301,6 +302,11 @@ impl SessionManager {
let session_id = session.session_id().clone();

let session = self.sessions.remove(&session_id).unwrap();
{
let mut session_lck = trace_write_lock!(session);
session_lck.close();
}

if request.delete_subscriptions {
handler
.delete_session_subscriptions(id, session, token.unwrap())
Expand All @@ -311,4 +317,32 @@ impl SessionManager {
response_header: ResponseHeader::new_good(&request.request_header),
})
}

pub(crate) fn expire_session(&mut self, id: &NodeId) {
let Some(session) = self.sessions.remove(id) else {
return;
};

info!("Session {id} has expired, removing it from the session map. Subscriptions will remain until they individually expire");

let mut session = trace_write_lock!(session);
session.close();
}

pub(crate) fn check_session_expiry(&self) -> (Instant, Vec<NodeId>) {
let now = Instant::now();
let mut expired = Vec::new();
let mut expiry =
now + Duration::from_millis(self.info.config.max_session_timeout_ms as u64);
for (id, session) in &self.sessions {
let deadline = session.read().deadline();
if deadline < now {
expired.push(id.clone());
} else if deadline < expiry {
expiry = deadline;
}
}

(expiry, expired)
}
}

0 comments on commit 8d202e4

Please sign in to comment.