Skip to content

Commit

Permalink
Merge pull request matrix-org#871 from matrix-org/erikj/linearize_sta…
Browse files Browse the repository at this point in the history
…te_fetch_on_pdu

Linearize fetching of gaps on incoming events
  • Loading branch information
erikjohnston authored Jun 15, 2016
2 parents 15bf3e3 + d41a1a9 commit a64dbae
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 37 deletions.
3 changes: 3 additions & 0 deletions synapse/federation/federation_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@


class FederationBase(object):
def __init__(self, hs):
pass

@defer.inlineCallbacks
def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False,
include_none=False):
Expand Down
2 changes: 2 additions & 0 deletions synapse/federation/federation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@


class FederationClient(FederationBase):
def __init__(self, hs):
super(FederationClient, self).__init__(hs)

def start_get_pdu_cache(self):
self._get_pdu_cache = ExpiringCache(
Expand Down
88 changes: 51 additions & 37 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from .federation_base import FederationBase
from .units import Transaction, Edu

from synapse.util.async import Linearizer
from synapse.util.logutils import log_function
from synapse.events import FrozenEvent
import synapse.metrics
Expand All @@ -44,6 +45,11 @@


class FederationServer(FederationBase):
def __init__(self, hs):
super(FederationServer, self).__init__(hs)

self._room_pdu_linearizer = Linearizer()

def set_handler(self, handler):
"""Sets the handler that the replication layer will use to communicate
receipt of new PDUs from other home servers. The required methods are
Expand Down Expand Up @@ -491,43 +497,51 @@ def _handle_new_pdu(self, origin, pdu, get_missing=True):
pdu.internal_metadata.outlier = True
elif min_depth and pdu.depth > min_depth:
if get_missing and prevs - seen:
latest = yield self.store.get_latest_event_ids_in_room(
pdu.room_id
)

# We add the prev events that we have seen to the latest
# list to ensure the remote server doesn't give them to us
latest = set(latest)
latest |= seen

logger.info(
"Missing %d events for room %r: %r...",
len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
)

missing_events = yield self.get_missing_events(
origin,
pdu.room_id,
earliest_events_ids=list(latest),
latest_events=[pdu],
limit=10,
min_depth=min_depth,
)

# We want to sort these by depth so we process them and
# tell clients about them in order.
missing_events.sort(key=lambda x: x.depth)

for e in missing_events:
yield self._handle_new_pdu(
origin,
e,
get_missing=False
)

have_seen = yield self.store.have_events(
[ev for ev, _ in pdu.prev_events]
)
# If we're missing stuff, ensure we only fetch stuff one
# at a time.
with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
# We recalculate seen, since it may have changed.
have_seen = yield self.store.have_events(prevs)
seen = set(have_seen.keys())

if prevs - seen:
latest = yield self.store.get_latest_event_ids_in_room(
pdu.room_id
)

# We add the prev events that we have seen to the latest
# list to ensure the remote server doesn't give them to us
latest = set(latest)
latest |= seen

logger.info(
"Missing %d events for room %r: %r...",
len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
)

missing_events = yield self.get_missing_events(
origin,
pdu.room_id,
earliest_events_ids=list(latest),
latest_events=[pdu],
limit=10,
min_depth=min_depth,
)

# We want to sort these by depth so we process them and
# tell clients about them in order.
missing_events.sort(key=lambda x: x.depth)

for e in missing_events:
yield self._handle_new_pdu(
origin,
e,
get_missing=False
)

have_seen = yield self.store.have_events(
[ev for ev, _ in pdu.prev_events]
)

prevs = {e_id for e_id, _ in pdu.prev_events}
seen = set(have_seen.keys())
Expand Down
2 changes: 2 additions & 0 deletions synapse/federation/replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,7 @@ def __init__(self, hs, transport_layer):

self.hs = hs

super(ReplicationLayer, self).__init__(hs)

def __str__(self):
return "<ReplicationLayer(%s)>" % self.server_name

0 comments on commit a64dbae

Please sign in to comment.