diff --git a/lib/src/async_server/info.rs b/lib/src/async_server/info.rs
index e4b1a6f62..024b190cb 100644
--- a/lib/src/async_server/info.rs
+++ b/lib/src/async_server/info.rs
@@ -112,6 +112,8 @@ pub struct ServerInfo {
     pub type_tree: Arc<RwLock<TypeTree>>,
     /// Generator for subscription IDs.
     pub subscription_id_handle: AtomicHandle,
+    /// Generator for monitored item IDs.
+    pub monitored_item_id_handle: AtomicHandle,
 }
 
 impl ServerInfo {
diff --git a/lib/src/async_server/mod.rs b/lib/src/async_server/mod.rs
index d1e88c34a..c4d3f066d 100644
--- a/lib/src/async_server/mod.rs
+++ b/lib/src/async_server/mod.rs
@@ -11,7 +11,7 @@ mod transport;
 pub use config::{ServerConfig, ServerEndpoint, ServerUserToken};
 pub use server_core::ServerCore;
-pub use subscriptions::SubscriptionCache;
+pub use subscriptions::{MonitoredItemHandle, SubscriptionCache};
 
 pub mod constants {
     //! Provides constants that govern the internal workings of the server implementation.
diff --git a/lib/src/async_server/node_manager/context.rs b/lib/src/async_server/node_manager/context.rs
index 629fcb52a..2ae1b6092 100644
--- a/lib/src/async_server/node_manager/context.rs
+++ b/lib/src/async_server/node_manager/context.rs
@@ -4,6 +4,7 @@ use crate::{
     async_server::{
         authenticator::{AuthManager, UserToken},
         session::{instance::Session, message_handler::NodeManagers},
+        SubscriptionCache,
     },
     server::prelude::{BrowseDescriptionResultMask, NodeId},
     sync::RwLock,
@@ -16,10 +17,12 @@ use super::{
 pub struct RequestContext {
     pub session: Arc<RwLock<Session>>,
+    pub session_id: u32,
     pub authenticator: Arc<dyn AuthManager>,
     pub token: UserToken,
     pub current_node_manager_index: usize,
     pub type_tree: Arc<RwLock<TypeTree>>,
+    pub subscriptions: Arc<SubscriptionCache>,
 }
 
 impl RequestContext {}
diff --git a/lib/src/async_server/node_manager/mod.rs b/lib/src/async_server/node_manager/mod.rs
index d9e5fd42f..3af731a0e 100644
--- a/lib/src/async_server/node_manager/mod.rs
+++ b/lib/src/async_server/node_manager/mod.rs
@@ -1,9 +1,11 @@
 use async_trait::async_trait;
 
 use crate::server::prelude::{
-    DeleteAtTimeDetails, DeleteEventDetails, DeleteRawModifiedDetails, NodeId, ReadAtTimeDetails,
-    ReadEventDetails, ReadProcessedDetails, ReadRawModifiedDetails, StatusCode, TimestampsToReturn,
-    UpdateDataDetails, UpdateEventDetails, UpdateStructureDataDetails, WriteValue,
+    DeleteAtTimeDetails, DeleteEventDetails, DeleteRawModifiedDetails, ModifyMonitoredItemsRequest,
+    MonitoredItemModifyRequest, MonitoredItemModifyResult, MonitoringMode, NodeId,
+    ReadAtTimeDetails, ReadEventDetails, ReadProcessedDetails, ReadRawModifiedDetails, StatusCode,
+    TimestampsToReturn, UpdateDataDetails, UpdateEventDetails, UpdateStructureDataDetails,
+    WriteValue,
 };
 
 mod context;
@@ -15,7 +17,7 @@ mod view;
 
 use self::view::ExternalReferenceRequest;
 
-use super::subscriptions::CreateMonitoredItem;
+use super::{subscriptions::CreateMonitoredItem, MonitoredItemHandle};
 
 pub use {
     context::RequestContext,
@@ -256,8 +258,57 @@ pub trait NodeManager {
     /// is not Disabled.
     async fn create_monitored_items(
         &self,
+        context: &RequestContext,
         items: &[&mut CreateMonitoredItem],
     ) -> Result<(), StatusCode> {
         Err(StatusCode::BadServiceUnsupported)
     }
+
+    /// Modify monitored items. This method is purely informative for the node manager,
+    /// to let it modify sampling intervals, apply a new filter, or similar.
+    ///
+    /// Node managers are not required to take any action here, and this method is not
+    /// allowed to fail.
+    async fn modify_monitored_items(
+        &self,
+        context: &RequestContext,
+        subscription_id: u32,
+        items: &[&MonitoredItemModifyResult],
+    ) {
+    }
+
+    /// Set monitoring mode for monitored items. This method is purely informative for the
+    /// node manager, to let it pause or resume sampling. Note that this should _not_ delete
+    /// context stored from `create_monitored_items`, since it may be called again to resume
+    /// sampling.
+    ///
+    /// The node manager should sample so long as monitoring mode is not `Disabled`; the
+    /// difference between `Reporting` and `Sampling` is handled by the server.
+    ///
+    /// Node managers are not required to take any action here, and this method is not
+    /// allowed to fail.
+    async fn set_monitoring_mode(
+        &self,
+        context: &RequestContext,
+        mode: MonitoringMode,
+        subscription_id: u32,
+        items: &[MonitoredItemHandle],
+    ) {
+    }
+
+    /// Delete monitored items. This method is purely informative for the node manager,
+    /// to let it stop sampling, or similar.
+    ///
+    /// Node managers are not required to take any action here, and this method is not
+    /// allowed to fail. Most node managers that implement subscriptions will want to do
+    /// something with this.
+    ///
+    /// This method may be given monitored items that were never created, or were
+    /// created for a different node manager. Attempting to delete a monitored item
+    /// that does not exist is handled elsewhere and should be a no-op here.
+    async fn delete_monitored_items(
+        &self,
+        context: &RequestContext,
+        items: &[MonitoredItemHandle],
+    ) {
+    }
 }
diff --git a/lib/src/async_server/server_core.rs b/lib/src/async_server/server_core.rs
index 5bd19b2e6..9639480fd 100644
--- a/lib/src/async_server/server_core.rs
+++ b/lib/src/async_server/server_core.rs
@@ -116,6 +116,7 @@ impl ServerCore {
             receive_buffer_size,
             type_tree: Arc::new(RwLock::new(TypeTree::new())),
             subscription_id_handle: AtomicHandle::new(1),
+            monitored_item_id_handle: AtomicHandle::new(1),
         };
         let certificate_store = Arc::new(RwLock::new(certificate_store));
diff --git a/lib/src/async_server/session/controller.rs b/lib/src/async_server/session/controller.rs
index 87da3dc6f..16b2ffc0d 100644
--- a/lib/src/async_server/session/controller.rs
+++ b/lib/src/async_server/session/controller.rs
@@ -214,7 +214,9 @@ impl SessionController {
             SupportedMessage::CloseSessionRequest(request) => {
                 let mut mgr = trace_write_lock!(self.session_manager);
-                let res = mgr.close_session(&mut self.channel, &request);
+                let res = mgr
+                    .close_session(&mut self.channel, &mut self.message_handler, &request)
+                    .await;
                 drop(mgr);
                 self.process_service_result(res, request.request_header.request_handle, id)
             }
diff --git a/lib/src/async_server/session/manager.rs b/lib/src/async_server/session/manager.rs
index e36c3e0de..c4993f567 100644
--- a/lib/src/async_server/session/manager.rs
+++ b/lib/src/async_server/session/manager.rs
@@ -19,7 +19,7 @@ use crate::{
     },
 };
 
-use super::instance::Session;
+use super::{instance::Session, message_handler::MessageHandler};
 
 lazy_static! {
    static ref NEXT_SESSION_ID: AtomicU32 = AtomicU32::new(1);
@@ -272,9 +272,10 @@ impl SessionManager {
         }
     }
 
-    pub fn close_session(
+    pub async fn close_session(
         &mut self,
         channel: &mut SecureChannel,
+        handler: &mut MessageHandler,
         request: &CloseSessionRequest,
     ) -> Result<CloseSessionResponse, StatusCode> {
         let Some(session) = self.find_by_token(&request.request_header.authentication_token) else {
@@ -282,16 +283,22 @@
         };
         let session = trace_read_lock!(session);
+        let id = session.session_id_numeric();
+        let token = session.user_token().cloned();
         let secure_channel_id = channel.secure_channel_id();
         if !session.is_activated() && session.secure_channel_id() != secure_channel_id {
             error!("close_session rejected, secure channel id {} for inactive session does not match one used to create session, {}", secure_channel_id, session.secure_channel_id());
             return Err(StatusCode::BadSecureChannelIdInvalid);
         }
-        let session_id = session.session_id().clone();
-        self.sessions.remove(&session_id);
+        let session_id = session.session_id().clone();
+        drop(session);
+        let session = self.sessions.remove(&session_id).unwrap();
+        if request.delete_subscriptions {
+            handler
+                .delete_session_subscriptions(id, session, token.unwrap())
+                .await;
+        }
 
         Ok(CloseSessionResponse {
             response_header: ResponseHeader::new_good(&request.request_header),
diff --git a/lib/src/async_server/session/message_handler.rs b/lib/src/async_server/session/message_handler.rs
index fe0db745d..14e787eb2 100644
--- a/lib/src/async_server/session/message_handler.rs
+++ b/lib/src/async_server/session/message_handler.rs
@@ -12,14 +12,19 @@ use crate::{
             resolve_external_references, BrowseNode, BrowsePathItem, ExternalReferencesContPoint,
             NodeManager, ReadNode, RegisterNodeItem, RequestContext,
         },
-        subscriptions::{PendingPublish, SubscriptionCache},
+        subscriptions::{CreateMonitoredItem, PendingPublish, SubscriptionCache},
     },
     server::prelude::{
         BrowseNextRequest, BrowseNextResponse, BrowsePathResult, BrowsePathTarget, BrowseRequest,
-        BrowseResponse, BrowseResult, ByteString, PublishRequest, ReadRequest, ReadResponse,
-        RegisterNodesRequest, RegisterNodesResponse, ResponseHeader, ServiceFault, StatusCode,
-        SupportedMessage, TimestampsToReturn, TranslateBrowsePathsToNodeIdsRequest,
-        TranslateBrowsePathsToNodeIdsResponse, UnregisterNodesRequest, UnregisterNodesResponse,
+        BrowseResponse, BrowseResult, ByteString, CreateMonitoredItemsRequest,
+        CreateMonitoredItemsResponse, DeleteMonitoredItemsRequest, DeleteMonitoredItemsResponse,
+        DeleteSubscriptionsRequest, DeleteSubscriptionsResponse, ModifyMonitoredItemsRequest,
+        ModifyMonitoredItemsResponse, PublishRequest, ReadRequest, ReadResponse,
+        RegisterNodesRequest, RegisterNodesResponse, ResponseHeader, ServiceFault,
+        SetMonitoringModeRequest, SetMonitoringModeResponse, SetTriggeringRequest,
+        SetTriggeringResponse, StatusCode, SupportedMessage, TimestampsToReturn,
+        TranslateBrowsePathsToNodeIdsRequest, TranslateBrowsePathsToNodeIdsResponse,
+        UnregisterNodesRequest, UnregisterNodesResponse,
     },
 };
 
@@ -72,6 +77,8 @@ struct Request {
     pub info: Arc<ServerInfo>,
     pub session: Arc<RwLock<Session>>,
     pub token: UserToken,
+    pub subscriptions: Arc<SubscriptionCache>,
+    pub session_id: u32,
 }
 
 macro_rules!
service_fault { @@ -91,6 +98,8 @@ impl Request { request_handle: u32, session: Arc>, token: UserToken, + subscriptions: Arc, + session_id: u32, ) -> Self { Self { request, @@ -99,6 +108,8 @@ impl Request { info, session, token, + subscriptions, + session_id, } } @@ -116,6 +127,8 @@ impl Request { token: self.token.clone(), current_node_manager_index: 0, type_tree: self.info.type_tree.clone(), + subscriptions: self.subscriptions.clone(), + session_id: self.session_id, } } } @@ -131,6 +144,8 @@ macro_rules! async_service_call { $r.request_handle, $r.session, $r.token, + $slf.subscriptions.clone(), + $r.session_id, ), ))) }; @@ -198,6 +213,24 @@ impl MessageHandler { async_service_call!(unregister_nodes, self, request, data) } + SupportedMessage::CreateMonitoredItemsRequest(request) => { + async_service_call!(create_monitored_items, self, request, data) + } + + SupportedMessage::ModifyMonitoredItemsRequest(request) => { + async_service_call!(modify_monitored_items, self, request, data) + } + + SupportedMessage::SetMonitoringModeRequest(request) => { + async_service_call!(set_monitoring_mode, self, request, data) + } + + SupportedMessage::DeleteMonitoredItemsRequest(request) => { + async_service_call!(delete_monitored_items, self, request, data) + } + + SupportedMessage::SetTriggeringRequest(request) => self.set_triggering(request, data), + SupportedMessage::PublishRequest(request) => self.publish(request, data), SupportedMessage::RepublishRequest(request) => { @@ -249,6 +282,10 @@ impl MessageHandler { }) } + SupportedMessage::DeleteSubscriptionsRequest(request) => { + async_service_call!(delete_subscriptions, self, request, data) + } + message => { debug!( "Message handler does not handle this kind of message {:?}", @@ -843,6 +880,10 @@ impl MessageHandler { .filter(|n| mgr.owns_node(n.node_id())) .collect(); + if owned.is_empty() { + continue; + } + // All errors are fatal in this case, node managers should avoid them. if let Err(e) = mgr.register_nodes(&context, &mut owned).await { error!("Register nodes failed for node manager {}: {e}", mgr.name()); @@ -888,6 +929,10 @@ impl MessageHandler { .filter(|n| mgr.owns_node(n)) .collect(); + if owned.is_empty() { + continue; + } + // All errors are fatal in this case, node managers should avoid them. 
if let Err(e) = mgr.unregister_nodes(&context, &owned).await { error!( @@ -907,6 +952,380 @@ impl MessageHandler { } } + async fn create_monitored_items( + node_managers: NodeManagers, + request: Request, + ) -> Response { + let context = request.context(); + let Some(items_to_create) = request.request.items_to_create else { + return service_fault!(request, StatusCode::BadNothingToDo); + }; + if items_to_create.is_empty() { + return service_fault!(request, StatusCode::BadNothingToDo); + } + if items_to_create.len() > request.info.operational_limits.max_monitored_items_per_call { + return service_fault!(request, StatusCode::BadTooManyOperations); + } + let Some(len) = request + .subscriptions + .get_monitored_item_count(request.session_id, request.request.subscription_id) + else { + return service_fault!(request, StatusCode::BadSubscriptionIdInvalid); + }; + + let max_per_sub = request + .info + .config + .limits + .subscriptions + .max_monitored_items_per_sub; + if max_per_sub > 0 && max_per_sub < len + items_to_create.len() { + return service_fault!(request, StatusCode::BadTooManyMonitoredItems); + } + + let mut items: Vec<_> = items_to_create + .into_iter() + .map(|r| { + CreateMonitoredItem::new( + r, + request.info.monitored_item_id_handle.next(), + request.request.subscription_id, + &request.info, + request.request.timestamps_to_return, + ) + }) + .collect(); + + for mgr in &node_managers { + let owned: Vec<_> = items + .iter_mut() + .filter(|n| { + n.status_code().is_good() && mgr.owns_node(&n.item_to_monitor().node_id) + }) + .collect(); + + if owned.is_empty() { + continue; + } + + if let Err(e) = mgr.create_monitored_items(&context, &owned).await { + for n in owned { + n.set_status(e); + } + } + } + + let handles: Vec<_> = items.iter().map(|i| i.handle()).collect(); + + let res = match request.subscriptions.create_monitored_items( + request.session_id, + request.request.subscription_id, + items, + ) { + Ok(r) => r, + // Shouldn't happen, would be due to a race condition. If it does happen we're fine with failing. + Err(e) => { + // Should clean up any that failed to create though. + for mgr in &node_managers { + mgr.delete_monitored_items(&context, &handles).await; + } + return service_fault!(request, e); + } + }; + + Response { + message: CreateMonitoredItemsResponse { + response_header: ResponseHeader::new_good(request.request_handle), + results: Some(res), + diagnostic_infos: None, + } + .into(), + request_id: request.request_id, + } + } + + async fn modify_monitored_items( + node_managers: NodeManagers, + request: Request, + ) -> Response { + let context = request.context(); + let Some(items_to_modify) = request.request.items_to_modify else { + return service_fault!(request, StatusCode::BadNothingToDo); + }; + if items_to_modify.is_empty() { + return service_fault!(request, StatusCode::BadNothingToDo); + } + if items_to_modify.len() > request.info.operational_limits.max_monitored_items_per_call { + return service_fault!(request, StatusCode::BadTooManyOperations); + } + + // Call modify first, then only pass successful modify's to the node managers. 
+ let results = match request.subscriptions.modify_monitored_items( + request.session_id, + request.request.subscription_id, + &request.info, + request.request.timestamps_to_return, + items_to_modify, + ) { + Ok(r) => r, + Err(e) => return service_fault!(request, e), + }; + + for mgr in node_managers { + let owned: Vec<_> = results + .iter() + .filter(|n| n.0.status_code.is_good() && mgr.owns_node(&n.1)) + .map(|n| &n.0) + .collect(); + + if owned.is_empty() { + continue; + } + + mgr.modify_monitored_items(&context, request.request.subscription_id, &owned) + .await; + } + + Response { + message: ModifyMonitoredItemsResponse { + response_header: ResponseHeader::new_good(request.request_handle), + results: Some(results.into_iter().map(|r| r.0).collect()), + diagnostic_infos: None, + } + .into(), + request_id: request.request_id, + } + } + + async fn set_monitoring_mode( + node_managers: NodeManagers, + request: Request, + ) -> Response { + let context = request.context(); + let Some(items) = request.request.monitored_item_ids else { + return service_fault!(request, StatusCode::BadNothingToDo); + }; + if items.is_empty() { + return service_fault!(request, StatusCode::BadNothingToDo); + } + if items.len() > request.info.operational_limits.max_monitored_items_per_call { + return service_fault!(request, StatusCode::BadTooManyOperations); + } + + let results = match request.subscriptions.set_monitoring_mode( + request.session_id, + request.request.subscription_id, + request.request.monitoring_mode, + items, + ) { + Ok(r) => r, + Err(e) => return service_fault!(request, e), + }; + + for mgr in node_managers { + let owned: Vec<_> = results + .iter() + .filter(|n| n.1.is_good() && mgr.owns_node(&n.2)) + .map(|n| n.0) + .collect(); + + if owned.is_empty() { + continue; + } + + mgr.set_monitoring_mode( + &context, + request.request.monitoring_mode, + request.request.subscription_id, + &owned, + ) + .await; + } + + Response { + message: SetMonitoringModeResponse { + response_header: ResponseHeader::new_good(request.request_handle), + results: Some(results.into_iter().map(|r| r.1).collect()), + diagnostic_infos: None, + } + .into(), + request_id: request.request_id, + } + } + + async fn delete_monitored_items( + node_managers: NodeManagers, + request: Request, + ) -> Response { + let context = request.context(); + let Some(items) = request.request.monitored_item_ids else { + return service_fault!(request, StatusCode::BadNothingToDo); + }; + if items.is_empty() { + return service_fault!(request, StatusCode::BadNothingToDo); + } + if items.len() > request.info.operational_limits.max_monitored_items_per_call { + return service_fault!(request, StatusCode::BadTooManyOperations); + } + + let results = match request.subscriptions.delete_monitored_items( + request.session_id, + request.request.subscription_id, + &items, + ) { + Ok(r) => r, + Err(e) => return service_fault!(request, e), + }; + + for mgr in node_managers { + let owned: Vec<_> = results + .iter() + .filter(|n| n.1.is_good() && mgr.owns_node(&n.2)) + .map(|n| n.0) + .collect(); + + if owned.is_empty() { + continue; + } + + mgr.delete_monitored_items(&context, &owned).await; + } + + Response { + message: DeleteMonitoredItemsResponse { + response_header: ResponseHeader::new_good(request.request_handle), + results: Some(results.into_iter().map(|r| r.1).collect()), + diagnostic_infos: None, + } + .into(), + request_id: request.request_id, + } + } + + async fn delete_subscriptions( + node_managers: NodeManagers, + request: Request, + ) -> Response { + 
let context = request.context(); + let Some(items) = request.request.subscription_ids else { + return service_fault!(request, StatusCode::BadNothingToDo); + }; + if items.is_empty() { + return service_fault!(request, StatusCode::BadNothingToDo); + } + + let results = match Self::delete_subscriptions_inner( + node_managers, + items, + &request.subscriptions, + &context, + ) + .await + { + Ok(r) => r, + Err(e) => return service_fault!(request, e), + }; + + Response { + message: DeleteSubscriptionsResponse { + response_header: ResponseHeader::new_good(request.request_handle), + results: Some(results), + diagnostic_infos: None, + } + .into(), + request_id: request.request_id, + } + } + + async fn delete_subscriptions_inner( + node_managers: NodeManagers, + to_delete: Vec, + subscriptions: &SubscriptionCache, + context: &RequestContext, + ) -> Result, StatusCode> { + let results = subscriptions.delete_subscriptions(context.session_id, &to_delete)?; + + for mgr in node_managers { + let owned: Vec<_> = results + .iter() + .filter(|f| f.0.is_good()) + .flat_map(|f| f.1.iter().filter(|i| mgr.owns_node(&i.1))) + .map(|i| i.0) + .collect(); + + if owned.is_empty() { + continue; + } + + mgr.delete_monitored_items(&context, &owned).await; + } + + Ok(results.into_iter().map(|r| r.0).collect()) + } + + pub async fn delete_session_subscriptions( + &mut self, + session_id: u32, + session: Arc>, + token: UserToken, + ) { + let ids = self.subscriptions.get_session_subscription_ids(session_id); + if ids.is_empty() { + return; + } + + let context = RequestContext { + session, + session_id, + authenticator: self.info.authenticator.clone(), + token, + current_node_manager_index: 0, + type_tree: self.info.type_tree.clone(), + subscriptions: self.subscriptions.clone(), + }; + + // Ignore the result + if let Err(e) = Self::delete_subscriptions_inner( + self.node_managers.clone(), + ids, + &self.subscriptions, + &context, + ) + .await + { + warn!("Cleaning up session subscriptions failed: {e}"); + } + } + + fn set_triggering( + &self, + request: Box, + data: RequestData, + ) -> HandleMessageResult { + let result = self + .subscriptions + .set_triggering( + data.session_id, + request.subscription_id, + request.triggering_item_id, + request.links_to_add.unwrap_or_default(), + request.links_to_remove.unwrap_or_default(), + ) + .map(|(add_res, remove_res)| SetTriggeringResponse { + response_header: ResponseHeader::new_good(&request.request_header), + add_results: Some(add_res), + add_diagnostic_infos: None, + remove_results: Some(remove_res), + remove_diagnostic_infos: None, + }); + + HandleMessageResult::SyncMessage(Response::from_result( + result, + data.request_handle, + data.request_id, + )) + } + fn publish(&self, request: Box, data: RequestData) -> HandleMessageResult { let now = Utc::now(); let now_instant = Instant::now(); diff --git a/lib/src/async_server/subscriptions/mod.rs b/lib/src/async_server/subscriptions/mod.rs index 45dbeff19..8e9f6a761 100644 --- a/lib/src/async_server/subscriptions/mod.rs +++ b/lib/src/async_server/subscriptions/mod.rs @@ -8,15 +8,18 @@ use chrono::Utc; use hashbrown::HashMap; pub use monitored_item::CreateMonitoredItem; use session_subscriptions::SessionSubscriptions; -use subscription::{MonitoredItemHandle, TickReason}; +pub use subscription::MonitoredItemHandle; +use subscription::TickReason; use crate::{ server::prelude::{ CreateSubscriptionRequest, CreateSubscriptionResponse, DataValue, DateTimeUtc, MessageSecurityMode, ModifySubscriptionRequest, ModifySubscriptionResponse, - 
NotificationMessage, PublishRequest, RepublishRequest, RepublishResponse, ResponseHeader, - SetPublishingModeRequest, SetPublishingModeResponse, StatusCode, SupportedMessage, - TransferResult, TransferSubscriptionsRequest, TransferSubscriptionsResponse, + MonitoredItemCreateResult, MonitoredItemModifyRequest, MonitoredItemModifyResult, + MonitoringMode, NodeId, NotificationMessage, PublishRequest, RepublishRequest, + RepublishResponse, ResponseHeader, SetPublishingModeRequest, SetPublishingModeResponse, + StatusCode, SupportedMessage, TimestampsToReturn, TransferResult, + TransferSubscriptionsRequest, TransferSubscriptionsResponse, }, sync::{Mutex, RwLock}, }; @@ -69,6 +72,21 @@ impl SubscriptionCache { } } + pub(crate) fn get_monitored_item_count( + &self, + session_id: u32, + subscription_id: u32, + ) -> Option { + let Some(cache) = ({ + let lck = trace_read_lock!(self.inner); + lck.session_subscriptions.get(&session_id).cloned() + }) else { + return None; + }; + let cache_lck = cache.lock(); + cache_lck.get_monitored_item_count(subscription_id) + } + pub(crate) fn create_subscription( &self, session_id: u32, @@ -186,6 +204,42 @@ impl SubscriptionCache { } } + pub(crate) fn create_monitored_items( + &self, + session_id: u32, + subscription_id: u32, + requests: Vec, + ) -> Result, StatusCode> { + let Some(cache) = ({ + let lck = trace_read_lock!(self.inner); + lck.session_subscriptions.get(&session_id).cloned() + }) else { + return Err(StatusCode::BadNoSubscription); + }; + + let mut cache_lck = cache.lock(); + cache_lck.create_monitored_items(subscription_id, requests) + } + + pub(crate) fn modify_monitored_items( + &self, + session_id: u32, + subscription_id: u32, + info: &ServerInfo, + timestamps_to_return: TimestampsToReturn, + requests: Vec, + ) -> Result, StatusCode> { + let Some(cache) = ({ + let lck = trace_read_lock!(self.inner); + lck.session_subscriptions.get(&session_id).cloned() + }) else { + return Err(StatusCode::BadNoSubscription); + }; + + let mut cache_lck = cache.lock(); + cache_lck.modify_monitored_items(subscription_id, info, timestamps_to_return, requests) + } + fn get_key(session: &RwLock) -> PersistentSessionKey { let lck = trace_read_lock!(session); PersistentSessionKey::new( @@ -195,6 +249,95 @@ impl SubscriptionCache { ) } + pub(crate) fn set_monitoring_mode( + &self, + session_id: u32, + subscription_id: u32, + monitoring_mode: MonitoringMode, + items: Vec, + ) -> Result, StatusCode> { + let Some(cache) = ({ + let lck = trace_read_lock!(self.inner); + lck.session_subscriptions.get(&session_id).cloned() + }) else { + return Err(StatusCode::BadNoSubscription); + }; + + let mut cache_lck = cache.lock(); + cache_lck.set_monitoring_mode(subscription_id, monitoring_mode, items) + } + + pub(crate) fn set_triggering( + &self, + session_id: u32, + subscription_id: u32, + triggering_item_id: u32, + links_to_add: Vec, + links_to_remove: Vec, + ) -> Result<(Vec, Vec), StatusCode> { + let Some(cache) = ({ + let lck = trace_read_lock!(self.inner); + lck.session_subscriptions.get(&session_id).cloned() + }) else { + return Err(StatusCode::BadNoSubscription); + }; + + let mut cache_lck = cache.lock(); + cache_lck.set_triggering( + subscription_id, + triggering_item_id, + links_to_add, + links_to_remove, + ) + } + + pub(crate) fn delete_monitored_items( + &self, + session_id: u32, + subscription_id: u32, + items: &[u32], + ) -> Result, StatusCode> { + let Some(cache) = ({ + let lck = trace_read_lock!(self.inner); + lck.session_subscriptions.get(&session_id).cloned() + }) else { 
+ return Err(StatusCode::BadNoSubscription); + }; + + let mut cache_lck = cache.lock(); + cache_lck.delete_monitored_items(subscription_id, items) + } + + pub(crate) fn delete_subscriptions( + &self, + session_id: u32, + ids: &[u32], + ) -> Result)>, StatusCode> { + let mut lck = trace_write_lock!(self.inner); + let Some(cache) = lck.session_subscriptions.get(&session_id).cloned() else { + return Err(StatusCode::BadNoSubscription); + }; + let mut cache_lck = cache.lock(); + for id in ids { + if cache_lck.contains(*id) { + lck.subscription_to_session.remove(id); + } + } + Ok(cache_lck.delete_subscriptions(ids)) + } + + pub(crate) fn get_session_subscription_ids(&self, session_id: u32) -> Vec { + let Some(cache) = ({ + let lck = trace_read_lock!(self.inner); + lck.session_subscriptions.get(&session_id).cloned() + }) else { + return Vec::new(); + }; + + let cache_lck = cache.lock(); + cache_lck.subscription_ids() + } + pub(crate) fn transfer( &self, req: &TransferSubscriptionsRequest, diff --git a/lib/src/async_server/subscriptions/monitored_item.rs b/lib/src/async_server/subscriptions/monitored_item.rs index d5c1272fa..4607377d1 100644 --- a/lib/src/async_server/subscriptions/monitored_item.rs +++ b/lib/src/async_server/subscriptions/monitored_item.rs @@ -126,14 +126,19 @@ impl CreateMonitoredItem { sub_id: u32, info: &ServerInfo, timestamps_to_return: TimestampsToReturn, - ) -> Result { + ) -> Self { let filter = - FilterType::from_filter(&req.requested_parameters.filter, &info.decoding_options())?; + FilterType::from_filter(&req.requested_parameters.filter, &info.decoding_options()); let sampling_interval = sanitize_sampling_interval(info, req.requested_parameters.sampling_interval); let queue_size = sanitize_queue_size(info, req.requested_parameters.queue_size as usize); - Ok(Self { + let (filter, status) = match filter { + Ok(s) => (s, StatusCode::Good), + Err(e) => (FilterType::None, e), + }; + + Self { id, subscription_id: sub_id, item_to_monitor: req.item_to_monitor, @@ -143,13 +148,13 @@ impl CreateMonitoredItem { queue_size, sampling_interval, initial_value: None, - status_code: StatusCode::Good, + status_code: status, filter, timestamps_to_return, - }) + } } - pub fn id(&self) -> MonitoredItemHandle { + pub fn handle(&self) -> MonitoredItemHandle { MonitoredItemHandle { monitored_item_id: self.id, subscription_id: self.subscription_id, @@ -201,6 +206,10 @@ impl CreateMonitoredItem { pub fn timestamps_to_return(&self) -> TimestampsToReturn { self.timestamps_to_return } + + pub fn status_code(&self) -> StatusCode { + self.status_code + } } #[derive(Debug)] @@ -411,8 +420,8 @@ impl MonitoredItem { }); } - pub fn monitoring_mode(&self) -> MonitoringMode { - self.monitoring_mode + pub fn remove_dead_trigger(&mut self, id: u32) { + self.triggered_items.remove(&id); } pub fn is_reporting(&self) -> bool { @@ -430,4 +439,24 @@ impl MonitoredItem { pub fn has_notifications(&self) -> bool { !self.notification_queue.is_empty() } + + pub fn id(&self) -> u32 { + self.id + } + + pub fn sampling_interval(&self) -> f64 { + self.sampling_interval + } + + pub fn queue_size(&self) -> usize { + self.queue_size + } + + pub fn item_to_monitor(&self) -> &ReadValueId { + &self.item_to_monitor + } + + pub fn set_monitoring_mode(&mut self, monitoring_mode: MonitoringMode) { + self.monitoring_mode = monitoring_mode; + } } diff --git a/lib/src/async_server/subscriptions/session_subscriptions.rs b/lib/src/async_server/subscriptions/session_subscriptions.rs index 0cfa30475..fc9c6d600 100644 --- 
a/lib/src/async_server/subscriptions/session_subscriptions.rs +++ b/lib/src/async_server/subscriptions/session_subscriptions.rs @@ -4,18 +4,21 @@ use std::{ }; use super::{ + monitored_item::MonitoredItem, subscription::{MonitoredItemHandle, Subscription, TickReason}, - NonAckedPublish, PendingPublish, PersistentSessionKey, SubscriptionLimits, + CreateMonitoredItem, NonAckedPublish, PendingPublish, PersistentSessionKey, SubscriptionLimits, }; -use hashbrown::HashMap; +use hashbrown::{HashMap, HashSet}; use crate::{ async_server::info::ServerInfo, server::prelude::{ CreateSubscriptionRequest, CreateSubscriptionResponse, DataValue, DateTime, DateTimeUtc, - ModifySubscriptionRequest, ModifySubscriptionResponse, NotificationMessage, PublishRequest, - PublishResponse, RepublishRequest, RepublishResponse, ResponseHeader, ServiceFault, - SetPublishingModeRequest, SetPublishingModeResponse, StatusCode, + ExtensionObject, ModifySubscriptionRequest, ModifySubscriptionResponse, + MonitoredItemCreateResult, MonitoredItemModifyRequest, MonitoredItemModifyResult, + MonitoringMode, NodeId, NotificationMessage, PublishRequest, PublishResponse, + RepublishRequest, RepublishResponse, ResponseHeader, ServiceFault, + SetPublishingModeRequest, SetPublishingModeResponse, StatusCode, TimestampsToReturn, }, }; @@ -71,6 +74,14 @@ impl SessionSubscriptions { Ok(()) } + pub fn contains(&self, sub_id: u32) -> bool { + self.subscriptions.contains_key(&sub_id) + } + + pub fn subscription_ids(&self) -> Vec { + self.subscriptions.keys().copied().collect() + } + pub fn remove(&mut self, subscription_id: u32) -> (Option, Vec) { let mut notifs = Vec::new(); let mut idx = 0; @@ -207,6 +218,232 @@ impl SessionSubscriptions { }) } + pub(super) fn create_monitored_items( + &mut self, + subscription_id: u32, + requests: Vec, + ) -> Result, StatusCode> { + let Some(sub) = self.subscriptions.get_mut(&subscription_id) else { + return Err(StatusCode::BadSubscriptionIdInvalid); + }; + + // TODO: Filter validation. 
+ let mut results = Vec::with_capacity(requests.len()); + for item in requests { + if item.status_code().is_good() { + let new_item = MonitoredItem::new(&item); + results.push(MonitoredItemCreateResult { + status_code: StatusCode::Good, + monitored_item_id: new_item.id(), + revised_sampling_interval: new_item.sampling_interval(), + revised_queue_size: new_item.queue_size() as u32, + filter_result: ExtensionObject::null(), + }); + sub.monitored_items.insert(new_item.id(), new_item); + } else { + results.push(MonitoredItemCreateResult { + status_code: item.status_code(), + monitored_item_id: 0, + revised_sampling_interval: item.sampling_interval(), + revised_queue_size: item.queue_size() as u32, + filter_result: ExtensionObject::null(), + }); + } + } + + Ok(results) + } + + pub(super) fn modify_monitored_items( + &mut self, + subscription_id: u32, + info: &ServerInfo, + timestamps_to_return: TimestampsToReturn, + requests: Vec, + ) -> Result, StatusCode> { + let Some(sub) = self.subscriptions.get_mut(&subscription_id) else { + return Err(StatusCode::BadSubscriptionIdInvalid); + }; + let mut results = Vec::with_capacity(requests.len()); + for request in requests { + if let Some(item) = sub.monitored_items.get_mut(&request.monitored_item_id) { + match item.modify(info, timestamps_to_return, &request) { + Ok(f) => results.push(( + MonitoredItemModifyResult { + status_code: StatusCode::Good, + revised_sampling_interval: item.sampling_interval(), + revised_queue_size: item.queue_size() as u32, + filter_result: f, + }, + item.item_to_monitor().node_id.clone(), + )), + Err(e) => results.push(( + MonitoredItemModifyResult { + status_code: e, + revised_sampling_interval: item.sampling_interval(), + revised_queue_size: item.queue_size() as u32, + filter_result: ExtensionObject::null(), + }, + NodeId::null(), + )), + }; + } else { + results.push(( + MonitoredItemModifyResult { + status_code: StatusCode::BadMonitoredItemIdInvalid, + revised_sampling_interval: 0.0, + revised_queue_size: 0, + filter_result: ExtensionObject::null(), + }, + NodeId::null(), + )); + } + } + + Ok(results) + } + + pub(super) fn set_monitoring_mode( + &mut self, + subscription_id: u32, + monitoring_mode: MonitoringMode, + items: Vec, + ) -> Result, StatusCode> { + let Some(sub) = self.subscriptions.get_mut(&subscription_id) else { + return Err(StatusCode::BadSubscriptionIdInvalid); + }; + let mut results = Vec::with_capacity(items.len()); + for id in items { + let handle = MonitoredItemHandle { + subscription_id, + monitored_item_id: id, + }; + if let Some(item) = sub.monitored_items.get_mut(&id) { + results.push(( + handle, + StatusCode::Good, + item.item_to_monitor().node_id.clone(), + )); + item.set_monitoring_mode(monitoring_mode); + } else { + results.push(( + handle, + StatusCode::BadMonitoredItemIdInvalid, + NodeId::null(), + )) + } + } + Ok(results) + } + + fn filter_links( + links: Vec, + items: &std::collections::HashMap, + ) -> (Vec, Vec) { + let mut to_apply = Vec::with_capacity(links.len()); + let mut results = Vec::with_capacity(links.len()); + + for link in links { + if items.contains_key(&link) { + to_apply.push(link); + results.push(StatusCode::Good); + } else { + results.push(StatusCode::BadMonitoredItemIdInvalid); + } + } + (to_apply, results) + } + + pub(super) fn set_triggering( + &mut self, + subscription_id: u32, + triggering_item_id: u32, + links_to_add: Vec, + links_to_remove: Vec, + ) -> Result<(Vec, Vec), StatusCode> { + let Some(sub) = self.subscriptions.get_mut(&subscription_id) else { + return 
Err(StatusCode::BadSubscriptionIdInvalid); + }; + if !sub.monitored_items.contains_key(&triggering_item_id) { + return Err(StatusCode::BadMonitoredItemIdInvalid); + } + + let (to_add, add_results) = Self::filter_links(links_to_add, &sub.monitored_items); + let (to_remove, remove_results) = Self::filter_links(links_to_remove, &sub.monitored_items); + + let item = sub.monitored_items.get_mut(&triggering_item_id).unwrap(); + + item.set_triggering(&to_add, &to_remove); + + Ok((add_results, remove_results)) + } + + pub(super) fn delete_monitored_items( + &mut self, + subscription_id: u32, + items: &[u32], + ) -> Result, StatusCode> { + let Some(sub) = self.subscriptions.get_mut(&subscription_id) else { + return Err(StatusCode::BadSubscriptionIdInvalid); + }; + let mut results = Vec::with_capacity(items.len()); + for id in items { + let handle = MonitoredItemHandle { + subscription_id, + monitored_item_id: *id, + }; + if let Some(item) = sub.monitored_items.remove(&id) { + results.push(( + handle, + StatusCode::Good, + item.item_to_monitor().node_id.clone(), + )); + } else { + results.push(( + handle, + StatusCode::BadMonitoredItemIdInvalid, + NodeId::null(), + )) + } + } + Ok(results) + } + + pub(super) fn delete_subscriptions( + &mut self, + ids: &[u32], + ) -> Vec<(StatusCode, Vec<(MonitoredItemHandle, NodeId)>)> { + let id_set: HashSet<_> = ids.iter().copied().collect(); + let mut result = Vec::with_capacity(ids.len()); + for id in ids { + let Some(sub) = self.subscriptions.remove(id) else { + result.push((StatusCode::BadSubscriptionIdInvalid, Vec::new())); + continue; + }; + + let items = sub + .monitored_items + .into_iter() + .map(|item| { + ( + MonitoredItemHandle { + subscription_id: *id, + monitored_item_id: item.1.id(), + }, + item.1.item_to_monitor().node_id.clone(), + ) + }) + .collect(); + + result.push((StatusCode::Good, items)) + } + + self.retransmission_queue + .retain(|r| !id_set.contains(&r.subscription_id)); + + result + } + /// This function takes the requested values passed in a create / modify and returns revised /// values that conform to the server's limits. For simplicity the return type is a tuple fn revise_subscription_values( @@ -478,4 +715,10 @@ impl SessionSubscriptions { pub(super) fn user_token(&self) -> &PersistentSessionKey { &self.user_token } + + pub(super) fn get_monitored_item_count(&self, subscription_id: u32) -> Option { + self.subscriptions + .get(&subscription_id) + .map(|s| s.monitored_items.len()) + } } diff --git a/lib/src/async_server/subscriptions/subscription.rs b/lib/src/async_server/subscriptions/subscription.rs index 783156c4c..3999c7467 100644 --- a/lib/src/async_server/subscriptions/subscription.rs +++ b/lib/src/async_server/subscriptions/subscription.rs @@ -74,6 +74,7 @@ pub(crate) enum HandledState { /// This is for debugging purposes. It allows the caller to validate the output state if required. #[derive(Debug)] pub(crate) struct UpdateStateResult { + #[allow(unused)] pub handled_state: HandledState, pub update_state_action: UpdateStateAction, } @@ -122,14 +123,10 @@ pub struct Subscription { /// 1 and be sequential - it that doesn't happen the server will panic because something went /// wrong somewhere. last_sequence_number: u32, - // The last monitored item id - next_monitored_item_id: u32, // The time that the subscription interval last fired last_time_publishing_interval_elapsed: Instant, // Currently outstanding notifications to send notifications: VecDeque, - // Monitored item triggers that have not yet been evaluated. 
- pending_triggers: VecDeque, /// Maximum number of queued notifications. max_queued_notifications: usize, /// Maximum number of notifications per publish. @@ -170,10 +167,8 @@ impl Subscription { // Counters for new items sequence_number: Handle::new(1), last_sequence_number: 0, - next_monitored_item_id: 1, last_time_publishing_interval_elapsed: Instant::now(), notifications: VecDeque::new(), - pending_triggers: VecDeque::new(), max_queued_notifications, max_notifications_per_publish, } @@ -621,12 +616,16 @@ impl Subscription { fn handle_triggers( &mut self, now: &DateTimeUtc, + triggers: Vec<(u32, u32)>, notifications: &mut Vec, max_notifications: usize, messages: &mut Vec, ) { - while let Some(item_id) = self.pending_triggers.pop_front() { + for (triggering_item, item_id) in triggers { let Some(item) = self.monitored_items.get_mut(&item_id) else { + if let Some(item) = self.monitored_items.get_mut(&triggering_item) { + item.remove_dead_trigger(item_id); + } continue; }; @@ -675,12 +674,18 @@ impl Subscription { ) -> Vec { let mut notifications = Vec::new(); let mut messages = Vec::new(); + let mut triggers = Vec::new(); for monitored_item in self.monitored_items.values_mut() { if publishing_interval_elapsed { if monitored_item.is_sampling() && monitored_item.has_new_notifications() { - self.pending_triggers - .extend(monitored_item.triggered_items().iter().copied()); + triggers.extend( + monitored_item + .triggered_items() + .iter() + .copied() + .map(|id| (monitored_item.id(), id)), + ); } if monitored_item.is_reporting() { @@ -703,7 +708,13 @@ impl Subscription { } } - self.handle_triggers(now, &mut notifications, max_notifications, &mut messages); + self.handle_triggers( + now, + triggers, + &mut notifications, + max_notifications, + &mut messages, + ); if notifications.len() > 0 { messages.push(Self::make_notification_message(
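To make the new NodeManager hooks above concrete, here is a minimal sketch of how a custom node manager might consume them. It is illustrative only: the `opcua::async_server` import paths, the `Sampler` helper, and the `MySamplingNodeManager` type are assumptions rather than part of this change, public field access on `MonitoredItemHandle` is assumed, and the remaining required `NodeManager` methods (browse, read, and so on) are elided. Only the method signatures themselves come from the trait additions in this diff.

// Sketch only: the import paths and the `Sampler` helper are assumptions made
// for this example; they are not part of the diff above.
use std::collections::HashMap;
use std::sync::Mutex;

use async_trait::async_trait;
use opcua::async_server::{
    node_manager::{CreateMonitoredItem, NodeManager, RequestContext},
    MonitoredItemHandle,
};
use opcua::server::prelude::{MonitoringMode, NodeId, StatusCode};

/// Hypothetical helper that owns a background task sampling a single node.
struct Sampler {
    paused: bool,
}

impl Sampler {
    fn start(node_id: &NodeId, interval_ms: f64) -> Result<Self, StatusCode> {
        // A real implementation would spawn a task that samples `node_id` every
        // `interval_ms` and pushes values back to the server; omitted here.
        let _ = (node_id, interval_ms);
        Ok(Sampler { paused: false })
    }

    fn set_paused(&mut self, paused: bool) {
        self.paused = paused;
    }
}

struct MySamplingNodeManager {
    // One sampler per monitored item created against this node manager,
    // keyed by (subscription id, monitored item id).
    samplers: Mutex<HashMap<(u32, u32), Sampler>>,
}

#[async_trait]
impl NodeManager for MySamplingNodeManager {
    // `owns_node`, `read`, `browse`, and the other required methods are elided.

    async fn create_monitored_items(
        &self,
        _context: &RequestContext,
        items: &[&mut CreateMonitoredItem],
    ) -> Result<(), StatusCode> {
        let mut samplers = self.samplers.lock().unwrap();
        for item in items {
            // Skip items that already failed validation (e.g. a bad filter).
            if !item.status_code().is_good() {
                continue;
            }
            let handle = item.handle();
            if let Ok(sampler) =
                Sampler::start(&item.item_to_monitor().node_id, item.sampling_interval())
            {
                samplers.insert((handle.subscription_id, handle.monitored_item_id), sampler);
            }
        }
        Ok(())
    }

    async fn set_monitoring_mode(
        &self,
        _context: &RequestContext,
        mode: MonitoringMode,
        _subscription_id: u32,
        items: &[MonitoredItemHandle],
    ) {
        // Keep sampling unless the item is disabled; the difference between
        // Reporting and Sampling is handled by the server itself.
        let paused = matches!(mode, MonitoringMode::Disabled);
        let mut samplers = self.samplers.lock().unwrap();
        for handle in items {
            if let Some(sampler) =
                samplers.get_mut(&(handle.subscription_id, handle.monitored_item_id))
            {
                sampler.set_paused(paused);
            }
        }
    }

    async fn delete_monitored_items(
        &self,
        _context: &RequestContext,
        items: &[MonitoredItemHandle],
    ) {
        let mut samplers = self.samplers.lock().unwrap();
        for handle in items {
            // Handles this node manager never created are simply absent here,
            // so removing them is a no-op, matching the trait contract.
            samplers.remove(&(handle.subscription_id, handle.monitored_item_id));
        }
    }
}

The delete hook deliberately tolerates handles it has never seen: as the trait documentation notes, the server may hand a node manager items that were created elsewhere or never created at all, and removal should then simply be a no-op.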