Skip to content

Commit

Permalink
Use native UPSERTs where possible (matrix-org#4306)
Browse files Browse the repository at this point in the history
  • Loading branch information
hawkowl authored Jan 24, 2019
1 parent 97fd29c commit 58f6c48
Show file tree
Hide file tree
Showing 14 changed files with 238 additions and 43 deletions.
6 changes: 1 addition & 5 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
[run]
branch = True
parallel = True
source = synapse

[paths]
source=
coverage
include = synapse/*

[report]
precision = 2
6 changes: 3 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ homeserver*.pid
*.tls.dh
*.tls.key

.coverage
.coverage.*
!.coverage.rc
.coverage*
coverage.*
!.coveragerc
htmlcov

demo/*/*.db
Expand Down
1 change: 1 addition & 0 deletions changelog.d/4306.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Synapse will now take advantage of native UPSERT functionality in PostgreSQL 9.5+ and SQLite 3.24+.
148 changes: 137 additions & 11 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,41 @@ def __init__(self, db_conn, hs):

self.database_engine = hs.database_engine

# A set of tables that are not safe to use native upserts in.
self._unsafe_to_upsert_tables = {"user_ips"}

if self.database_engine.can_native_upsert:
# Check ASAP (and then later, every 1s) to see if we have finished
# background updates of tables that aren't safe to update.
self._clock.call_later(0.0, self._check_safe_to_upsert)

@defer.inlineCallbacks
def _check_safe_to_upsert(self):
    """
    Is it safe to use native UPSERT?

    If there are background updates, we will need to wait, as they may be
    the addition of indexes that set the UNIQUE constraint that we require.

    If the background updates have not completed, wait a second and check
    again. Tables that become safe are removed from
    ``self._unsafe_to_upsert_tables``; polling stops once that set is empty.
    """
    rows = yield self._simple_select_list(
        "background_updates",
        keyvalues=None,
        retcols=["update_name"],
        desc="check_background_updates",
    )
    # Names of the updates currently listed in the background_updates table.
    pending_updates = {row["update_name"] for row in rows}

    # The User IPs table in schema #53 was missing a unique index, which we
    # run as a background update. The set holds the *table* name
    # ("user_ips"), so that is what must be discarded; discarding any other
    # string is a silent no-op and would leave us rescheduling forever.
    if "user_ips_device_unique_index" not in pending_updates:
        self._unsafe_to_upsert_tables.discard("user_ips")

    # If there's any tables left to check, reschedule to run.
    if self._unsafe_to_upsert_tables:
        self._clock.call_later(1.0, self._check_safe_to_upsert)

def start_profiling(self):
self._previous_loop_ts = self._clock.time_msec()

Expand Down Expand Up @@ -494,8 +529,15 @@ def _simple_insert_many_txn(txn, table, values):
txn.executemany(sql, vals)

@defer.inlineCallbacks
def _simple_upsert(self, table, keyvalues, values,
insertion_values={}, desc="_simple_upsert", lock=True):
def _simple_upsert(
self,
table,
keyvalues,
values,
insertion_values={},
desc="_simple_upsert",
lock=True
):
"""
`lock` should generally be set to True (the default), but can be set
Expand All @@ -516,16 +558,21 @@ def _simple_upsert(self, table, keyvalues, values,
inserting
lock (bool): True to lock the table when doing the upsert.
Returns:
Deferred(bool): True if a new entry was created, False if an
existing one was updated.
Deferred(None or bool): Native upserts always return None. Emulated
upserts return True if a new entry was created, False if an existing
one was updated.
"""
attempts = 0
while True:
try:
result = yield self.runInteraction(
desc,
self._simple_upsert_txn, table, keyvalues, values, insertion_values,
lock=lock
self._simple_upsert_txn,
table,
keyvalues,
values,
insertion_values,
lock=lock,
)
defer.returnValue(result)
except self.database_engine.module.IntegrityError as e:
Expand All @@ -537,12 +584,59 @@ def _simple_upsert(self, table, keyvalues, values,

# presumably we raced with another transaction: let's retry.
logger.warn(
"IntegrityError when upserting into %s; retrying: %s",
table, e
"%s when upserting into %s; retrying: %s", e.__name__, table, e
)

def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={},
lock=True):
def _simple_upsert_txn(
self,
txn,
table,
keyvalues,
values,
insertion_values={},
lock=True,
):
"""
Pick the UPSERT method which works best on the platform. Either the
native one (Pg9.5+, recent SQLites), or fall back to an emulated method.
Args:
txn: The transaction to use.
table (str): The table to upsert into
keyvalues (dict): The unique key tables and their new values
values (dict): The nonunique columns and their new values
insertion_values (dict): additional key/values to use only when
inserting
lock (bool): True to lock the table when doing the upsert.
Returns:
Deferred(None or bool): Native upserts always return None. Emulated
upserts return True if a new entry was created, False if an existing
one was updated.
"""
if (
self.database_engine.can_native_upsert
and table not in self._unsafe_to_upsert_tables
):
return self._simple_upsert_txn_native_upsert(
txn,
table,
keyvalues,
values,
insertion_values=insertion_values,
)
else:
return self._simple_upsert_txn_emulated(
txn,
table,
keyvalues,
values,
insertion_values=insertion_values,
lock=lock,
)

def _simple_upsert_txn_emulated(
self, txn, table, keyvalues, values, insertion_values={}, lock=True
):
# We need to lock the table :(, unless we're *really* careful
if lock:
self.database_engine.lock_table(txn, table)
Expand Down Expand Up @@ -577,12 +671,44 @@ def _getwhere(key):
sql = "INSERT INTO %s (%s) VALUES (%s)" % (
table,
", ".join(k for k in allvalues),
", ".join("?" for _ in allvalues)
", ".join("?" for _ in allvalues),
)
txn.execute(sql, list(allvalues.values()))
# successfully inserted
return True

def _simple_upsert_txn_native_upsert(
self, txn, table, keyvalues, values, insertion_values={}
):
"""
Use the native UPSERT functionality in recent PostgreSQL versions.
Args:
table (str): The table to upsert into
keyvalues (dict): The unique key tables and their new values
values (dict): The nonunique columns and their new values
insertion_values (dict): additional key/values to use only when
inserting
Returns:
None
"""
allvalues = {}
allvalues.update(keyvalues)
allvalues.update(values)
allvalues.update(insertion_values)

sql = (
"INSERT INTO %s (%s) VALUES (%s) "
"ON CONFLICT (%s) DO UPDATE SET %s"
) % (
table,
", ".join(k for k in allvalues),
", ".join("?" for _ in allvalues),
", ".join(k for k in keyvalues),
", ".join(k + "=EXCLUDED." + k for k in values),
)
txn.execute(sql, list(allvalues.values()))

def _simple_select_one(self, table, keyvalues, retcols,
allow_none=False, desc="_simple_select_one"):
"""Executes a SELECT query on the named table, which is expected to
Expand Down
5 changes: 4 additions & 1 deletion synapse/storage/client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,10 @@ def update():
)

def _update_client_ips_batch_txn(self, txn, to_update):
self.database_engine.lock_table(txn, "user_ips")
if "user_ips" in self._unsafe_to_upsert_tables or (
not self.database_engine.can_native_upsert
):
self.database_engine.lock_table(txn, "user_ips")

for entry in iteritems(to_update):
(user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/engines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from ._base import IncorrectDatabaseSetup
from .postgres import PostgresEngine
from .sqlite3 import Sqlite3Engine
from .sqlite import Sqlite3Engine

SUPPORTED_MODULE = {
"sqlite3": Sqlite3Engine,
Expand Down
14 changes: 14 additions & 0 deletions synapse/storage/engines/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ def convert_param_style(self, sql):
return sql.replace("?", "%s")

def on_new_connection(self, db_conn):

# Get the version of PostgreSQL that we're using. As per the psycopg2
# docs: The number is formed by converting the major, minor, and
# revision numbers into two-decimal-digit numbers and appending them
# together. For example, version 8.1.5 will be returned as 80105
self._version = db_conn.server_version

db_conn.set_isolation_level(
self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
)
Expand All @@ -54,6 +61,13 @@ def on_new_connection(self, db_conn):

cursor.close()

@property
def can_native_upsert(self):
    """
    Whether native UPSERTs are usable on this server (PostgreSQL 9.5+).

    Compares the stored server version number against 90500, i.e. 9.5.0
    in the integer encoding (8.1.5 is represented as 80105).
    """
    minimum_upsert_version = 90500
    return self._version >= minimum_upsert_version

def is_deadlock(self, error):
if isinstance(error, self.module.DatabaseError):
# https://www.postgresql.org/docs/current/static/errcodes-appendix.html
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import struct
import threading
from sqlite3 import sqlite_version_info

from synapse.storage.prepare_database import prepare_database

Expand All @@ -30,6 +31,14 @@ def __init__(self, database_module, database_config):
self._current_state_group_id = None
self._current_state_group_id_lock = threading.Lock()

@property
def can_native_upsert(self):
    """
    Whether the linked SQLite library supports native UPSERTs.

    This requires SQLite 3.24+, plus some more work we haven't done yet to
    tell what was inserted vs updated.
    """
    minimum_upsert_version = (3, 24, 0)
    return sqlite_version_info >= minimum_upsert_version

def check_database(self, txn):
pass

Expand Down
9 changes: 7 additions & 2 deletions synapse/storage/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ def add_pusher(self, user_id, access_token, kind, app_id,
with self._pushers_id_gen.get_next() as stream_id:
# no need to lock because `pushers` has a unique key on
# (app_id, pushkey, user_name) so _simple_upsert will retry
newly_inserted = yield self._simple_upsert(
yield self._simple_upsert(
table="pushers",
keyvalues={
"app_id": app_id,
Expand All @@ -238,7 +238,12 @@ def add_pusher(self, user_id, access_token, kind, app_id,
lock=False,
)

if newly_inserted:
user_has_pusher = self.get_if_user_has_pusher.cache.get(
(user_id,), None, update_metrics=False
)

if user_has_pusher is not True:
# invalidate, since the user might not have had a pusher before
yield self.runInteraction(
"add_pusher",
self._invalidate_cache_and_stream,
Expand Down
55 changes: 40 additions & 15 deletions synapse/storage/user_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,14 +168,14 @@ def _update_profile_in_user_dir_txn(txn):
if isinstance(self.database_engine, PostgresEngine):
# We weight the localpart most highly, then display name and finally
# server name
if new_entry:
if self.database_engine.can_native_upsert:
sql = """
INSERT INTO user_directory_search(user_id, vector)
VALUES (?,
setweight(to_tsvector('english', ?), 'A')
|| setweight(to_tsvector('english', ?), 'D')
|| setweight(to_tsvector('english', COALESCE(?, '')), 'B')
)
) ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
"""
txn.execute(
sql,
Expand All @@ -185,20 +185,45 @@ def _update_profile_in_user_dir_txn(txn):
)
)
else:
sql = """
UPDATE user_directory_search
SET vector = setweight(to_tsvector('english', ?), 'A')
|| setweight(to_tsvector('english', ?), 'D')
|| setweight(to_tsvector('english', COALESCE(?, '')), 'B')
WHERE user_id = ?
"""
txn.execute(
sql,
(
get_localpart_from_id(user_id), get_domain_from_id(user_id),
display_name, user_id,
# TODO: Remove this code after we've bumped the minimum version
# of postgres to always support upserts, so we can get rid of
# `new_entry` usage
if new_entry is True:
sql = """
INSERT INTO user_directory_search(user_id, vector)
VALUES (?,
setweight(to_tsvector('english', ?), 'A')
|| setweight(to_tsvector('english', ?), 'D')
|| setweight(to_tsvector('english', COALESCE(?, '')), 'B')
)
"""
txn.execute(
sql,
(
user_id, get_localpart_from_id(user_id),
get_domain_from_id(user_id), display_name,
)
)
elif new_entry is False:
sql = """
UPDATE user_directory_search
SET vector = setweight(to_tsvector('english', ?), 'A')
|| setweight(to_tsvector('english', ?), 'D')
|| setweight(to_tsvector('english', COALESCE(?, '')), 'B')
WHERE user_id = ?
"""
txn.execute(
sql,
(
get_localpart_from_id(user_id),
get_domain_from_id(user_id),
display_name, user_id,
)
)
else:
raise RuntimeError(
"upsert returned None when 'can_native_upsert' is False"
)
)
elif isinstance(self.database_engine, Sqlite3Engine):
value = "%s %s" % (user_id, display_name,) if display_name else user_id
self._simple_upsert_txn(
Expand Down
Loading

0 comments on commit 58f6c48

Please sign in to comment.