Skip to content

Commit

Permalink
Move background update handling out of store
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston committed Dec 5, 2019
1 parent 8863624 commit 4a33a6d
Show file tree
Hide file tree
Showing 27 changed files with 281 additions and 200 deletions.
2 changes: 1 addition & 1 deletion synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ def start():
_base.start(hs, config.listeners)

hs.get_pusherpool().start()
hs.get_datastore().start_doing_background_updates()
hs.get_datastore().db.updates.start_doing_background_updates()
except Exception:
# Print the exception and bail out.
print("Error during startup:", file=sys.stderr)
Expand Down
2 changes: 1 addition & 1 deletion synapse/rest/media/v1/preview_url_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ def _expire_url_cache_data(self):

logger.info("Running url preview cache expiry")

if not (yield self.store.has_completed_background_updates()):
if not (yield self.store.db.updates.has_completed_background_updates()):
logger.info("Still running DB updates; skipping expiry")
return

Expand Down
15 changes: 7 additions & 8 deletions synapse/storage/background_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from synapse.metrics.background_process_metrics import run_as_background_process

from . import engines
from ._base import SQLBaseStore

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -74,7 +73,7 @@ def total_items_per_ms(self):
return float(self.total_item_count) / float(self.total_duration_ms)


class BackgroundUpdateStore(SQLBaseStore):
class BackgroundUpdater(object):
""" Background updates are updates to the database that run in the
background. Each update processes a batch of data at once. We attempt to
limit the impact of each update by monitoring how long each batch takes to
Expand All @@ -86,8 +85,10 @@ class BackgroundUpdateStore(SQLBaseStore):
BACKGROUND_UPDATE_INTERVAL_MS = 1000
BACKGROUND_UPDATE_DURATION_MS = 100

def __init__(self, db_conn, hs):
super(BackgroundUpdateStore, self).__init__(db_conn, hs)
def __init__(self, hs, database):
self._clock = hs.get_clock()
self.db = database

self._background_update_performance = {}
self._background_update_queue = []
self._background_update_handlers = {}
Expand All @@ -101,9 +102,7 @@ def run_background_updates(self, sleep=True):
logger.info("Starting background schema updates")
while True:
if sleep:
yield self.hs.get_clock().sleep(
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0
)
yield self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)

try:
result = yield self.do_next_background_update(
Expand Down Expand Up @@ -380,7 +379,7 @@ def create_index_sqlite(conn):
logger.debug("[SQL] %s", sql)
c.execute(sql)

if isinstance(self.database_engine, engines.PostgresEngine):
if isinstance(self.db.database_engine, engines.PostgresEngine):
runner = create_index_psql
elif psql_only:
runner = None
Expand Down
36 changes: 19 additions & 17 deletions synapse/storage/data_stores/main/client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from twisted.internet import defer

from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage import background_updates
from synapse.storage._base import SQLBaseStore
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.caches.descriptors import Cache

Expand All @@ -32,41 +32,41 @@
LAST_SEEN_GRANULARITY = 120 * 1000


class ClientIpBackgroundUpdateStore(background_updates.BackgroundUpdateStore):
class ClientIpBackgroundUpdateStore(SQLBaseStore):
def __init__(self, db_conn, hs):
super(ClientIpBackgroundUpdateStore, self).__init__(db_conn, hs)

self.register_background_index_update(
self.db.updates.register_background_index_update(
"user_ips_device_index",
index_name="user_ips_device_id",
table="user_ips",
columns=["user_id", "device_id", "last_seen"],
)

self.register_background_index_update(
self.db.updates.register_background_index_update(
"user_ips_last_seen_index",
index_name="user_ips_last_seen",
table="user_ips",
columns=["user_id", "last_seen"],
)

self.register_background_index_update(
self.db.updates.register_background_index_update(
"user_ips_last_seen_only_index",
index_name="user_ips_last_seen_only",
table="user_ips",
columns=["last_seen"],
)

self.register_background_update_handler(
self.db.updates.register_background_update_handler(
"user_ips_analyze", self._analyze_user_ip
)

self.register_background_update_handler(
self.db.updates.register_background_update_handler(
"user_ips_remove_dupes", self._remove_user_ip_dupes
)

# Register a unique index
self.register_background_index_update(
self.db.updates.register_background_index_update(
"user_ips_device_unique_index",
index_name="user_ips_user_token_ip_unique_index",
table="user_ips",
Expand All @@ -75,12 +75,12 @@ def __init__(self, db_conn, hs):
)

# Drop the old non-unique index
self.register_background_update_handler(
self.db.updates.register_background_update_handler(
"user_ips_drop_nonunique_index", self._remove_user_ip_nonunique
)

# Update the last seen info in devices.
self.register_background_update_handler(
self.db.updates.register_background_update_handler(
"devices_last_seen", self._devices_last_seen_update
)

Expand All @@ -92,7 +92,7 @@ def f(conn):
txn.close()

yield self.db.runWithConnection(f)
yield self._end_background_update("user_ips_drop_nonunique_index")
yield self.db.updates._end_background_update("user_ips_drop_nonunique_index")
return 1

@defer.inlineCallbacks
Expand All @@ -108,7 +108,7 @@ def user_ips_analyze(txn):

yield self.db.runInteraction("user_ips_analyze", user_ips_analyze)

yield self._end_background_update("user_ips_analyze")
yield self.db.updates._end_background_update("user_ips_analyze")

return 1

Expand Down Expand Up @@ -271,14 +271,14 @@ def remove(txn):
(user_id, access_token, ip, device_id, user_agent, last_seen),
)

self._background_update_progress_txn(
self.db.updates._background_update_progress_txn(
txn, "user_ips_remove_dupes", {"last_seen": end_last_seen}
)

yield self.db.runInteraction("user_ips_dups_remove", remove)

if last:
yield self._end_background_update("user_ips_remove_dupes")
yield self.db.updates._end_background_update("user_ips_remove_dupes")

return batch_size

Expand Down Expand Up @@ -344,7 +344,7 @@ def _devices_last_seen_update_txn(txn):
txn.execute_batch(sql, rows)

_, _, _, user_id, device_id = rows[-1]
self._background_update_progress_txn(
self.db.updates._background_update_progress_txn(
txn,
"devices_last_seen",
{"last_user_id": user_id, "last_device_id": device_id},
Expand All @@ -357,7 +357,7 @@ def _devices_last_seen_update_txn(txn):
)

if not updated:
yield self._end_background_update("devices_last_seen")
yield self.db.updates._end_background_update("devices_last_seen")

return updated

Expand Down Expand Up @@ -546,7 +546,9 @@ async def _prune_old_user_ips(self):
# Nothing to do
return

if not await self.has_completed_background_update("devices_last_seen"):
if not await self.db.updates.has_completed_background_update(
"devices_last_seen"
):
# Only start pruning if we have finished populating the devices
# last seen info.
return
Expand Down
9 changes: 4 additions & 5 deletions synapse/storage/data_stores/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.util.caches.expiringcache import ExpiringCache

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -208,20 +207,20 @@ def delete_messages_for_remote_destination_txn(txn):
)


class DeviceInboxBackgroundUpdateStore(BackgroundUpdateStore):
class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"

def __init__(self, db_conn, hs):
super(DeviceInboxBackgroundUpdateStore, self).__init__(db_conn, hs)

self.register_background_index_update(
self.db.updates.register_background_index_update(
"device_inbox_stream_index",
index_name="device_inbox_stream_id_user_id",
table="device_inbox",
columns=["stream_id", "user_id"],
)

self.register_background_update_handler(
self.db.updates.register_background_update_handler(
self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox
)

Expand All @@ -234,7 +233,7 @@ def reindex_txn(conn):

yield self.db.runWithConnection(reindex_txn)

yield self._end_background_update(self.DEVICE_INBOX_STREAM_ID)
yield self.db.updates._end_background_update(self.DEVICE_INBOX_STREAM_ID)

return 1

Expand Down
15 changes: 8 additions & 7 deletions synapse/storage/data_stores/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.types import get_verify_key_from_cross_signing_key
from synapse.util import batch_iter
from synapse.util.caches.descriptors import (
Expand Down Expand Up @@ -642,19 +641,19 @@ def get_device_list_last_stream_id_for_remotes(self, user_ids):
return results


class DeviceBackgroundUpdateStore(BackgroundUpdateStore):
class DeviceBackgroundUpdateStore(SQLBaseStore):
def __init__(self, db_conn, hs):
super(DeviceBackgroundUpdateStore, self).__init__(db_conn, hs)

self.register_background_index_update(
self.db.updates.register_background_index_update(
"device_lists_stream_idx",
index_name="device_lists_stream_user_id",
table="device_lists_stream",
columns=["user_id", "device_id"],
)

# create a unique index on device_lists_remote_cache
self.register_background_index_update(
self.db.updates.register_background_index_update(
"device_lists_remote_cache_unique_idx",
index_name="device_lists_remote_cache_unique_id",
table="device_lists_remote_cache",
Expand All @@ -663,7 +662,7 @@ def __init__(self, db_conn, hs):
)

# And one on device_lists_remote_extremeties
self.register_background_index_update(
self.db.updates.register_background_index_update(
"device_lists_remote_extremeties_unique_idx",
index_name="device_lists_remote_extremeties_unique_idx",
table="device_lists_remote_extremeties",
Expand All @@ -672,7 +671,7 @@ def __init__(self, db_conn, hs):
)

# once they complete, we can remove the old non-unique indexes.
self.register_background_update_handler(
self.db.updates.register_background_update_handler(
DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES,
self._drop_device_list_streams_non_unique_indexes,
)
Expand All @@ -686,7 +685,9 @@ def f(conn):
txn.close()

yield self.db.runWithConnection(f)
yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES)
yield self.db.updates._end_background_update(
DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES
)
return 1


Expand Down
6 changes: 3 additions & 3 deletions synapse/storage/data_stores/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ class EventFederationStore(EventFederationWorkerStore):
def __init__(self, db_conn, hs):
super(EventFederationStore, self).__init__(db_conn, hs)

self.register_background_update_handler(
self.db.updates.register_background_update_handler(
self.EVENT_AUTH_STATE_ONLY, self._background_delete_non_state_event_auth
)

Expand Down Expand Up @@ -654,7 +654,7 @@ def delete_event_auth(txn):
"max_stream_id_exclusive": min_stream_id,
}

self._background_update_progress_txn(
self.db.updates._background_update_progress_txn(
txn, self.EVENT_AUTH_STATE_ONLY, new_progress
)

Expand All @@ -665,6 +665,6 @@ def delete_event_auth(txn):
)

if not result:
yield self._end_background_update(self.EVENT_AUTH_STATE_ONLY)
yield self.db.updates._end_background_update(self.EVENT_AUTH_STATE_ONLY)

return batch_size
4 changes: 2 additions & 2 deletions synapse/storage/data_stores/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,14 +614,14 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
def __init__(self, db_conn, hs):
super(EventPushActionsStore, self).__init__(db_conn, hs)

self.register_background_index_update(
self.db.updates.register_background_index_update(
self.EPA_HIGHLIGHT_INDEX,
index_name="event_push_actions_u_highlight",
table="event_push_actions",
columns=["user_id", "stream_ordering"],
)

self.register_background_index_update(
self.db.updates.register_background_index_update(
"event_push_actions_highlights_index",
index_name="event_push_actions_highlights_index",
table="event_push_actions",
Expand Down
6 changes: 1 addition & 5 deletions synapse/storage/data_stores/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
from synapse.metrics import BucketCollector
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import make_in_list_sql_clause
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.data_stores.main.event_federation import EventFederationStore
from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
from synapse.storage.data_stores.main.state import StateGroupWorkerStore
Expand Down Expand Up @@ -94,10 +93,7 @@ def f(self, *args, **kwargs):
# inherits from EventFederationStore so that we can call _update_backward_extremities
# and _handle_mult_prev_events (though arguably those could both be moved in here)
class EventsStore(
StateGroupWorkerStore,
EventFederationStore,
EventsWorkerStore,
BackgroundUpdateStore,
StateGroupWorkerStore, EventFederationStore, EventsWorkerStore,
):
def __init__(self, db_conn, hs):
super(EventsStore, self).__init__(db_conn, hs)
Expand Down
Loading

0 comments on commit 4a33a6d

Please sign in to comment.