Skip to content

Commit

Permalink
All subscription services
Browse files Browse the repository at this point in the history
  • Loading branch information
einarmo committed Jun 7, 2024
1 parent 5ea7e91 commit 6a93ec8
Show file tree
Hide file tree
Showing 12 changed files with 953 additions and 42 deletions.
2 changes: 2 additions & 0 deletions lib/src/async_server/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ pub struct ServerInfo {
pub type_tree: Arc<RwLock<TypeTree>>,
/// Generator for subscription IDs.
pub subscription_id_handle: AtomicHandle,
/// Generator for monitored item IDs.
pub monitored_item_id_handle: AtomicHandle,
}

impl ServerInfo {
Expand Down
2 changes: 1 addition & 1 deletion lib/src/async_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ mod transport;

pub use config::{ServerConfig, ServerEndpoint, ServerUserToken};
pub use server_core::ServerCore;
pub use subscriptions::SubscriptionCache;
pub use subscriptions::{MonitoredItemHandle, SubscriptionCache};

pub mod constants {
//! Provides constants that govern the internal workings of the server implementation.
Expand Down
3 changes: 3 additions & 0 deletions lib/src/async_server/node_manager/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
async_server::{
authenticator::{AuthManager, UserToken},
session::{instance::Session, message_handler::NodeManagers},
SubscriptionCache,
},
server::prelude::{BrowseDescriptionResultMask, NodeId},
sync::RwLock,
Expand All @@ -16,10 +17,12 @@ use super::{

pub struct RequestContext {
pub session: Arc<RwLock<Session>>,
pub session_id: u32,
pub authenticator: Arc<dyn AuthManager>,
pub token: UserToken,
pub current_node_manager_index: usize,
pub type_tree: Arc<RwLock<TypeTree>>,
pub subscriptions: Arc<SubscriptionCache>,
}

impl RequestContext {}
Expand Down
59 changes: 55 additions & 4 deletions lib/src/async_server/node_manager/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use async_trait::async_trait;

use crate::server::prelude::{
DeleteAtTimeDetails, DeleteEventDetails, DeleteRawModifiedDetails, NodeId, ReadAtTimeDetails,
ReadEventDetails, ReadProcessedDetails, ReadRawModifiedDetails, StatusCode, TimestampsToReturn,
UpdateDataDetails, UpdateEventDetails, UpdateStructureDataDetails, WriteValue,
DeleteAtTimeDetails, DeleteEventDetails, DeleteRawModifiedDetails, ModifyMonitoredItemsRequest,
MonitoredItemModifyRequest, MonitoredItemModifyResult, MonitoringMode, NodeId,
ReadAtTimeDetails, ReadEventDetails, ReadProcessedDetails, ReadRawModifiedDetails, StatusCode,
TimestampsToReturn, UpdateDataDetails, UpdateEventDetails, UpdateStructureDataDetails,
WriteValue,
};

mod context;
Expand All @@ -15,7 +17,7 @@ mod view;

use self::view::ExternalReferenceRequest;

use super::subscriptions::CreateMonitoredItem;
use super::{subscriptions::CreateMonitoredItem, MonitoredItemHandle};

pub use {
context::RequestContext,
Expand Down Expand Up @@ -256,8 +258,57 @@ pub trait NodeManager {
/// is not Disabled.
async fn create_monitored_items(
&self,
context: &RequestContext,
items: &[&mut CreateMonitoredItem],
) -> Result<(), StatusCode> {
Err(StatusCode::BadServiceUnsupported)
}

/// Modify monitored items. This method is purely informative for the node manager,
/// to let it modify sampling intervals, apply a new filter, or similar.
///
/// Node managers are not required to take any action here, and this method is not
/// allowed to fail.
async fn modify_monitored_items(
&self,
context: &RequestContext,
subscription_id: u32,
items: &[&MonitoredItemModifyResult],
) {
}

/// Modify monitored items. This method is purely informative for the node manager,
/// to let it pause or resume sampling. Note that this should _not_ delete context
/// stored from `create_monitored_items`, since it may be called again to resume sampling.
///
/// The node manager should sample so long as monitoring mode is not `Disabled`, the difference
/// between `Reporting` and `Sampling` is handled by the server.
///
/// Node managers are not required to take any action here, and this method is not
/// allowed to fail.
async fn set_monitoring_mode(
&self,
context: &RequestContext,
mode: MonitoringMode,
subscription_id: u32,
items: &[MonitoredItemHandle],
) {
}

/// Delete monitored items. This method is purely informative for the node manager,
/// to let it stop sampling, or similar.
///
/// Node managers are not required to take any action here, and this method is not
/// allowed to fail. Most node managers that implement subscriptions will want to do
/// something with this.
///
/// This method may be given monitored items that were never created, or were
/// created for a different node manager. Attempting to delete a monitored item
/// that does not exist is handled elsewhere and should be a no-op here.
async fn delete_monitored_items(
&self,
context: &RequestContext,
items: &[MonitoredItemHandle],
) {
}
}
1 change: 1 addition & 0 deletions lib/src/async_server/server_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ impl ServerCore {
receive_buffer_size,
type_tree: Arc::new(RwLock::new(TypeTree::new())),
subscription_id_handle: AtomicHandle::new(1),
monitored_item_id_handle: AtomicHandle::new(1),
};

let certificate_store = Arc::new(RwLock::new(certificate_store));
Expand Down
4 changes: 3 additions & 1 deletion lib/src/async_server/session/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,9 @@ impl SessionController {

SupportedMessage::CloseSessionRequest(request) => {
let mut mgr = trace_write_lock!(self.session_manager);
let res = mgr.close_session(&mut self.channel, &request);
let res = mgr
.close_session(&mut self.channel, &mut self.message_handler, &request)
.await;
drop(mgr);
self.process_service_result(res, request.request_header.request_handle, id)
}
Expand Down
15 changes: 11 additions & 4 deletions lib/src/async_server/session/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
},
};

use super::instance::Session;
use super::{instance::Session, message_handler::MessageHandler};

lazy_static! {
static ref NEXT_SESSION_ID: AtomicU32 = AtomicU32::new(1);
Expand Down Expand Up @@ -272,26 +272,33 @@ impl SessionManager {
}
}

pub fn close_session(
pub async fn close_session(
&mut self,
channel: &mut SecureChannel,
handler: &mut MessageHandler,
request: &CloseSessionRequest,
) -> Result<CloseSessionResponse, StatusCode> {
let Some(session) = self.find_by_token(&request.request_header.authentication_token) else {
return Err(StatusCode::BadSessionIdInvalid);
};

let session = trace_read_lock!(session);
let id = session.session_id_numeric();
let token = session.user_token().cloned();

let secure_channel_id = channel.secure_channel_id();
if !session.is_activated() && session.secure_channel_id() != secure_channel_id {
error!("close_session rejected, secure channel id {} for inactive session does not match one used to create session, {}", secure_channel_id, session.secure_channel_id());
return Err(StatusCode::BadSecureChannelIdInvalid);
}

let session_id = session.session_id().clone();

self.sessions.remove(&session_id);
let session = self.sessions.remove(&session_id).unwrap();
if request.delete_subscriptions {
handler
.delete_session_subscriptions(id, session, token.unwrap())
.await;
}

Ok(CloseSessionResponse {
response_header: ResponseHeader::new_good(&request.request_header),
Expand Down
Loading

0 comments on commit 6a93ec8

Please sign in to comment.