From 8d202e4c95fe1cd51590f36717436e527b76e349 Mon Sep 17 00:00:00 2001 From: Einar Omang Date: Mon, 29 Jul 2024 10:47:58 +0200 Subject: [PATCH] Improve session timeout logic --- lib/src/server/config/server.rs | 11 ++++ lib/src/server/server.rs | 25 ++++++-- lib/src/server/session/controller.rs | 88 ++++++---------------------- lib/src/server/session/instance.rs | 15 ++++- lib/src/server/session/manager.rs | 34 +++++++++++ 5 files changed, 97 insertions(+), 76 deletions(-) diff --git a/lib/src/server/config/server.rs b/lib/src/server/config/server.rs index 24409ece2..fd9eeedbf 100644 --- a/lib/src/server/config/server.rs +++ b/lib/src/server/config/server.rs @@ -208,6 +208,12 @@ pub struct ServerConfig { /// we will just instantly time out. #[serde(default = "defaults::max_secure_channel_token_lifetime_ms")] pub max_secure_channel_token_lifetime_ms: u32, + /// Maximum time before a session will be timed out. The client will request + /// a number, this just sets the upper limit on that value. + /// Note that there is no lower limit, if a client sets an expiry of 0 + /// we will instantly time out. + #[serde(default = "defaults::max_session_timeout_ms")] + pub max_session_timeout_ms: f64, } mod defaults { @@ -228,6 +234,10 @@ mod defaults { pub fn max_secure_channel_token_lifetime_ms() -> u32 { 300_000 } + + pub fn max_session_timeout_ms() -> f64 { + constants::MAX_SESSION_TIMEOUT + } } impl Config for ServerConfig { @@ -345,6 +355,7 @@ impl Default for ServerConfig { publish_timeout_default_ms: defaults::publish_timeout_default_ms(), max_timeout_ms: defaults::max_timeout_ms(), max_secure_channel_token_lifetime_ms: defaults::max_secure_channel_token_lifetime_ms(), + max_session_timeout_ms: defaults::max_session_timeout_ms(), } } } diff --git a/lib/src/server/server.rs b/lib/src/server/server.rs index b4838a9fd..11234f593 100644 --- a/lib/src/server/server.rs +++ b/lib/src/server/server.rs @@ -284,12 +284,9 @@ impl Server { Err(e) => error!("Connection panic! {e}") } } - _ = Self::run_subscription_ticks(self.config.subscription_poll_interval_ms, &context) => { - unreachable!() - } - _ = Self::run_discovery_server_registration(self.info.clone()) => { - unreachable!() - } + _ = Self::run_subscription_ticks(self.config.subscription_poll_interval_ms, &context) => {} + _ = Self::run_discovery_server_registration(self.info.clone()) => {} + _ = Self::run_session_expiry(&self.session_manager) => {} rs = listener.accept() => { match rs { Ok((socket, addr)) => { @@ -362,6 +359,22 @@ impl Server { } } + async fn run_session_expiry(sessions: &RwLock) -> Never { + loop { + let (expiry, expired) = { + let session_lck = trace_read_lock!(sessions); + session_lck.check_session_expiry() + }; + if !expired.is_empty() { + let mut session_lck = trace_write_lock!(sessions); + for id in expired { + session_lck.expire_session(&id); + } + } + tokio::time::sleep_until(expiry.into()).await; + } + } + /// Log information about the endpoints on this server fn log_endpoint_info(&self) { info!("OPC UA Server: {}", self.info.application_name); diff --git a/lib/src/server/session/controller.rs b/lib/src/server/session/controller.rs index 438b4fd1b..08cc7ad41 100644 --- a/lib/src/server/session/controller.rs +++ b/lib/src/server/session/controller.rs @@ -62,31 +62,6 @@ pub(crate) enum ControllerCommand { Close, } -#[derive(Debug)] -enum ControllerTimeout { - /// Controller is waiting for a client to establish a secure channel - WaitingForChannel(Instant), - /// Controller has an open secure channel, but no session - OpenChannel(Instant), - /// Controller has an open session. The deadline in this case is the smallest of the secure channel expiry and the - /// session timeout. - OpenSession(Instant, Instant), -} - -impl ControllerTimeout { - pub async fn timeout(&self) { - match self { - ControllerTimeout::WaitingForChannel(deadline) - | ControllerTimeout::OpenChannel(deadline) => { - tokio::time::sleep_until((*deadline).into()).await - } - ControllerTimeout::OpenSession(channel, session) => { - tokio::time::sleep_until((*channel.min(session)).into()).await - } - } - } -} - /// Master type managing a single connection. pub(crate) struct SessionController { channel: SecureChannel, @@ -99,7 +74,7 @@ pub(crate) struct SessionController { Pin> + Send + Sync + 'static>>, >, info: Arc, - deadline: ControllerTimeout, + deadline: Instant, } enum RequestProcessResult { @@ -140,9 +115,8 @@ impl SessionController { session_manager, certificate_store, message_handler: MessageHandler::new(info.clone(), node_managers, subscriptions), - deadline: ControllerTimeout::WaitingForChannel( - Instant::now() + Duration::from_secs(info.config.tcp_config.hello_timeout as u64), - ), + deadline: Instant::now() + + Duration::from_secs(info.config.tcp_config.hello_timeout as u64), info, pending_messages: FuturesUnordered::new(), } @@ -157,7 +131,7 @@ impl SessionController { }; tokio::select! { - _ = self.deadline.timeout() => { + _ = tokio::time::sleep_until(self.deadline.into()) => { if !self.transport.is_closing() { warn!("Connection timed out, closing"); self.transport.enqueue_error(ErrorMessage::new(StatusCode::BadTimeout, "Connection timeout")); @@ -227,16 +201,7 @@ impl SessionController { &r, ); if res.is_ok() { - match &mut self.deadline { - ControllerTimeout::OpenSession(chan, _) => { - *chan = self.channel.token_renewal_deadline() - } - s => { - *s = ControllerTimeout::OpenChannel( - self.channel.token_renewal_deadline(), - ) - } - } + self.deadline = self.channel.token_renewal_deadline(); } match res { Ok(r) => match self @@ -339,26 +304,22 @@ impl SessionController { let mgr = trace_read_lock!(self.session_manager); let session = mgr.find_by_token(&message.request_header().authentication_token); - let (session_id, session, user_token) = match Self::validate_request( - &message, - session, - &self.channel, - &mut self.deadline, - ) { - Ok(s) => s, - Err(e) => { - match self - .transport - .enqueue_message_for_send(&mut self.channel, e, id) - { - Ok(_) => return RequestProcessResult::Ok, - Err(e) => { - error!("Failed to send request response: {e}"); - return RequestProcessResult::Close; + let (session_id, session, user_token) = + match Self::validate_request(&message, session, &self.channel) { + Ok(s) => s, + Err(e) => { + match self + .transport + .enqueue_message_for_send(&mut self.channel, e, id) + { + Ok(_) => return RequestProcessResult::Ok, + Err(e) => { + error!("Failed to send request response: {e}"); + return RequestProcessResult::Close; + } } } - } - }; + }; let deadline = { let timeout = message.request_header().timeout_hint; let max_timeout = self.info.config.max_timeout_ms; @@ -445,7 +406,6 @@ impl SessionController { message: &SupportedMessage, session: Option>>, channel: &SecureChannel, - timeout: &mut ControllerTimeout, ) -> Result<(u32, Arc>, UserToken), SupportedMessage> { let header = message.request_header(); @@ -460,16 +420,6 @@ impl SessionController { let token = session_lock.validate_activated()?; session_lock.validate_secure_channel_id(channel.secure_channel_id())?; session_lock.validate_timed_out()?; - match timeout { - ControllerTimeout::OpenSession(_, sess) => *sess = session_lock.deadline(), - // Should be unreachable. - r => { - *r = ControllerTimeout::OpenSession( - channel.token_renewal_deadline(), - session_lock.deadline(), - ) - } - } Ok(token.clone()) })() .map_err(|e| ServiceFault::new(header, e).into())?; diff --git a/lib/src/server/session/instance.rs b/lib/src/server/session/instance.rs index 33e6a1729..9f556ec2c 100644 --- a/lib/src/server/session/instance.rs +++ b/lib/src/server/session/instance.rs @@ -66,6 +66,8 @@ pub struct Session { query_continuation_points: HashMap, /// User token. user_token: Option, + /// Whether the session has been closed. + is_closed: bool, } impl Session { @@ -116,6 +118,7 @@ impl Session { user_token: None, application_description, message_security_mode, + is_closed: false, } } @@ -141,6 +144,12 @@ impl Session { /// Check whether this session is validated and return the appropriate error if not. pub(crate) fn validate_activated(&self) -> Result<&UserToken, StatusCode> { + // Unlikely, but this protects against race conditions where the + // session is removed from the session cache after it has been retrieved for a service call, + // but before it has been locked. + if self.is_closed { + return Err(StatusCode::BadSessionClosed); + } if let Some(token) = &self.user_token { Ok(token) } else { @@ -177,6 +186,10 @@ impl Session { self.locale_ids = locale_ids; } + pub(crate) fn close(&mut self) { + self.is_closed = true; + } + /// Get the session ID of this session, this is known to the client, and is what they /// use to refer to this session. /// @@ -203,7 +216,7 @@ impl Session { /// Whether this session is activated. pub fn is_activated(&self) -> bool { - self.user_token.is_some() + self.user_token.is_some() && !self.is_closed } /// Get the secure channel ID of this session. diff --git a/lib/src/server/session/manager.rs b/lib/src/server/session/manager.rs index edabaabf1..9841c14aa 100644 --- a/lib/src/server/session/manager.rs +++ b/lib/src/server/session/manager.rs @@ -4,6 +4,7 @@ use std::{ atomic::{AtomicU32, Ordering}, Arc, }, + time::{Duration, Instant}, }; use crypto::{random, security_policy::SecurityPolicy}; @@ -301,6 +302,11 @@ impl SessionManager { let session_id = session.session_id().clone(); let session = self.sessions.remove(&session_id).unwrap(); + { + let mut session_lck = trace_write_lock!(session); + session_lck.close(); + } + if request.delete_subscriptions { handler .delete_session_subscriptions(id, session, token.unwrap()) @@ -311,4 +317,32 @@ impl SessionManager { response_header: ResponseHeader::new_good(&request.request_header), }) } + + pub(crate) fn expire_session(&mut self, id: &NodeId) { + let Some(session) = self.sessions.remove(id) else { + return; + }; + + info!("Session {id} has expired, removing it from the session map. Subscriptions will remain until they individually expire"); + + let mut session = trace_write_lock!(session); + session.close(); + } + + pub(crate) fn check_session_expiry(&self) -> (Instant, Vec) { + let now = Instant::now(); + let mut expired = Vec::new(); + let mut expiry = + now + Duration::from_millis(self.info.config.max_session_timeout_ms as u64); + for (id, session) in &self.sessions { + let deadline = session.read().deadline(); + if deadline < now { + expired.push(id.clone()); + } else if deadline < expiry { + expiry = deadline; + } + } + + (expiry, expired) + } }