Skip to content

Commit

Permalink
Scaffolding for translate browse paths
Browse files Browse the repository at this point in the history
This service is a nightmare with this design. Hopefully things will get
a lot simpler once we get away from browse.
  • Loading branch information
einarmo committed May 12, 2024
1 parent cfeb771 commit 38df674
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 36 deletions.
12 changes: 9 additions & 3 deletions lib/src/async_server/node_manager/memory/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{

use super::{
view::{AddReferenceResult, ExternalReference, ExternalReferenceRequest, NodeMetadata},
BrowseNode, NodeManager, ReadNode, RequestContext, TypeTree,
BrowseNode, BrowsePathItem, NodeManager, ReadNode, RequestContext, TypeTree,
};

use crate::async_server::address_space::AddressSpace;
Expand Down Expand Up @@ -195,8 +195,6 @@ impl<TImpl: InMemoryNodeManagerImpl> InMemoryNodeManager<TImpl> {
}

fn is_readable(context: &RequestContext, node: &NodeType, attribute_id: AttributeId) -> bool {
// TODO session for current user
// Check for access level, user access level
Self::user_access_level(context, node, attribute_id).contains(UserAccessLevel::CURRENT_READ)
}

Expand Down Expand Up @@ -414,4 +412,12 @@ impl<TImpl: InMemoryNodeManagerImpl> NodeManager for InMemoryNodeManager<TImpl>

Ok(())
}

async fn translate_browse_paths_to_node_ids(
&self,
context: &RequestContext,
nodes: &mut [&mut BrowsePathItem],
) -> Result<(), StatusCode> {
Ok(())
}
}
2 changes: 1 addition & 1 deletion lib/src/async_server/node_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ pub trait NodeManager {
async fn translate_browse_paths_to_node_ids(
&self,
context: &RequestContext,
nodes: &mut [BrowsePathItem],
nodes: &mut [&mut BrowsePathItem],
) -> Result<(), StatusCode> {
Err(StatusCode::BadServiceUnsupported)
}
Expand Down
117 changes: 90 additions & 27 deletions lib/src/async_server/node_manager/view.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::collections::{HashMap, VecDeque};
use std::{
collections::{HashMap, VecDeque},
num::NonZeroUsize,
};

use crate::{
async_server::session::{
Expand All @@ -10,7 +13,8 @@ use crate::{
prelude::{
random, BrowseDescription, BrowseDescriptionResultMask, BrowseDirection, BrowsePath,
BrowsePathTarget, BrowseResult, ByteString, ExpandedNodeId, LocalizedText, NodeClass,
NodeClassMask, NodeId, QualifiedName, ReferenceDescription, StatusCode,
NodeClassMask, NodeId, QualifiedName, ReferenceDescription, RelativePathElement,
StatusCode,
},
},
};
Expand Down Expand Up @@ -427,6 +431,8 @@ impl BrowseNode {
references: Some(self.references),
};

// If we're out of continuation points, the correct response is to not store it, and
// set the status code to BadNoContinuationPoints.
if let Some(c) = continuation_point {
if session.add_browse_continuation_point(c).is_err() {
result.status_code = StatusCode::BadNoContinuationPoints;
Expand Down Expand Up @@ -498,44 +504,101 @@ pub(crate) struct ExternalReferencesContPoint {
// If it becomes necessary there may be ways to handle this, but it may be we just leave it up
// to the user.

pub(crate) struct BrowsePathResultElement {
pub(crate) node: NodeId,
pub(crate) depth: usize,
}

/// Container for a node being discovered in a browse path operation.
pub struct BrowsePathItem {
path: BrowsePath,
status_code: StatusCode,
targets: Vec<BrowsePathTarget>,
pub struct BrowsePathItem<'a> {
pub(crate) node: NodeId,
input_index: usize,
depth: usize,
node_manager_index: usize,
iteration_number: usize,
path: &'a [RelativePathElement],
results: Vec<BrowsePathResultElement>,
status: StatusCode,
}

impl BrowsePathItem {
pub fn new(path: BrowsePath) -> Self {
impl<'a> BrowsePathItem<'a> {
pub(crate) fn new(
elem: BrowsePathResultElement,
input_index: usize,
root: &BrowsePathItem<'a>,
node_manager_index: usize,
iteration_number: usize,
) -> Self {
Self {
path,
status_code: StatusCode::BadNodeIdUnknown,
targets: Vec::new(),
node: elem.node,
input_index,
depth: elem.depth,
node_manager_index,
path: if elem.depth <= root.path.len() {
&root.path[(elem.depth - 1)..]
} else {
&[]
},
results: Vec::new(),
status: StatusCode::Good,
iteration_number,
}
}

/// Get the path that should be visited.
pub fn path(&self) -> &BrowsePath {
&self.path
pub(crate) fn new_root(path: &'a BrowsePath, input_index: usize) -> Self {
Self {
node: path.starting_node.clone(),
input_index,
depth: 0,
node_manager_index: usize::MAX,
path: if let Some(elements) = path.relative_path.elements.as_ref() {
&*elements
} else {
&[]
},
results: Vec::new(),
status: StatusCode::Good,
iteration_number: 0,
}
}

/// Set the status code for this item. This defaults to BadNodeIdUnknown.
/// If you are an owner of the start node, you should make sure to set this.
pub fn set_status_code(&mut self, status_code: StatusCode) {
self.status_code = status_code;
pub fn path(&self) -> &'a [RelativePathElement] {
self.path
}

/// Add a path target to the result.
pub fn add_target(&mut self, node_id: ExpandedNodeId, remaining_path_index: u32) {
self.targets.push(BrowsePathTarget {
target_id: node_id,
remaining_path_index,
});
pub fn node_id(&self) -> &NodeId {
&self.node
}

pub fn add_element(&mut self, node: NodeId, relative_depth: usize) {
self.results.push(BrowsePathResultElement {
node,
depth: self.depth + relative_depth,
})
}

pub fn set_status(&mut self, status: StatusCode) {
self.status = status;
}

pub(crate) fn results_mut(&mut self) -> &mut Vec<BrowsePathResultElement> {
&mut self.results
}

pub(crate) fn input_index(&self) -> usize {
self.input_index
}

pub(crate) fn node_manager_index(&self) -> usize {
self.node_manager_index
}

pub fn status(&self) -> StatusCode {
self.status
}

/// Get the registered targets. You are allowed to use these to continue fetching nodes.
pub fn targets(&self) -> &[BrowsePathTarget] {
&self.targets
pub fn iteration_number(&self) -> usize {
self.iteration_number
}
}

Expand Down
160 changes: 155 additions & 5 deletions lib/src/async_server/session/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ use crate::{
authenticator::UserToken,
info::ServerInfo,
node_manager::{
resolve_external_references, BrowseNode, ExternalReferencesContPoint, NodeManager,
ReadNode, RequestContext,
resolve_external_references, BrowseNode, BrowsePathItem, ExternalReferencesContPoint,
NodeManager, ReadNode, RequestContext,
},
},
server::prelude::{
BrowseNextRequest, BrowseNextResponse, BrowseRequest, BrowseResponse, BrowseResult,
ByteString, ReadRequest, ReadResponse, ResponseHeader, ServiceFault, StatusCode,
SupportedMessage, TimestampsToReturn,
BrowseNextRequest, BrowseNextResponse, BrowsePathResult, BrowsePathTarget, BrowseRequest,
BrowseResponse, BrowseResult, ByteString, ReadRequest, ReadResponse, ResponseHeader,
ServiceFault, StatusCode, SupportedMessage, TimestampsToReturn,
TranslateBrowsePathsToNodeIdsRequest, TranslateBrowsePathsToNodeIdsResponse,
},
};

Expand Down Expand Up @@ -139,6 +140,20 @@ impl MessageHandler {
)))
}

SupportedMessage::TranslateBrowsePathsToNodeIdsRequest(request) => {
HandleMessageResult::AsyncMessage(tokio::task::spawn(Self::translate_browse_paths(
self.node_managers.clone(),
Request::new(
request,
self.info.clone(),
request_id,
request_handle,
session,
token,
),
)))
}

message => {
debug!(
"Message handler does not handle this kind of message {:?}",
Expand Down Expand Up @@ -551,4 +566,139 @@ impl MessageHandler {
request_id: request.request_id,
}
}

async fn translate_browse_paths(
node_managers: NodeManagers,
mut request: Request<TranslateBrowsePathsToNodeIdsRequest>,
) -> Response {
// - We're given a list of (NodeId, BrowsePath) pairs
// - For a node manager, ask them to explore the browse path, returning _all_ visited nodes in each layer.
// - This extends the list of (NodeId, BrowsePath) pairs, though each new node should have a shorter browse path.
// - We keep which node managers returned which nodes. Once every node manager has been asked about every
// returned node, the service is finished and we can collect all the node IDs in the bottom layer.

let Some(paths) = std::mem::take(&mut request.request.browse_paths) else {
return request.service_fault(StatusCode::BadNothingToDo);
};

if paths.is_empty() {
return request.service_fault(StatusCode::BadNothingToDo);
}

if paths.len()
> request
.info
.operational_limits
.max_nodes_per_translate_browse_paths_to_node_ids
{
return request.service_fault(StatusCode::BadTooManyOperations);
}

let mut items: Vec<_> = paths
.iter()
.enumerate()
.map(|(i, p)| BrowsePathItem::new_root(p, i))
.collect();

let context = request.context();
let mut idx = 0;
let mut iteration = 1;
let mut any_new_items_in_iteration = false;
let mut final_results = Vec::new();
loop {
let mgr = &node_managers[idx];
let mut chunk: Vec<_> = items
.iter_mut()
.filter(|it| {
// Item has not yet been marked bad, meaning it failed to resolve somewhere it should.
it.status().is_good()
// Either it's from a previous node manager,
&& (it.node_manager_index() < idx
// Or it's not from a later node manager in the previous iteration.
|| it.node_manager_index() > idx
&& it.iteration_number() == iteration - 1)
})
.collect();

if !chunk.is_empty() {
// Call translate on any of the target IDs.
if let Err(e) = mgr
.translate_browse_paths_to_node_ids(&context, &mut chunk)
.await
{
for n in &mut chunk {
if mgr.owns_node(n.node_id()) {
n.set_status(e);
}
}
} else {
let mut next = Vec::new();
for n in &mut chunk {
let index = n.input_index();
for el in n.results_mut().drain(..) {
next.push((el, index));
}
}

for (n, input_index) in next {
let item = BrowsePathItem::new(
n,
input_index,
&items[input_index],
idx,
iteration,
);
if item.path().is_empty() {
final_results.push(item);
} else {
any_new_items_in_iteration = true;
items.push(item);
}
}
}
}

idx += 1;
if idx == node_managers.len() {
idx = 0;
iteration += 1;
if !any_new_items_in_iteration {
break;
}
}

idx = (idx + 1) % node_managers.len();
}
// Collect all final paths.
let mut results: Vec<_> = items
.iter()
.take(paths.len())
.map(|p| BrowsePathResult {
status_code: p.status(),
targets: Some(Vec::new()),
})
.collect();

for res in final_results {
results[res.input_index()]
.targets
.as_mut()
.unwrap()
.push(BrowsePathTarget {
target_id: res.node.into(),
// External server references are not yet supported.
remaining_path_index: u32::MAX,
});
}

Response {
message: TranslateBrowsePathsToNodeIdsResponse {
response_header: ResponseHeader::new_good(request.request_handle),
results: Some(results),
diagnostic_infos: None,
}
.into(),
request_id: request.request_id,
}
}
}

0 comments on commit 38df674

Please sign in to comment.