Skip to content

Commit

Permalink
Add extra debug to tasks ending. Fix hostname resolution. Fix 5 minut…
Browse files Browse the repository at this point in the history
…e timer on server registration causing abort to timeout. Fix issue with server complaining on secure channel renew complaining when None security channel renews itself with a null nonce.
  • Loading branch information
locka99 committed Aug 30, 2018
1 parent 4e2b0f2 commit 0914a35
Show file tree
Hide file tree
Showing 20 changed files with 173 additions and 78 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,15 @@ impl Client {
if let Ok(_) = connected {
// Find me some some servers
let servers = session.find_servers(discovery_endpoint_url.clone());
if let Ok(servers) = servers {
let result = if let Ok(servers) = servers {
Ok(servers)
} else {
let result = servers.unwrap_err();
error!("Cannot find servers on discovery server {} - check this error - {:?}", discovery_endpoint_url, result);
Err(result)
}
};
let _ = session.disconnect();
result
} else {
let result = connected.unwrap_err();
error!("Cannot connect to {} - check this error - {:?}", discovery_endpoint_url, result);
Expand Down Expand Up @@ -260,7 +262,9 @@ impl Client {
let connected = session.connect();
if let Ok(_) = connected {
// Register with the server
session.register_server(server)
let result = session.register_server(server);
let _ = session.disconnect();
result
} else {
let result = connected.unwrap_err();
error!("Cannot connect to {} - check this error - {:?}", discovery_endpoint_url, result);
Expand Down
18 changes: 14 additions & 4 deletions client/src/comms/tcp_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::thread;
use std::time;
use std::result::Result;
use std::sync::{Arc, RwLock, Mutex};
use std::net::SocketAddr;
use std::net::{SocketAddr, ToSocketAddrs};

use tokio;
use tokio::net::TcpStream;
Expand Down Expand Up @@ -175,12 +175,22 @@ impl TcpTransport {
let host = url.host_str().unwrap();
let port = if let Some(port) = url.port() { port } else { 4840 };

// Resolve the host name into a socket address
let addr = {
let addr = format!("{}:{}", host, port).parse::<SocketAddr>();
if addr.is_err() {
let addr = format!("{}:{}", host, port);
let addrs = addr.to_socket_addrs();
if let Ok(mut addrs) = addrs {
// Take the first resolved ip addr for the hostname
if let Some(addr) = addrs.next() {
addr
} else {
error!("Invalid address {}, does not resolve to any socket", addr);
return Err(BadTcpEndpointUrlInvalid);
}
} else {
error!("Invalid address {}, cannot be parsed {:?}", addr, addrs.unwrap_err());
return Err(BadTcpEndpointUrlInvalid);
}
addr.unwrap()
};
assert_eq!(addr.port(), port);
assert!(addr.is_ipv4());
Expand Down
21 changes: 10 additions & 11 deletions client/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,9 @@ pub struct Session {

impl Drop for Session {
fn drop(&mut self) {

// This panics in local discovery server call from server registration
// if self.is_connected() {
// self.disconnect();
// }
if self.is_connected() {
self.disconnect();
}
}
}

Expand Down Expand Up @@ -224,27 +222,28 @@ impl Session {
}
}

/// Disconnect from the server. Disconnect
/// Disconnect from the server. Disconnect is an explicit command to drop the socket and throw
/// away all state information. If you disconnect you cannot reconnect later.
pub fn disconnect(&mut self) {
let _ = self.delete_all_subscriptions();
let _ = self.close_secure_channel();
self.transport.wait_for_disconnect();
}

/// Test if the session is in a connected state
pub fn is_connected(&self) -> bool {
self.transport.is_connected()
}

/// Sends an OpenSecureChannel request to the server
pub fn open_secure_channel(&mut self) -> Result<(), StatusCode> {
debug!("open_secure_channel");
{
let mut session_state = trace_write_lock_unwrap!(self.session_state);
session_state.issue_or_renew_secure_channel(SecurityTokenRequestType::Issue)
}
let mut session_state = trace_write_lock_unwrap!(self.session_state);
session_state.issue_or_renew_secure_channel(SecurityTokenRequestType::Issue)
}

/// Sends a CloseSecureChannel request to the server
/// Sends a CloseSecureChannel request to the server which will cause the server to drop
/// the connection.
pub fn close_secure_channel(&mut self) -> Result<(), StatusCode> {
let request = CloseSecureChannelRequest {
request_header: self.make_request_header(),
Expand Down
2 changes: 1 addition & 1 deletion client/src/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ impl SessionState {

let (security_mode, security_policy, client_nonce) = {
let mut secure_channel = trace_write_lock_unwrap!(self.secure_channel);
let client_nonce = secure_channel.security_policy().nonce();
let client_nonce = secure_channel.security_policy().random_nonce();
secure_channel.set_local_nonce(client_nonce.as_ref());
(secure_channel.security_mode(), secure_channel.security_policy(), client_nonce)
};
Expand Down
2 changes: 0 additions & 2 deletions core/src/comms/secure_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,9 @@ impl SecureChannel {
/// to be renewed if the issue period has elapsed by 75% or more.
pub fn should_renew_security_token(&self) -> bool {
if self.token_id() == 0 {
// panic!("Shouldn't be asking this question, if there is no token id at all");
false
} else {
let now = chrono::Utc::now();

// Check if secure channel 75% close to expiration in which case send a renew
let renew_lifetime = (self.token_lifetime() * 3) / 4;
let created_at = self.token_created_at().into();
Expand Down
2 changes: 1 addition & 1 deletion core/src/crypto/security_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ impl SecurityPolicy {
}

/// Creates a random nonce in a bytestring with a length appropriate for the policy
pub fn nonce(&self) -> ByteString {
pub fn random_nonce(&self) -> ByteString {
match *self {
SecurityPolicy::None => ByteString::null(),
SecurityPolicy::Basic128Rsa15 |
Expand Down
8 changes: 4 additions & 4 deletions core/src/tests/crypto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,10 +352,10 @@ fn sign_hmac_sha256() {
#[test]
fn generate_nonce() {
// Generate a random nonce through the function and ensure it is the expected length
assert!(SecurityPolicy::None.nonce().is_null());
assert_eq!(SecurityPolicy::Basic128Rsa15.nonce().as_ref().len(), 16);
assert_eq!(SecurityPolicy::Basic256.nonce().as_ref().len(), 32);
assert_eq!(SecurityPolicy::Basic256Sha256.nonce().as_ref().len(), 32);
assert!(SecurityPolicy::None.random_nonce().is_null());
assert_eq!(SecurityPolicy::Basic128Rsa15.random_nonce().as_ref().len(), 16);
assert_eq!(SecurityPolicy::Basic256.random_nonce().as_ref().len(), 32);
assert_eq!(SecurityPolicy::Basic256Sha256.random_nonce().as_ref().len(), 32);
}

#[test]
Expand Down
4 changes: 4 additions & 0 deletions integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,7 @@ version = "0.4.0" # OPCUARustVersion
[dependencies.opcua-client]
path = "../client"
version = "0.4.0" # OPCUARustVersion

[dependencies.opcua-console-logging]
path = "../console-logging"
version = "0.4.0" # OPCUARustVersion
1 change: 1 addition & 0 deletions integration/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ extern crate opcua_client;
extern crate opcua_core;
extern crate opcua_server;
extern crate opcua_types;
extern crate opcua_console_logging;

fn main() {
eprintln!(r#"Needs to be run with "cargo test --features integration -- --test-threads=1""#);
Expand Down
4 changes: 4 additions & 0 deletions integration/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use opcua_server::config::{ServerEndpoint, ServerConfig};
use opcua_server::prelude::*;
use opcua_client::prelude::*;
use opcua_client::config::{ClientConfig, ClientUserToken};
use opcua_console_logging;

const ENDPOINT_ID_NONE: &'static str = "sample_none";
const ENDPOINT_ID_BASIC128RSA15_SIGN_ENCRYPT: &'static str = "sample_basic128rsa15_signencrypt";
Expand Down Expand Up @@ -242,6 +243,9 @@ enum ServerResponse {
fn perform_test<CT, ST>(port_offset: u16, client_test: Option<CT>, server_test: ST)
where CT: FnOnce(mpsc::Receiver<ClientCommand>, Client) + Send + 'static,
ST: FnOnce(mpsc::Receiver<ServerCommand>, Server) + Send + 'static {

opcua_console_logging::init();

let (client, server) = new_client_server(port_offset);

// Spawn the CLIENT thread
Expand Down
2 changes: 1 addition & 1 deletion samples/demo-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ fn add_control_switches(server: &mut Server) {
// Check if abort has been set to true, in which case abort
if abort {
let mut server_state = server_state.write().unwrap();
server_state.abort = true;
server_state.abort();
}
});
}
Expand Down
6 changes: 4 additions & 2 deletions server/src/comms/secure_channel_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,10 @@ impl SecureChannelService {
trace!("Request type == Renew");

// Check for a duplicate nonce. It is invalid for the renew to use the same nonce
// as was used for last issue/renew
if request.client_nonce.as_ref() == &secure_channel.remote_nonce()[..] {
// as was used for last issue/renew. It doesn't matter when policy is none.
if self.secure_channel.security_policy() != SecurityPolicy::None &&
request.client_nonce.as_ref() == &secure_channel.remote_nonce()[..] {
error!("Client reused a nonce for a renew");
return Ok(ServiceFault::new_supported_message(&request.request_header, BadNonceInvalid));
}

Expand Down
2 changes: 1 addition & 1 deletion server/src/comms/tcp_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ impl TcpTransport {
/// Test if the connection should abort
pub fn is_server_abort(&self) -> bool {
let server_state = trace_read_lock_unwrap!(self.server_state);
server_state.abort
server_state.is_abort()
}

fn process_hello(&mut self, hello: HelloMessage, sender: &mut UnboundedSender<(UInt32, SupportedMessage)>) -> std::result::Result<(), StatusCode> {
Expand Down
1 change: 1 addition & 0 deletions server/src/completion_pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ impl<S, C> Stream for CompletionPact<S, C>
Err(_) |
Ok(Async::Ready(Some(_))) => {
// We are done, forget us
debug!("Completer has triggered, indicating completion of the job");
Ok(Async::Ready(None))
}
Ok(Async::NotReady) => {
Expand Down
37 changes: 22 additions & 15 deletions server/src/discovery/mod.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,39 @@
use opcua_types::service_types::RegisteredServer;

use opcua_client::client::Client;
use opcua_client::config::ClientConfig;

use state::ServerState;

/// Registers the specified endpoints with the specified discovery server
pub fn register_discover_server(discovery_server_url: &str, server_state: &ServerState) {
// This follows the local discovery process described in part 12 of the spec, calling
// find_servers on it

trace!("Discovery server registration stub is triggering for {}", discovery_server_url);
pub fn register_with_discovery_server(discovery_server_url: &str, server_state: &ServerState) {
debug!("register_with_discovery_server, for {}", discovery_server_url);
let server_config = trace_read_lock_unwrap!(server_state.config);

// Client's pki dir must match server's
let mut config = ClientConfig::new("DiscoveryClient", "urn:DiscoveryClient");
config.pki_dir = server_config.pki_dir.clone();
let mut client = Client::new(config);

let servers = client.find_servers(discovery_server_url);
if let Ok(servers) = servers {
debug!("Servers on the discovery endpoint - {:?}", servers);
// This follows the local discovery process described in part 12 of the spec, calling
// find_servers on it first.

let registered_server: RegisteredServer = server_state.registered_server();
let result = client.register_server(discovery_server_url, registered_server);
if result.is_err() {
error!("Cannot register server with discovery server {}. Check for errors to discover the reason why.", discovery_server_url);
error!("One solution you may try is to ensure your server's cert is trusted by the discovery server.");
// Connect to the server and call find_servers to ensure it is a discovery server
match client.find_servers(discovery_server_url) {
Ok(servers) => {
debug!("Servers on the discovery endpoint - {:?}", servers);
// Register the server
let registered_server = server_state.registered_server();
match client.register_server(discovery_server_url, registered_server) {
Ok(_) => {}
Err(err) => {
error!("Cannot register server with discovery server {}. Check for error {:?} to discover the reason why.", discovery_server_url, err);
error!("One solution you may try is to ensure your server's cert is trusted by the discovery server.");
}
}
}
Err(err) => {
error!("Cannot find servers on discovery url {}, error = {:?}", discovery_server_url, err);
}
}

debug!("register_with_discovery_server, finished");
}
Loading

0 comments on commit 0914a35

Please sign in to comment.