Skip to content

Commit

Permalink
add a cache to have_seen_event (matrix-org#9953)
Browse files Browse the repository at this point in the history
Empirically, this helped my server considerably when handling gaps in Matrix HQ. The problem was that we would repeatedly call have_seen_events for the same set of (50K or so) auth_events, each of which would take many minutes to complete, even though it's only an index scan.
  • Loading branch information
richvdh authored Jun 1, 2021
1 parent 10e6d2a commit b4b2fd2
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 20 deletions.
1 change: 1 addition & 0 deletions changelog.d/9953.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve performance of incoming federation transactions in large rooms.
1 change: 1 addition & 0 deletions changelog.d/9973.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve performance of incoming federation transactions in large rooms.
1 change: 0 additions & 1 deletion changelog.d/9973.misc

This file was deleted.

12 changes: 7 additions & 5 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,9 @@ async def _get_state_for_room(

# Fetch the state events from the DB, and check we have the auth events.
event_map = await self.store.get_events(state_event_ids, allow_rejected=True)
auth_events_in_store = await self.store.have_seen_events(auth_event_ids)
auth_events_in_store = await self.store.have_seen_events(
room_id, auth_event_ids
)

# Check for missing events. We handle state and auth event seperately,
# as we want to pull the state from the DB, but we don't for the auth
Expand Down Expand Up @@ -610,7 +612,7 @@ async def _get_state_for_room(

if missing_auth_events:
auth_events_in_store = await self.store.have_seen_events(
missing_auth_events
room_id, missing_auth_events
)
missing_auth_events.difference_update(auth_events_in_store)

Expand Down Expand Up @@ -710,7 +712,7 @@ async def _get_state_after_missing_prev_event(

missing_auth_events = set(auth_event_ids) - fetched_events.keys()
missing_auth_events.difference_update(
await self.store.have_seen_events(missing_auth_events)
await self.store.have_seen_events(room_id, missing_auth_events)
)
logger.debug("We are also missing %i auth events", len(missing_auth_events))

Expand Down Expand Up @@ -2475,7 +2477,7 @@ async def _update_auth_events_and_context_for_auth(
#
# we start by checking if they are in the store, and then try calling /event_auth/.
if missing_auth:
have_events = await self.store.have_seen_events(missing_auth)
have_events = await self.store.have_seen_events(event.room_id, missing_auth)
logger.debug("Events %s are in the store", have_events)
missing_auth.difference_update(have_events)

Expand All @@ -2494,7 +2496,7 @@ async def _update_auth_events_and_context_for_auth(
return context

seen_remotes = await self.store.have_seen_events(
[e.event_id for e in remote_auth_chain]
event.room_id, [e.event_id for e in remote_auth_chain]
)

for e in remote_auth_chain:
Expand Down
1 change: 1 addition & 0 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ def _invalidate_caches_for_event(
backfilled,
):
self._invalidate_get_event_cache(event_id)
self.have_seen_event.invalidate((room_id, event_id))

self.get_latest_event_ids_in_room.invalidate((room_id,))

Expand Down
61 changes: 52 additions & 9 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
Iterable,
List,
Optional,
Set,
Tuple,
overload,
)
Expand Down Expand Up @@ -55,7 +56,7 @@
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import JsonDict, get_domain_from_id
from synapse.util.caches.descriptors import cached
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.lrucache import LruCache
from synapse.util.iterutils import batch_iter
from synapse.util.metrics import Measure
Expand Down Expand Up @@ -1045,32 +1046,74 @@ async def have_events_in_timeline(self, event_ids):

return {r["event_id"] for r in rows}

async def have_seen_events(self, event_ids):
async def have_seen_events(
self, room_id: str, event_ids: Iterable[str]
) -> Set[str]:
"""Given a list of event ids, check if we have already processed them.
The room_id is only used to structure the cache (so that it can later be
invalidated by room_id) - there is no guarantee that the events are actually
in the room in question.
Args:
event_ids (iterable[str]):
room_id: Room we are polling
event_ids: events we are looking for
Returns:
set[str]: The events we have already seen.
"""
res = await self._have_seen_events_dict(
(room_id, event_id) for event_id in event_ids
)
return {eid for ((_rid, eid), have_event) in res.items() if have_event}

@cachedList("have_seen_event", "keys")
async def _have_seen_events_dict(
self, keys: Iterable[Tuple[str, str]]
) -> Dict[Tuple[str, str], bool]:
"""Helper for have_seen_events
Returns:
a dict {(room_id, event_id)-> bool}
"""
# if the event cache contains the event, obviously we've seen it.
results = {x for x in event_ids if self._get_event_cache.contains(x)}

def have_seen_events_txn(txn, chunk):
sql = "SELECT event_id FROM events as e WHERE "
cache_results = {
(rid, eid) for (rid, eid) in keys if self._get_event_cache.contains((eid,))
}
results = {x: True for x in cache_results}

def have_seen_events_txn(txn, chunk: Tuple[Tuple[str, str], ...]):
# we deliberately do *not* query the database for room_id, to make the
# query an index-only lookup on `events_event_id_key`.
#
# We therefore pull the events from the database into a set...

sql = "SELECT event_id FROM events AS e WHERE "
clause, args = make_in_list_sql_clause(
txn.database_engine, "e.event_id", chunk
txn.database_engine, "e.event_id", [eid for (_rid, eid) in chunk]
)
txn.execute(sql + clause, args)
results.update(row[0] for row in txn)
found_events = {eid for eid, in txn}

for chunk in batch_iter((x for x in event_ids if x not in results), 100):
# ... and then we can update the results for each row in the batch
results.update({(rid, eid): (eid in found_events) for (rid, eid) in chunk})

# each batch requires its own index scan, so we make the batches as big as
# possible.
for chunk in batch_iter((k for k in keys if k not in cache_results), 500):
await self.db_pool.runInteraction(
"have_seen_events", have_seen_events_txn, chunk
)

return results

@cached(max_entries=100000, tree=True)
async def have_seen_event(self, room_id: str, event_id: str):
# this only exists for the benefit of the @cachedList descriptor on
# _have_seen_events_dict
raise NotImplementedError()

def _get_current_state_event_counts_txn(self, txn, room_id):
"""
See get_current_state_event_counts.
Expand Down
26 changes: 21 additions & 5 deletions synapse/storage/databases/main/purge_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
from typing import Any, List, Set, Tuple

from synapse.api.errors import SynapseError
from synapse.storage._base import SQLBaseStore
from synapse.storage.databases.main import CacheInvalidationWorkerStore
from synapse.storage.databases.main.state import StateGroupWorkerStore
from synapse.types import RoomStreamToken

logger = logging.getLogger(__name__)


class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
async def purge_history(
self, room_id: str, token: str, delete_local_events: bool
) -> Set[int]:
Expand Down Expand Up @@ -203,8 +203,6 @@ def _purge_history_txn(
"DELETE FROM event_to_state_groups "
"WHERE event_id IN (SELECT event_id from events_to_purge)"
)
for event_id, _ in event_rows:
txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))

# Delete all remote non-state events
for table in (
Expand Down Expand Up @@ -283,6 +281,20 @@ def _purge_history_txn(
# so make sure to keep this actually last.
txn.execute("DROP TABLE events_to_purge")

for event_id, should_delete in event_rows:
self._invalidate_cache_and_stream(
txn, self._get_state_group_for_event, (event_id,)
)

# XXX: This is racy, since have_seen_events could be called between the
# transaction completing and the invalidation running. On the other hand,
# that's no different to calling `have_seen_events` just before the
# event is deleted from the database.
if should_delete:
self._invalidate_cache_and_stream(
txn, self.have_seen_event, (room_id, event_id)
)

logger.info("[purge] done")

return referenced_state_groups
Expand Down Expand Up @@ -422,7 +434,11 @@ def _purge_room_txn(self, txn, room_id: str) -> List[int]:
# index on them. In any case we should be clearing out 'stream' tables
# periodically anyway (#5888)

# TODO: we could probably usefully do a bunch of cache invalidation here
# TODO: we could probably usefully do a bunch more cache invalidation here

# XXX: as with purge_history, this is racy, but no worse than other races
# that already exist.
self._invalidate_cache_and_stream(txn, self.have_seen_event, (room_id,))

logger.info("[purge] done")

Expand Down
13 changes: 13 additions & 0 deletions tests/storage/databases/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
13 changes: 13 additions & 0 deletions tests/storage/databases/main/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
96 changes: 96 additions & 0 deletions tests/storage/databases/main/test_events_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json

from synapse.logging.context import LoggingContext
from synapse.storage.databases.main.events_worker import EventsWorkerStore

from tests import unittest


class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor, clock, hs):
self.store: EventsWorkerStore = hs.get_datastore()

# insert some test data
for rid in ("room1", "room2"):
self.get_success(
self.store.db_pool.simple_insert(
"rooms",
{"room_id": rid, "room_version": 4},
)
)

for idx, (rid, eid) in enumerate(
(
("room1", "event10"),
("room1", "event11"),
("room1", "event12"),
("room2", "event20"),
)
):
self.get_success(
self.store.db_pool.simple_insert(
"events",
{
"event_id": eid,
"room_id": rid,
"topological_ordering": idx,
"stream_ordering": idx,
"type": "test",
"processed": True,
"outlier": False,
},
)
)
self.get_success(
self.store.db_pool.simple_insert(
"event_json",
{
"event_id": eid,
"room_id": rid,
"json": json.dumps({"type": "test", "room_id": rid}),
"internal_metadata": "{}",
"format_version": 3,
},
)
)

def test_simple(self):
with LoggingContext(name="test") as ctx:
res = self.get_success(
self.store.have_seen_events("room1", ["event10", "event19"])
)
self.assertEquals(res, {"event10"})

# that should result in a single db query
self.assertEquals(ctx.get_resource_usage().db_txn_count, 1)

# a second lookup of the same events should cause no queries
with LoggingContext(name="test") as ctx:
res = self.get_success(
self.store.have_seen_events("room1", ["event10", "event19"])
)
self.assertEquals(res, {"event10"})
self.assertEquals(ctx.get_resource_usage().db_txn_count, 0)

def test_query_via_event_cache(self):
# fetch an event into the event cache
self.get_success(self.store.get_event("event10"))

# looking it up should now cause no db hits
with LoggingContext(name="test") as ctx:
res = self.get_success(self.store.have_seen_events("room1", ["event10"]))
self.assertEquals(res, {"event10"})
self.assertEquals(ctx.get_resource_usage().db_txn_count, 0)

0 comments on commit b4b2fd2

Please sign in to comment.