Skip to content

Commit

Permalink
QPID-6437 : ensure session/link flow notifies occur
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1664839 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
rgodfrey committed Mar 7, 2015
1 parent f6f0171 commit 5cdfd4c
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,14 @@ public void receiveFlow(final Flow flow)
setLinkCredit(limit.subtract(getDeliveryCount()));
}
}

getSession().getConnection().addPostLockAction(new Runnable()
{
@Override
public void run()
{
flowStateChanged();
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,49 +458,40 @@ public void receiveFlow(final Flow flow)
{
synchronized (getLock())
{
synchronized (getLock())
{
UnsignedInteger handle = flow.getHandle();
final LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle);
UnsignedInteger handle = flow.getHandle();
final LinkEndpoint endpoint = handle == null ? null : _remoteLinkEndpoints.get(handle);

final UnsignedInteger nextOutgoingId =
flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue());
_outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue());
final UnsignedInteger nextOutgoingId =
flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue());
_outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue());

if (endpoint != null)
{
getConnection().addPostLockAction(new Runnable()
{
@Override
public void run()
{
endpoint.receiveFlow(flow);
}
});
}
else
if (endpoint != null)
{
endpoint.receiveFlow(flow);
}
else
{
final Collection<LinkEndpoint> allLinkEndpoints = _remoteLinkEndpoints.values();
getConnection().addPostLockAction(new Runnable()
{
final Collection<LinkEndpoint> allLinkEndpoints = _remoteLinkEndpoints.values();
getConnection().addPostLockAction(new Runnable()
@Override
public void run()
{
@Override
public void run()
{

for(LinkEndpoint le : allLinkEndpoints)
{
le.flowStateChanged();
}
for(LinkEndpoint le : allLinkEndpoints)
{
le.flowStateChanged();
}
});
}

getLock().notifyAll();
}
});
}

getLock().notifyAll();
}



}

public void receiveDisposition(final Disposition disposition)
Expand Down

0 comments on commit 5cdfd4c

Please sign in to comment.