Skip to content

Commit

Permalink
Wire up open_session to return list of events to send
Browse files Browse the repository at this point in the history
  • Loading branch information
myagley committed Nov 4, 2019
1 parent 97d71da commit e8ae099
Showing 1 changed file with 10 additions and 10 deletions.
20 changes: 10 additions & 10 deletions mqtt-broker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,15 @@ impl Broker {
}

// Process the CONNECT packet after it has been validated

// TODO - fix ConnAck return_code != accepted to not add session to sessions map
match self.open_session(connreq) {
Ok(ack) => {
let should_drop = ack.return_code != proto::ConnectReturnCode::Accepted;

Ok((ack, events)) => {
// Send ConnAck on new session
let session = self.get_session_mut(&client_id)?;
session.send(Event::ConnAck(ack)).await?;

if should_drop {
session.send(Event::DropConnection).await?;
for event in events.into_iter() {
session.send(event).await?;
}
}
Err(SessionError::DuplicateSession(mut old_session, ack)) => {
Expand Down Expand Up @@ -408,7 +406,7 @@ impl Broker {
.ok_or(Error::new(ErrorKind::NoSession.into()))
}

fn open_session(&mut self, connreq: ConnReq) -> Result<proto::ConnAck, SessionError> {
fn open_session(&mut self, connreq: ConnReq) -> Result<(proto::ConnAck, Vec<Event>), SessionError> {
let client_id = connreq.client_id().clone();

match self.sessions.remove(&client_id) {
Expand Down Expand Up @@ -440,8 +438,9 @@ impl Broker {
session_present,
return_code: proto::ConnectReturnCode::Accepted,
};
let events = vec![];

Ok(ack)
Ok((ack, events))
}
Some(Session::Disconnecting(disconnecting)) => Err(SessionError::ProtocolViolation(
Session::Disconnecting(disconnecting),
Expand All @@ -465,8 +464,9 @@ impl Broker {
session_present: false,
return_code: proto::ConnectReturnCode::Accepted,
};
let events = vec![];

Ok(ack)
Ok((ack, events))
}
}
}
Expand All @@ -475,7 +475,7 @@ impl Broker {
&mut self,
connreq: ConnReq,
current_connected: ConnectedSession,
) -> Result<proto::ConnAck, SessionError> {
) -> Result<(proto::ConnAck, Vec<Event>), SessionError> {
if current_connected.handle() == connreq.handle() {
// [MQTT-3.1.0-2] - The Server MUST process a second CONNECT Packet
// sent from a Client as a protocol violation and disconnect the Client.
Expand Down

0 comments on commit e8ae099

Please sign in to comment.