Skip to content

Commit

Permalink
Merge branch 'release-v1.37' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
richvdh committed Jun 29, 2021
2 parents f558369 + ba9b744 commit 785bcee
Show file tree
Hide file tree
Showing 12 changed files with 718 additions and 6 deletions.
1 change: 1 addition & 0 deletions changelog.d/10269.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Handle inbound events from federation asynchronously.
1 change: 1 addition & 0 deletions changelog.d/10272.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Handle inbound events from federation asynchronously.
2 changes: 1 addition & 1 deletion synapse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
except ImportError:
pass

__version__ = "1.37.0"
__version__ = "1.37.1a1"

if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when
Expand Down
2 changes: 2 additions & 0 deletions synapse/app/generic_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
from synapse.storage.databases.main.censor_events import CensorEventsStore
from synapse.storage.databases.main.client_ips import ClientIpWorkerStore
from synapse.storage.databases.main.e2e_room_keys import EndToEndRoomKeyStore
from synapse.storage.databases.main.lock import LockStore
from synapse.storage.databases.main.media_repository import MediaRepositoryStore
from synapse.storage.databases.main.metrics import ServerMetricsStore
from synapse.storage.databases.main.monthly_active_users import (
Expand Down Expand Up @@ -249,6 +250,7 @@ class GenericWorkerSlavedStore(
ServerMetricsStore,
SearchStore,
TransactionWorkerStore,
LockStore,
BaseSlavedStore,
):
pass
Expand Down
98 changes: 96 additions & 2 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
SynapseError,
UnsupportedRoomVersionError,
)
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
Expand All @@ -58,10 +58,12 @@
)
from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace
from synapse.logging.utils import log_function
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
ReplicationGetQueryRestServlet,
)
from synapse.storage.databases.main.lock import Lock
from synapse.types import JsonDict
from synapse.util import glob_to_regex, json_decoder, unwrapFirstError
from synapse.util.async_helpers import Linearizer, concurrently_execute
Expand Down Expand Up @@ -97,6 +99,11 @@
)


# The name of the lock to use when processing events in a room received over
# federation.
_INBOUND_EVENT_HANDLING_LOCK_NAME = "federation_inbound_pdu"


class FederationServer(FederationBase):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
Expand Down Expand Up @@ -857,7 +864,94 @@ async def _handle_received_pdu(self, origin: str, pdu: EventBase) -> None:
except SynapseError as e:
raise FederationError("ERROR", e.code, e.msg, affected=pdu.event_id)

await self.handler.on_receive_pdu(origin, pdu, sent_to_us_directly=True)
# Add the event to our staging area
await self.store.insert_received_event_to_staging(origin, pdu)

# Try and acquire the processing lock for the room, if we get it start a
# background process for handling the events in the room.
lock = await self.store.try_acquire_lock(
_INBOUND_EVENT_HANDLING_LOCK_NAME, pdu.room_id
)
if lock:
self._process_incoming_pdus_in_room_inner(
pdu.room_id, room_version, lock, origin, pdu
)

@wrap_as_background_process("_process_incoming_pdus_in_room_inner")
async def _process_incoming_pdus_in_room_inner(
    self,
    room_id: str,
    room_version: RoomVersion,
    lock: Lock,
    latest_origin: str,
    latest_event: EventBase,
) -> None:
    """Process events in the staging area for the given room.

    The latest_origin and latest_event args are the latest origin and event
    received.

    Args:
        room_id: The room whose staged events should be processed.
        room_version: Room version used when staged events have to be
            re-parsed out of the staging table.
        lock: The already-acquired processing lock for the room; it is
            entered (``async with``) while the first event is handled.
        latest_origin: The server the most recently received event came from.
        latest_event: The most recently received event, reused directly if it
            is the next one in the staging area.
    """

    # The common path is for the event we just received to be the only event
    # staged for the room, so instead of pulling the event out of the DB and
    # parsing it we just pull out the next event ID and check if that matches.
    next_staged_id = await self.store.get_next_staged_event_id_for_room(room_id)
    if not next_staged_id:
        # `get_next_staged_event_id_for_room` is declared as returning an
        # Optional tuple; unpacking `None` would raise a TypeError, so bail
        # out when there is nothing staged for the room.
        # NOTE(review): as with the early return below, `lock` is not
        # explicitly released on this path - confirm that is acceptable.
        return

    next_origin, next_event_id = next_staged_id
    if next_origin == latest_origin and next_event_id == latest_event.event_id:
        origin = latest_origin
        event = latest_event
    else:
        next_staged = await self.store.get_next_staged_event_for_room(
            room_id, room_version
        )
        if not next_staged:
            return

        origin, event = next_staged

    # We loop round until there are no more events in the room in the
    # staging area, or we fail to get the lock (which means another process
    # has started processing).
    while True:
        async with lock:
            try:
                await self.handler.on_receive_pdu(
                    origin, event, sent_to_us_directly=True
                )
            except FederationError as e:
                # XXX: Ideally we'd inform the remote we failed to process
                # the event, but we can't return an error in the transaction
                # response (as we've already responded).
                logger.warning("Error handling PDU %s: %s", event.event_id, e)
            except Exception:
                f = failure.Failure()
                logger.error(
                    "Failed to handle PDU %s",
                    event.event_id,
                    exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
                )

            # Whether or not handling succeeded, drop the event from the
            # staging area so that it is not picked up again.
            await self.store.remove_received_event_from_staging(
                origin, event.event_id
            )

        # We need to do this check outside the lock to avoid a race between
        # a new event being inserted by another instance and it attempting
        # to acquire the lock.
        next_staged = await self.store.get_next_staged_event_for_room(
            room_id, room_version
        )
        if not next_staged:
            break

        origin, event = next_staged

        lock = await self.store.try_acquire_lock(
            _INBOUND_EVENT_HANDLING_LOCK_NAME, room_id
        )
        if not lock:
            return

def __str__(self) -> str:
return "<ReplicationLayer(%s)>" % self.server_name
Expand Down
2 changes: 2 additions & 0 deletions synapse/storage/databases/main/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from .filtering import FilteringStore
from .group_server import GroupServerStore
from .keys import KeyStore
from .lock import LockStore
from .media_repository import MediaRepositoryStore
from .metrics import ServerMetricsStore
from .monthly_active_users import MonthlyActiveUsersStore
Expand Down Expand Up @@ -119,6 +120,7 @@ class DataStore(
CacheInvalidationWorkerStore,
ServerMetricsStore,
EventForwardExtremitiesStore,
LockStore,
):
def __init__(self, database: DatabasePool, db_conn, hs):
self.hs = hs
Expand Down
109 changes: 106 additions & 3 deletions synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,20 @@
import itertools
import logging
from queue import Empty, PriorityQueue
from typing import Collection, Dict, Iterable, List, Set, Tuple
from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple

from synapse.api.constants import MAX_DEPTH
from synapse.api.errors import StoreError
from synapse.events import EventBase
from synapse.api.room_versions import RoomVersion
from synapse.events import EventBase, make_event_from_dict
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.signatures import SignatureWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Cursor
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
from synapse.util.caches.lrucache import LruCache
from synapse.util.iterutils import batch_iter
Expand Down Expand Up @@ -1044,6 +1046,107 @@ def _delete_old_forward_extrem_cache_txn(txn):
_delete_old_forward_extrem_cache_txn,
)

async def insert_received_event_to_staging(
    self, origin: str, event: EventBase
) -> None:
    """Stage an event that has just arrived over federation.

    Args:
        origin: The server the event was received from.
        event: The received event; its JSON and internal metadata are
            serialised into the staging row.
    """

    # A row is identified by (origin, event_id). An upsert is used so that
    # seeing the same event from the same server more than once is handled.
    row_key = {
        "origin": origin,
        "event_id": event.event_id,
    }

    # Columns that are only written when the row is first inserted.
    new_row_columns = {
        "room_id": event.room_id,
        "received_ts": self._clock.time_msec(),
        "event_json": json_encoder.encode(event.get_dict()),
        "internal_metadata": json_encoder.encode(
            event.internal_metadata.get_dict()
        ),
    }

    await self.db_pool.simple_upsert(
        table="federation_inbound_events_staging",
        keyvalues=row_key,
        values={},
        insertion_values=new_row_columns,
        desc="insert_received_event_to_staging",
    )

async def remove_received_event_from_staging(
    self,
    origin: str,
    event_id: str,
) -> None:
    """Delete a staged event, identified by its origin server and event ID.

    Args:
        origin: The server the staged event was received from.
        event_id: The ID of the staged event to remove.
    """
    row_key = {"origin": origin, "event_id": event_id}

    await self.db_pool.simple_delete(
        table="federation_inbound_events_staging",
        keyvalues=row_key,
        desc="remove_received_event_from_staging",
    )

async def get_next_staged_event_id_for_room(
    self,
    room_id: str,
) -> Optional[Tuple[str, str]]:
    """Look up which staged event for the room should be handled next.

    Args:
        room_id: The room to look up staged events for.

    Returns:
        The row fetched for the staged event with the smallest
        ``received_ts`` (origin, then event ID), or whatever
        ``txn.fetchone()`` yields when no row matches.
    """

    def _fetch_oldest_staged_id_txn(txn):
        # Only the identifying columns are selected; the event JSON itself
        # is not read by this query.
        oldest_id_sql = """
SELECT origin, event_id
FROM federation_inbound_events_staging
WHERE room_id = ?
ORDER BY received_ts ASC
LIMIT 1
"""
        query_args = (room_id,)
        txn.execute(oldest_id_sql, query_args)
        return txn.fetchone()

    result = await self.db_pool.runInteraction(
        "get_next_staged_event_id_for_room", _fetch_oldest_staged_id_txn
    )
    return result

async def get_next_staged_event_for_room(
    self,
    room_id: str,
    room_version: RoomVersion,
) -> Optional[Tuple[str, EventBase]]:
    """Fetch and parse the next staged event for the given room.

    Args:
        room_id: The room to look up staged events for.
        room_version: Room version passed to ``make_event_from_dict`` when
            rebuilding the event from its stored JSON.

    Returns:
        ``(origin, event)`` for the staged row with the smallest
        ``received_ts``, or None if the query returned no row.
    """

    def _fetch_oldest_staged_event_txn(txn):
        oldest_event_sql = """
SELECT event_json, internal_metadata, origin
FROM federation_inbound_events_staging
WHERE room_id = ?
ORDER BY received_ts ASC
LIMIT 1
"""
        txn.execute(oldest_event_sql, (room_id,))
        return txn.fetchone()

    staged_row = await self.db_pool.runInteraction(
        "get_next_staged_event_for_room", _fetch_oldest_staged_event_txn
    )
    if not staged_row:
        return None

    # Columns arrive in SELECT order: event JSON, internal metadata, origin.
    raw_event_json, raw_internal_metadata, origin = (
        staged_row[0],
        staged_row[1],
        staged_row[2],
    )

    event_dict = db_to_json(raw_event_json)
    metadata_dict = db_to_json(raw_internal_metadata)

    return origin, make_event_from_dict(
        event_dict=event_dict,
        room_version=room_version,
        internal_metadata_dict=metadata_dict,
    )


class EventFederationStore(EventFederationWorkerStore):
"""Responsible for storing and serving up the various graphs associated
Expand Down
Loading

0 comments on commit 785bcee

Please sign in to comment.