Skip to content

Commit

Permalink
Reduce DB hits for replication
Browse files Browse the repository at this point in the history
Some streams will occasionally advance their positions without actually
having any new rows to send over federation. Currently this means that
the token will not advance on the workers, leading to them repeatedly
sending a slightly out of date token. This in turn requires the master
to hit the DB to check if there are any new rows, rather than hitting
the no-op logic where we check if the given token matches the current
token.

This commit changes the API to always return an entry if the position
for a stream has changed, allowing workers to advance their tokens
correctly.
  • Loading branch information
erikjohnston committed Sep 23, 2016
1 parent 667fcd5 commit 748d8fd
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 45 deletions.
8 changes: 4 additions & 4 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ homeserver*.yaml
.coverage
htmlcov

demo/*.db
demo/*.log
demo/*.log.*
demo/*.pid
demo/*/*.db
demo/*/*.log
demo/*/*.log.*
demo/*/*.pid
demo/media_store.*
demo/etc

Expand Down
4 changes: 2 additions & 2 deletions synapse/app/pusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,15 +197,15 @@ def poke_pushers(results):
yield start_pusher(user_id, app_id, pushkey)

stream = results.get("events")
if stream:
if stream and stream["rows"]:
min_stream_id = stream["rows"][0][0]
max_stream_id = stream["position"]
preserve_fn(pusher_pool.on_new_notifications)(
min_stream_id, max_stream_id
)

stream = results.get("receipts")
if stream:
if stream and stream["rows"]:
rows = stream["rows"]
affected_room_ids = set(row[1] for row in rows)
min_stream_id = rows[0][0]
Expand Down
139 changes: 103 additions & 36 deletions synapse/replication/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from synapse.http.server import request_handler, finish_request
from synapse.replication.pusher_resource import PusherResource
from synapse.replication.presence_resource import PresenceResource
from synapse.api.errors import SynapseError

from twisted.web.resource import Resource
from twisted.web.server import NOT_DONE_YET
Expand Down Expand Up @@ -166,7 +167,8 @@ def _async_render_GET(self, request):
def replicate():
return self.replicate(request_streams, limit)

result = yield self.notifier.wait_for_replication(replicate, timeout)
writer = yield self.notifier.wait_for_replication(replicate, timeout)
result = writer.finish()

for stream_name, stream_content in result.items():
logger.info(
Expand All @@ -186,6 +188,9 @@ def replicate(self, request_streams, limit):
current_token = yield self.current_replication_token()
logger.debug("Replicating up to %r", current_token)

if limit == 0:
raise SynapseError(400, "Limit cannot be 0")

yield self.account_data(writer, current_token, limit, request_streams)
yield self.events(writer, current_token, limit, request_streams)
# TODO: implement limit
Expand All @@ -200,7 +205,7 @@ def replicate(self, request_streams, limit):
self.streams(writer, current_token, request_streams)

logger.debug("Replicated %d rows", writer.total)
defer.returnValue(writer.finish())
defer.returnValue(writer)

def streams(self, writer, current_token, request_streams):
request_token = request_streams.get("streams")
Expand Down Expand Up @@ -233,31 +238,52 @@ def events(self, writer, current_token, limit, request_streams):
request_backfill = request_streams.get("backfill")

if request_events is not None or request_backfill is not None:
if request_events is None:
if request_backfill is None:
request_events = current_token.events
if request_backfill is None:
request_backfill = current_token.backfill

no_new_tokens = (
request_events == current_token.events
and request_backfill == current_token.backfill
)
if no_new_tokens:
return

res = yield self.store.get_all_new_events(
request_backfill, request_events,
current_token.backfill, current_token.events,
limit
)
writer.write_header_and_rows("events", res.new_forward_events, (
"position", "internal", "json", "state_group"
))
writer.write_header_and_rows("backfill", res.new_backfill_events, (
"position", "internal", "json", "state_group"
))

upto_events_token = _position_from_rows(
res.new_forward_events, current_token.events
)

upto_backfill_token = _position_from_rows(
res.new_backfill_events, current_token.backfill
)

if request_events != upto_events_token:
writer.write_header_and_rows("events", res.new_forward_events, (
"position", "internal", "json", "state_group"
), position=upto_events_token)

if request_backfill != upto_backfill_token:
writer.write_header_and_rows("backfill", res.new_backfill_events, (
"position", "internal", "json", "state_group",
), position=upto_backfill_token)

writer.write_header_and_rows(
"forward_ex_outliers", res.forward_ex_outliers,
("position", "event_id", "state_group")
("position", "event_id", "state_group"),
)
writer.write_header_and_rows(
"backward_ex_outliers", res.backward_ex_outliers,
("position", "event_id", "state_group")
("position", "event_id", "state_group"),
)
writer.write_header_and_rows(
"state_resets", res.state_resets, ("position",)
"state_resets", res.state_resets, ("position",),
)

@defer.inlineCallbacks
Expand All @@ -266,23 +292,24 @@ def presence(self, writer, current_token, request_streams):

request_presence = request_streams.get("presence")

if request_presence is not None:
if request_presence is not None and request_presence != current_position:
presence_rows = yield self.presence_handler.get_all_presence_updates(
request_presence, current_position
)
upto_token = _position_from_rows(presence_rows, current_position)
writer.write_header_and_rows("presence", presence_rows, (
"position", "user_id", "state", "last_active_ts",
"last_federation_update_ts", "last_user_sync_ts",
"status_msg", "currently_active",
))
), position=upto_token)

@defer.inlineCallbacks
def typing(self, writer, current_token, request_streams):
current_position = current_token.typing

request_typing = request_streams.get("typing")

if request_typing is not None:
if request_typing is not None and request_typing != current_position:
# If they have a higher token than current max, we can assume that
# they had been talking to a previous instance of the master. Since
# we reset the token on restart, the best (but hacky) thing we can
Expand All @@ -293,23 +320,25 @@ def typing(self, writer, current_token, request_streams):
typing_rows = yield self.typing_handler.get_all_typing_updates(
request_typing, current_position
)
upto_token = _position_from_rows(typing_rows, current_position)
writer.write_header_and_rows("typing", typing_rows, (
"position", "room_id", "typing"
))
), position=upto_token)

@defer.inlineCallbacks
def receipts(self, writer, current_token, limit, request_streams):
current_position = current_token.receipts

request_receipts = request_streams.get("receipts")

if request_receipts is not None:
if request_receipts is not None and request_receipts != current_position:
receipts_rows = yield self.store.get_all_updated_receipts(
request_receipts, current_position, limit
)
upto_token = _position_from_rows(receipts_rows, current_position)
writer.write_header_and_rows("receipts", receipts_rows, (
"position", "room_id", "receipt_type", "user_id", "event_id", "data"
))
), position=upto_token)

@defer.inlineCallbacks
def account_data(self, writer, current_token, limit, request_streams):
Expand All @@ -324,99 +353,117 @@ def account_data(self, writer, current_token, limit, request_streams):
user_account_data = current_position
if room_account_data is None:
room_account_data = current_position

no_new_tokens = (
user_account_data == current_position
and room_account_data == current_position
)
if no_new_tokens:
return

user_rows, room_rows = yield self.store.get_all_updated_account_data(
user_account_data, room_account_data, current_position, limit
)

upto_users_token = _position_from_rows(user_rows, current_position)
upto_rooms_token = _position_from_rows(room_rows, current_position)

writer.write_header_and_rows("user_account_data", user_rows, (
"position", "user_id", "type", "content"
))
), position=upto_users_token)
writer.write_header_and_rows("room_account_data", room_rows, (
"position", "user_id", "room_id", "type", "content"
))
), position=upto_rooms_token)

if tag_account_data is not None:
tag_rows = yield self.store.get_all_updated_tags(
tag_account_data, current_position, limit
)
upto_tag_token = _position_from_rows(tag_rows, current_position)
writer.write_header_and_rows("tag_account_data", tag_rows, (
"position", "user_id", "room_id", "tags"
))
), position=upto_tag_token)

@defer.inlineCallbacks
def push_rules(self, writer, current_token, limit, request_streams):
current_position = current_token.push_rules

push_rules = request_streams.get("push_rules")

if push_rules is not None:
if push_rules is not None and push_rules != current_position:
rows = yield self.store.get_all_push_rule_updates(
push_rules, current_position, limit
)
upto_token = _position_from_rows(rows, current_position)
writer.write_header_and_rows("push_rules", rows, (
"position", "event_stream_ordering", "user_id", "rule_id", "op",
"priority_class", "priority", "conditions", "actions"
))
), position=upto_token)

@defer.inlineCallbacks
def pushers(self, writer, current_token, limit, request_streams):
current_position = current_token.pushers

pushers = request_streams.get("pushers")

if pushers is not None:
if pushers is not None and pushers != current_position:
updated, deleted = yield self.store.get_all_updated_pushers(
pushers, current_position, limit
)
upto_token = _position_from_rows(updated, current_position)
writer.write_header_and_rows("pushers", updated, (
"position", "user_id", "access_token", "profile_tag", "kind",
"app_id", "app_display_name", "device_display_name", "pushkey",
"ts", "lang", "data"
))
), position=upto_token)
writer.write_header_and_rows("deleted_pushers", deleted, (
"position", "user_id", "app_id", "pushkey"
))
), position=upto_token)

@defer.inlineCallbacks
def caches(self, writer, current_token, limit, request_streams):
current_position = current_token.caches

caches = request_streams.get("caches")

if caches is not None:
if caches is not None and caches != current_position:
updated_caches = yield self.store.get_all_updated_caches(
caches, current_position, limit
)
upto_token = _position_from_rows(updated_caches, current_position)
writer.write_header_and_rows("caches", updated_caches, (
"position", "cache_func", "keys", "invalidation_ts"
))
), position=upto_token)

@defer.inlineCallbacks
def to_device(self, writer, current_token, limit, request_streams):
current_position = current_token.to_device

to_device = request_streams.get("to_device")

if to_device is not None:
if to_device is not None and to_device != current_position:
to_device_rows = yield self.store.get_all_new_device_messages(
to_device, current_position, limit
)
upto_token = _position_from_rows(to_device_rows, current_position)
writer.write_header_and_rows("to_device", to_device_rows, (
"position", "user_id", "device_id", "message_json"
))
), position=upto_token)

@defer.inlineCallbacks
def public_rooms(self, writer, current_token, limit, request_streams):
current_position = current_token.public_rooms

public_rooms = request_streams.get("public_rooms")

if public_rooms is not None:
if public_rooms is not None and public_rooms != current_position:
public_rooms_rows = yield self.store.get_all_new_public_rooms(
public_rooms, current_position, limit
)
upto_token = _position_from_rows(public_rooms_rows, current_position)
writer.write_header_and_rows("public_rooms", public_rooms_rows, (
"position", "room_id", "visibility"
))
), position=upto_token)


class _Writer(object):
Expand All @@ -426,11 +473,11 @@ def __init__(self):
self.total = 0

def write_header_and_rows(self, name, rows, fields, position=None):
if not rows:
return

if position is None:
position = rows[-1][0]
if rows:
position = rows[-1][0]
else:
return

self.streams[name] = {
"position": position if type(position) is int else str(position),
Expand All @@ -440,6 +487,9 @@ def write_header_and_rows(self, name, rows, fields, position=None):

self.total += len(rows)

def __nonzero__(self):
return bool(self.total)

def finish(self):
return self.streams

Expand All @@ -461,3 +511,20 @@ def __new__(cls, *args):

def __str__(self):
return "_".join(str(value) for value in self)


def _position_from_rows(rows, current_position):
"""Calculates a position to return for a stream. Ideally we want to return the
position of the last row, as that will be the most correct. However, if there
are no rows we fall back to using the current position to stop us from
repeatedly hitting the storage layer unncessarily thinking there are updates.
(Not all advances of the token correspond to an actual update)
We can't just always return the current position, as we often limit the
number of rows we replicate, and so the stream may lag. The assumption is
that if the storage layer returns no new rows then we are not lagging and
we are at the `current_position`.
"""
if rows:
return rows[-1][0]
return current_position
3 changes: 3 additions & 0 deletions synapse/storage/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,9 @@ def get_all_new_public_rooms(txn):
txn.execute(sql, (prev_id, current_id, limit,))
return txn.fetchall()

if prev_id == current_id:
return defer.succeed([])

return self.runInteraction(
"get_all_new_public_rooms", get_all_new_public_rooms
)
3 changes: 2 additions & 1 deletion tests/replication/slave/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ def setUp(self):
@defer.inlineCallbacks
def replicate(self):
streams = self.slaved_store.stream_positions()
result = yield self.replication.replicate(streams, 100)
writer = yield self.replication.replicate(streams, 100)
result = writer.finish()
yield self.slaved_store.process_replication(result)

@defer.inlineCallbacks
Expand Down
Loading

0 comments on commit 748d8fd

Please sign in to comment.