Skip to content

Commit

Permalink
Merge with master
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Dec 1, 2022
2 parents 0151b80 + 0fcd462 commit 6e20f88
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 78 deletions.
4 changes: 2 additions & 2 deletions commons/zenoh-protocol-core/src/whatami.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,12 +259,12 @@ impl BitOr for WhatAmIMatcher {
impl BitOr for WhatAmI {
type Output = WhatAmIMatcher;
fn bitor(self, rhs: Self) -> Self::Output {
WhatAmIMatcher(unsafe { NonZeroU8::new_unchecked(self as u8 | rhs as u8) })
WhatAmIMatcher(unsafe { NonZeroU8::new_unchecked(self as u8 | rhs as u8 | 128) })
}
}

impl From<WhatAmI> for WhatAmIMatcher {
fn from(w: WhatAmI) -> Self {
WhatAmIMatcher(unsafe { NonZeroU8::new_unchecked(w as u8) })
WhatAmIMatcher(unsafe { NonZeroU8::new_unchecked(w as u8 | 128) })
}
}
12 changes: 8 additions & 4 deletions zenoh-ext/src/publication_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl<'a> AsyncResolve for PublicationCacheBuilder<'a, '_, '_> {
}

pub struct PublicationCache<'a> {
_local_sub: FlumeSubscriber<'a>,
local_sub: FlumeSubscriber<'a>,
_queryable: Queryable<'a, flume::Receiver<Query>>,
_stoptx: Sender<bool>,
}
Expand Down Expand Up @@ -222,7 +222,7 @@ impl<'a> PublicationCache<'a> {
});

Ok(PublicationCache {
_local_sub: local_sub,
local_sub,
_queryable: queryable,
_stoptx: stoptx,
})
Expand All @@ -234,13 +234,17 @@ impl<'a> PublicationCache<'a> {
ResolveFuture::new(async move {
let PublicationCache {
_queryable,
_local_sub,
local_sub,
_stoptx,
} = self;
_queryable.undeclare().res_async().await?;
_local_sub.undeclare().res_async().await?;
local_sub.undeclare().res_async().await?;
drop(_stoptx);
Ok(())
})
}

pub fn key_expr(&self) -> &KeyExpr<'static> {
self.local_sub.key_expr()
}
}
16 changes: 12 additions & 4 deletions zenoh-ext/src/querying_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ pub struct QueryingSubscriber<'a, Receiver> {
query_target: QueryTarget,
query_consolidation: QueryConsolidation,
query_timeout: Duration,
_subscriber: Subscriber<'a, ()>,
subscriber: Subscriber<'a, ()>,
callback: Arc<dyn Fn(Sample) + Send + Sync + 'static>,
state: Arc<Mutex<InnerState>>,
receiver: Receiver,
Expand Down Expand Up @@ -397,7 +397,7 @@ impl<'a, Receiver> QueryingSubscriber<'a, Receiver> {
query_target: conf.query_target,
query_consolidation: conf.query_consolidation,
query_timeout: conf.query_timeout,
_subscriber: subscriber,
subscriber,
callback,
state,
receiver,
Expand All @@ -412,7 +412,15 @@ impl<'a, Receiver> QueryingSubscriber<'a, Receiver> {
/// Close this QueryingSubscriber
#[inline]
pub fn close(self) -> impl Resolve<ZResult<()>> + 'a {
self._subscriber.undeclare()
self.subscriber.undeclare()
}

pub fn key_expr(&self) -> &KeyExpr<'static> {
self.subscriber.key_expr()
}

pub fn query_key_expr(&self) -> &KeyExpr<'_> {
&self.query_key_expr
}

/// Issue a new query using the configured selector.
Expand Down Expand Up @@ -460,7 +468,7 @@ impl<'a, Receiver> QueryingSubscriber<'a, Receiver> {
};

// if selector for query is different than subscription keyexpr: accept Any reply
let query_accept_replies = if selector.key_expr != *self._subscriber.key_expr() {
let query_accept_replies = if selector.key_expr != *self.subscriber.key_expr() {
ReplyKeyExpr::Any
} else {
ReplyKeyExpr::MatchingQuery
Expand Down
107 changes: 73 additions & 34 deletions zenoh/src/publication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ impl PutBuilder<'_, '_> {
self
}

/// Restrict the matching subscribers that will receive the published data
/// to the ones that have the given [`Locality`](crate::prelude::Locality).
#[zenoh_core::unstable]
#[inline]
pub fn allowed_destination(mut self, destination: Locality) -> Self {
self.publisher = self.publisher.allowed_destination(destination);
self
}

pub fn kind(mut self, kind: SampleKind) -> Self {
self.kind = kind;
self
Expand Down Expand Up @@ -135,23 +144,27 @@ impl SyncResolve for PutBuilder<'_, '_> {
info.timestamp = publisher.session.runtime.new_timestamp();
let data_info = if info.has_options() { Some(info) } else { None };

primitives.send_data(
&key_expr.to_wire(&publisher.session),
value.payload.clone(),
Channel {
priority: publisher.priority.into(),
reliability: Reliability::Reliable, // @TODO: need to check subscriptions to determine the right reliability value
},
publisher.congestion_control,
data_info.clone(),
None,
);
publisher.session.handle_data(
true,
&key_expr.to_wire(&publisher.session),
data_info,
value.payload,
);
if publisher.destination != Locality::SessionLocal {
primitives.send_data(
&key_expr.to_wire(&publisher.session),
value.payload.clone(),
Channel {
priority: publisher.priority.into(),
reliability: Reliability::Reliable, // @TODO: need to check subscriptions to determine the right reliability value
},
publisher.congestion_control,
data_info.clone(),
None,
);
}
if publisher.destination != Locality::Remote {
publisher.session.handle_data(
true,
&key_expr.to_wire(&publisher.session),
data_info,
value.payload,
);
}
Ok(())
}
}
Expand Down Expand Up @@ -206,6 +219,7 @@ pub struct Publisher<'a> {
pub(crate) key_expr: KeyExpr<'a>,
pub(crate) congestion_control: CongestionControl,
pub(crate) priority: Priority,
pub(crate) destination: Locality,
}

impl<'a> Publisher<'a> {
Expand All @@ -227,6 +241,15 @@ impl<'a> Publisher<'a> {
self
}

/// Restrict the matching subscribers that will receive the published data
/// to the ones that have the given [`Locality`](crate::prelude::Locality).
#[zenoh_core::unstable]
#[inline]
pub fn allowed_destination(mut self, destination: Locality) -> Self {
self.destination = destination;
self
}

fn _write(&self, kind: SampleKind, value: Value) -> Publication {
Publication {
publisher: self,
Expand Down Expand Up @@ -401,23 +424,27 @@ impl SyncResolve for Publication<'_> {
info.timestamp = publisher.session.runtime.new_timestamp();
let data_info = if info.has_options() { Some(info) } else { None };

primitives.send_data(
&publisher.key_expr.to_wire(&publisher.session),
value.payload.clone(),
Channel {
priority: publisher.priority.into(),
reliability: Reliability::Reliable, // @TODO: need to check subscriptions to determine the right reliability value
},
publisher.congestion_control,
data_info.clone(),
None,
);
publisher.session.handle_data(
true,
&publisher.key_expr.to_wire(&publisher.session),
data_info,
value.payload,
);
if publisher.destination != Locality::SessionLocal {
primitives.send_data(
&publisher.key_expr.to_wire(&publisher.session),
value.payload.clone(),
Channel {
priority: publisher.priority.into(),
reliability: Reliability::Reliable, // @TODO: need to check subscriptions to determine the right reliability value
},
publisher.congestion_control,
data_info.clone(),
None,
);
}
if publisher.destination != Locality::Remote {
publisher.session.handle_data(
true,
&publisher.key_expr.to_wire(&publisher.session),
data_info,
value.payload,
);
}
Ok(())
}
}
Expand Down Expand Up @@ -480,6 +507,7 @@ pub struct PublisherBuilder<'a, 'b: 'a> {
pub(crate) key_expr: ZResult<KeyExpr<'b>>,
pub(crate) congestion_control: CongestionControl,
pub(crate) priority: Priority,
pub(crate) destination: Locality,
}

impl<'a, 'b> Clone for PublisherBuilder<'a, 'b> {
Expand All @@ -492,6 +520,7 @@ impl<'a, 'b> Clone for PublisherBuilder<'a, 'b> {
},
congestion_control: self.congestion_control,
priority: self.priority,
destination: self.destination,
}
}
}
Expand All @@ -510,6 +539,15 @@ impl<'a, 'b> PublisherBuilder<'a, 'b> {
self.priority = priority;
self
}

/// Restrict the matching subscribers that will receive the published data
/// to the ones that have the given [`Locality`](crate::prelude::Locality).
#[zenoh_core::unstable]
#[inline]
pub fn allowed_destination(mut self, destination: Locality) -> Self {
self.destination = destination;
self
}
}

impl<'a, 'b> Resolvable for PublisherBuilder<'a, 'b> {
Expand Down Expand Up @@ -555,6 +593,7 @@ impl<'a, 'b> SyncResolve for PublisherBuilder<'a, 'b> {
key_expr,
congestion_control: self.congestion_control,
priority: self.priority,
destination: self.destination,
};
log::trace!("publish({:?})", publisher.key_expr);
Ok(publisher)
Expand Down
17 changes: 17 additions & 0 deletions zenoh/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ pub struct GetBuilder<'a, 'b, Handler> {
pub(crate) selector: ZResult<Selector<'b>>,
pub(crate) target: QueryTarget,
pub(crate) consolidation: QueryConsolidation,
pub(crate) destination: Locality,
pub(crate) timeout: Duration,
pub(crate) handler: Handler,
pub(crate) value: Option<Value>,
Expand Down Expand Up @@ -183,6 +184,7 @@ impl<'a, 'b> GetBuilder<'a, 'b, DefaultHandler> {
selector,
target,
consolidation,
destination,
timeout,
value,
handler: _,
Expand All @@ -192,6 +194,7 @@ impl<'a, 'b> GetBuilder<'a, 'b, DefaultHandler> {
selector,
target,
consolidation,
destination,
timeout,
value,
handler: callback,
Expand Down Expand Up @@ -258,6 +261,7 @@ impl<'a, 'b> GetBuilder<'a, 'b, DefaultHandler> {
selector,
target,
consolidation,
destination,
timeout,
value,
handler: _,
Expand All @@ -267,6 +271,7 @@ impl<'a, 'b> GetBuilder<'a, 'b, DefaultHandler> {
selector,
target,
consolidation,
destination,
timeout,
value,
handler,
Expand All @@ -288,6 +293,15 @@ impl<'a, 'b, Handler> GetBuilder<'a, 'b, Handler> {
self
}

/// Restrict the matching queryables that will receive the query
/// to the ones that have the given [`Locality`](crate::prelude::Locality).
#[zenoh_core::unstable]
#[inline]
pub fn allowed_destination(mut self, destination: Locality) -> Self {
self.destination = destination;
self
}

/// Set query timeout.
#[inline]
pub fn timeout(mut self, timeout: Duration) -> Self {
Expand Down Expand Up @@ -318,6 +332,7 @@ impl<'a, 'b, Handler> GetBuilder<'a, 'b, Handler> {
selector,
target,
consolidation,
destination,
timeout,
value,
handler,
Expand All @@ -327,6 +342,7 @@ impl<'a, 'b, Handler> GetBuilder<'a, 'b, Handler> {
selector: selector.and_then(|s| s.accept_any_keyexpr(accept == ReplyKeyExpr::Any)),
target,
consolidation,
destination,
timeout,
value,
handler,
Expand Down Expand Up @@ -373,6 +389,7 @@ where
&self.selector?,
self.target,
self.consolidation,
self.destination,
self.timeout,
self.value,
callback,
Expand Down
Loading

0 comments on commit 6e20f88

Please sign in to comment.