Skip to content

Commit

Permalink
Reader check seqence_number
Browse files Browse the repository at this point in the history
  • Loading branch information
schroeder- committed Aug 9, 2021
1 parent 4ef144c commit 212ffc5
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 7 deletions.
6 changes: 2 additions & 4 deletions src/message/uadp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use opcua_types::{process_decode_io_result, read_u16, read_u32, read_u64, read_u
use opcua_types::{process_encode_io_result, write_u16, write_u32, write_u8};
use std::convert::TryFrom;
use std::io::{Read, Write};

use crate::until::is_sequence_newer;
/// Uadp Message flags See OPC Unified Architecture, Part 14 7.2.2.2.2
#[derive(PartialEq, Debug, Clone, Copy)]
pub(super) struct MessageHeaderFlags(u32);
Expand Down Expand Up @@ -1556,10 +1558,6 @@ pub struct UadpMessageChunkManager {
dataset_id: u16,
}

fn is_sequence_newer(sequence_no: u16, last_sequence_no: u16) -> bool {
let v = (65535_u32 + u32::from(sequence_no) - u32::from(last_sequence_no)) % 65536;
v < 16384
}

impl UadpMessageChunkManager {
pub const fn new(dataset_id: u16) -> Self {
Expand Down
21 changes: 18 additions & 3 deletions src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use crate::message::UadpMessageChunkManager;
use crate::message::UadpPayload;
use crate::network::ReaderTransportSettings;
use crate::prelude::PubSubDataSource;
use crate::until::decode_extension;
use crate::until::{decode_extension, is_sequence_newer};
use std::sync::{Arc, Mutex, RwLock};

use log::trace;
use log::{debug, trace};
use log::{error, warn};
use opcua_types::BrokerDataSetReaderTransportDataType;
use opcua_types::ConfigurationVersionDataType;
Expand Down Expand Up @@ -46,6 +46,7 @@ pub struct DataSetReaderBuilder {
pub struct DataSetReader {
name: UAString,
publisher_id: Variant,
last_seqence: Option<u16>,
writer_group_id: u16,
dataset_writer_id: u16,
fields: Vec<PubSubFieldMetaData>,
Expand Down Expand Up @@ -117,6 +118,7 @@ impl DataSetReaderBuilder {
sub_data_set: SubscribedDataSet::new(),
transport_settings: self.transport_settings.clone(),
dechunker: UadpMessageChunkManager::new(self.dataset_writer_id),
last_seqence: None
}
}
}
Expand Down Expand Up @@ -226,7 +228,7 @@ impl DataSetReader {
// Find out if the writer_groupe is contained in the message
if let Some(gp) = &msg.group_header {
if let Some(wg_id) = gp.writer_group_id {
if wg_id == self.writer_group_id {
if wg_id == self.writer_group_id{
// Find the dataset if contained
if let Some(idx) = msg
.dataset_payload
Expand All @@ -251,6 +253,7 @@ impl DataSetReader {
sub_data_set: SubscribedDataSet::new(),
transport_settings: ReaderTransportSettings::None,
dechunker: UadpMessageChunkManager::new(cfg.data_set_writer_id),
last_seqence: None
};
s.update(cfg)?;
Ok(s)
Expand Down Expand Up @@ -474,6 +477,18 @@ impl DataSetReader {
self.parse_msg(&msg, idx, data_source, cb);
}
} else {
// Check if sequence_no exists and is newer
if let Some(gp) = &msg.group_header{
if let Some(seq) = gp.sequence_no{
if let Some(last) = self.last_seqence{
if !is_sequence_newer(seq, last){
debug!("rejected msg because of old seqence_no");
return;
}
}
self.last_seqence = Some(seq);
}
}
self.parse_msg(msg, idx, data_source, cb);
};
}
Expand Down
6 changes: 6 additions & 0 deletions src/until.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,9 @@ pub fn decode_extension<T: BinaryEncoder<T>>(
Err(StatusCode::BadDecodingError)
}
}


pub (crate) fn is_sequence_newer(sequence_no: u16, last_sequence_no: u16) -> bool {
let v = (65535_u32 + u32::from(sequence_no) - u32::from(last_sequence_no)) % 65536;
v < 16384
}

0 comments on commit 212ffc5

Please sign in to comment.