Skip to content

Commit

Permalink
Add instance name to RDATA/POSITION commands (matrix-org#7364)
Browse files Browse the repository at this point in the history
This is primarily for allowing us to send those commands from workers, but for now simply allows us to ignore echoed RDATA/POSITION commands that we sent (we get echoes of sent commands when using redis). Currently we log a WARNING on the master process every time we receive an echoed RDATA.
  • Loading branch information
erikjohnston authored Apr 29, 2020
1 parent 3eab76a commit 37f6823
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 50 deletions.
1 change: 1 addition & 0 deletions changelog.d/7364.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add an `instance_name` to `RDATA` and `POSITION` replication commands.
41 changes: 24 additions & 17 deletions docs/tcp_replication.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,17 @@ example flow would be (where '>' indicates master to worker and

> SERVER example.com
< REPLICATE
> POSITION events 53
> RDATA events 54 ["$foo1:bar.com", ...]
> RDATA events 55 ["$foo4:bar.com", ...]
> POSITION events master 53
> RDATA events master 54 ["$foo1:bar.com", ...]
> RDATA events master 55 ["$foo4:bar.com", ...]

The example shows the server accepting a new connection and sending its identity
with the `SERVER` command, followed by the client server to respond with the
position of all streams. The server then periodically sends `RDATA` commands
which have the format `RDATA <stream_name> <token> <row>`, where the format of
`<row>` is defined by the individual streams.
which have the format `RDATA <stream_name> <instance_name> <token> <row>`, where
the format of `<row>` is defined by the individual streams. The
`<instance_name>` is the name of the Synapse process that generated the data
(usually "master").

Error reporting happens by either the client or server sending an ERROR
command, and usually the connection will be closed.
Expand Down Expand Up @@ -52,7 +54,7 @@ The basic structure of the protocol is line based, where the initial
word of each line specifies the command. The rest of the line is parsed
based on the command. For example, the RDATA command is defined as:

RDATA <stream_name> <token> <row_json>
RDATA <stream_name> <instance_name> <token> <row_json>

(Note that <row_json> may contains spaces, but cannot contain
newlines.)
Expand Down Expand Up @@ -136,11 +138,11 @@ the wire:
< NAME synapse.app.appservice
< PING 1490197665618
< REPLICATE
> POSITION events 1
> POSITION backfill 1
> POSITION caches 1
> RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
> RDATA events 14 ["$149019767112vOHxz:localhost:8823",
> POSITION events master 1
> POSITION backfill master 1
> POSITION caches master 1
> RDATA caches master 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513]
> RDATA events master 14 ["$149019767112vOHxz:localhost:8823",
"!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]
< PING 1490197675618
> ERROR server stopping
Expand All @@ -151,10 +153,10 @@ position without needing to send data with the `RDATA` command.

An example of a batched set of `RDATA` is:

> RDATA caches batch ["get_user_by_id",["@test:localhost:8823"],1490197670513]
> RDATA caches batch ["get_user_by_id",["@test2:localhost:8823"],1490197670513]
> RDATA caches batch ["get_user_by_id",["@test3:localhost:8823"],1490197670513]
> RDATA caches 54 ["get_user_by_id",["@test4:localhost:8823"],1490197670513]
> RDATA caches master batch ["get_user_by_id",["@test:localhost:8823"],1490197670513]
> RDATA caches master batch ["get_user_by_id",["@test2:localhost:8823"],1490197670513]
> RDATA caches master batch ["get_user_by_id",["@test3:localhost:8823"],1490197670513]
> RDATA caches master 54 ["get_user_by_id",["@test4:localhost:8823"],1490197670513]

In this case the client shouldn't advance their caches token until it
sees the the last `RDATA`.
Expand All @@ -178,6 +180,11 @@ client (C):
updates, and if so then fetch them out of band. Sent in response to a
REPLICATE command (but can happen at any time).

The POSITION command includes the source of the stream. Currently all streams
are written by a single process (usually "master"). If fetching missing
updates via HTTP API, rather than via the DB, then processes should make the
request to the appropriate process.

#### ERROR (S, C)

There was an error
Expand Down Expand Up @@ -234,12 +241,12 @@ Each individual cache invalidation results in a row being sent down
replication, which includes the cache name (the name of the function)
and they key to invalidate. For example:

> RDATA caches 550953771 ["get_user_by_id", ["@bob:example.com"], 1550574873251]
> RDATA caches master 550953771 ["get_user_by_id", ["@bob:example.com"], 1550574873251]

Alternatively, an entire cache can be invalidated by sending down a `null`
instead of the key. For example:

> RDATA caches 550953772 ["get_user_by_id", null, 1550574873252]
> RDATA caches master 550953772 ["get_user_by_id", null, 1550574873252]

However, there are times when a number of caches need to be invalidated
at the same time with the same key. To reduce traffic we batch those
Expand Down
4 changes: 2 additions & 2 deletions synapse/app/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ def handle_sighup(*args, **kwargs):

# Start the tracer
synapse.logging.opentracing.init_tracer( # type: ignore[attr-defined] # noqa
hs.config
hs
)

# It is now safe to start your Synapse.
Expand Down Expand Up @@ -316,7 +316,7 @@ def setup_sentry(hs):
scope.set_tag("matrix_server_name", hs.config.server_name)

app = hs.config.worker_app if hs.config.worker_app else "synapse.app.homeserver"
name = hs.config.worker_name if hs.config.worker_name else "master"
name = hs.get_instance_name()
scope.set_tag("worker_app", app)
scope.set_tag("worker_name", name)

Expand Down
23 changes: 10 additions & 13 deletions synapse/logging/opentracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,17 @@ def set_fates(clotho, lachesis, atropos, father="Zues", mother="Themis"):
import re
import types
from functools import wraps
from typing import Dict
from typing import TYPE_CHECKING, Dict

from canonicaljson import json

from twisted.internet import defer

from synapse.config import ConfigError

if TYPE_CHECKING:
from synapse.server import HomeServer

# Helper class


Expand Down Expand Up @@ -297,14 +300,11 @@ def _noop_context_manager(*args, **kwargs):
# Setup


def init_tracer(config):
def init_tracer(hs: "HomeServer"):
"""Set the whitelists and initialise the JaegerClient tracer
Args:
config (HomeserverConfig): The config used by the homeserver
"""
global opentracing
if not config.opentracer_enabled:
if not hs.config.opentracer_enabled:
# We don't have a tracer
opentracing = None
return
Expand All @@ -315,18 +315,15 @@ def init_tracer(config):
"installed."
)

# Include the worker name
name = config.worker_name if config.worker_name else "master"

# Pull out the jaeger config if it was given. Otherwise set it to something sensible.
# See https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/config.py

set_homeserver_whitelist(config.opentracer_whitelist)
set_homeserver_whitelist(hs.config.opentracer_whitelist)

JaegerConfig(
config=config.jaeger_config,
service_name="{} {}".format(config.server_name, name),
scope_manager=LogContextScopeManager(config),
config=hs.config.jaeger_config,
service_name="{} {}".format(hs.config.server_name, hs.get_instance_name()),
scope_manager=LogContextScopeManager(hs.config),
).initialize_tracer()


Expand Down
37 changes: 26 additions & 11 deletions synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class RdataCommand(Command):
Format::
RDATA <stream_name> <token> <row_json>
RDATA <stream_name> <instance_name> <token> <row_json>
The `<token>` may either be a numeric stream id OR "batch". The latter case
is used to support sending multiple updates with the same stream ID. This
Expand All @@ -105,33 +105,40 @@ class RdataCommand(Command):
The client should batch all incoming RDATA with a token of "batch" (per
stream_name) until it sees an RDATA with a numeric stream ID.
The `<instance_name>` is the source of the new data (usually "master").
`<token>` of "batch" maps to the instance variable `token` being None.
An example of a batched series of RDATA::
RDATA presence batch ["@foo:example.com", "online", ...]
RDATA presence batch ["@bar:example.com", "online", ...]
RDATA presence 59 ["@baz:example.com", "online", ...]
RDATA presence master batch ["@foo:example.com", "online", ...]
RDATA presence master batch ["@bar:example.com", "online", ...]
RDATA presence master 59 ["@baz:example.com", "online", ...]
"""

NAME = "RDATA"

def __init__(self, stream_name, token, row):
def __init__(self, stream_name, instance_name, token, row):
self.stream_name = stream_name
self.instance_name = instance_name
self.token = token
self.row = row

@classmethod
def from_line(cls, line):
stream_name, token, row_json = line.split(" ", 2)
stream_name, instance_name, token, row_json = line.split(" ", 3)
return cls(
stream_name, None if token == "batch" else int(token), json.loads(row_json)
stream_name,
instance_name,
None if token == "batch" else int(token),
json.loads(row_json),
)

def to_line(self):
return " ".join(
(
self.stream_name,
self.instance_name,
str(self.token) if self.token is not None else "batch",
_json_encoder.encode(self.row),
)
Expand All @@ -145,23 +152,31 @@ class PositionCommand(Command):
"""Sent by the server to tell the client the stream postition without
needing to send an RDATA.
Format::
POSITION <stream_name> <instance_name> <token>
On receipt of a POSITION command clients should check if they have missed
any updates, and if so then fetch them out of band.
The `<instance_name>` is the process that sent the command and is the source
of the stream.
"""

NAME = "POSITION"

def __init__(self, stream_name, token):
def __init__(self, stream_name, instance_name, token):
self.stream_name = stream_name
self.instance_name = instance_name
self.token = token

@classmethod
def from_line(cls, line):
stream_name, token = line.split(" ", 1)
return cls(stream_name, int(token))
stream_name, instance_name, token = line.split(" ", 2)
return cls(stream_name, instance_name, int(token))

def to_line(self):
return " ".join((self.stream_name, str(self.token)))
return " ".join((self.stream_name, self.instance_name, str(self.token)))


class ErrorCommand(_SimpleCommand):
Expand Down
17 changes: 14 additions & 3 deletions synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def __init__(self, hs):
self._notifier = hs.get_notifier()
self._clock = hs.get_clock()
self._instance_id = hs.get_instance_id()
self._instance_name = hs.get_instance_name()

# Set of streams that we've caught up with.
self._streams_connected = set() # type: Set[str]
Expand Down Expand Up @@ -156,7 +157,7 @@ def start_replication(self, hs):
hs.config.redis.redis_host, hs.config.redis.redis_port, self._factory,
)
else:
client_name = hs.config.worker_name
client_name = hs.get_instance_name()
self._factory = DirectTcpReplicationClientFactory(hs, client_name, self)
host = hs.config.worker_replication_host
port = hs.config.worker_replication_port
Expand All @@ -170,7 +171,9 @@ async def on_REPLICATE(self, conn: AbstractConnection, cmd: ReplicateCommand):

for stream_name, stream in self._streams.items():
current_token = stream.current_token()
self.send_command(PositionCommand(stream_name, current_token))
self.send_command(
PositionCommand(stream_name, self._instance_name, current_token)
)

async def on_USER_SYNC(self, conn: AbstractConnection, cmd: UserSyncCommand):
user_sync_counter.inc()
Expand Down Expand Up @@ -235,6 +238,10 @@ async def on_USER_IP(self, conn: AbstractConnection, cmd: UserIpCommand):
await self._server_notices_sender.on_user_ip(cmd.user_id)

async def on_RDATA(self, conn: AbstractConnection, cmd: RdataCommand):
if cmd.instance_name == self._instance_name:
# Ignore RDATA that are just our own echoes
return

stream_name = cmd.stream_name
inbound_rdata_count.labels(stream_name).inc()

Expand Down Expand Up @@ -286,6 +293,10 @@ async def on_rdata(self, stream_name: str, token: int, rows: list):
await self._replication_data_handler.on_rdata(stream_name, token, rows)

async def on_POSITION(self, conn: AbstractConnection, cmd: PositionCommand):
if cmd.instance_name == self._instance_name:
# Ignore POSITION that are just our own echoes
return

stream = self._streams.get(cmd.stream_name)
if not stream:
logger.error("Got POSITION for unknown stream: %s", cmd.stream_name)
Expand Down Expand Up @@ -485,7 +496,7 @@ def stream_update(self, stream_name: str, token: str, data: Any):
We need to check if the client is interested in the stream or not
"""
self.send_command(RdataCommand(stream_name, token, data))
self.send_command(RdataCommand(stream_name, self._instance_name, token, data))


UpdateToken = TypeVar("UpdateToken")
Expand Down
13 changes: 11 additions & 2 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ def __init__(self, hostname: str, config: HomeServerConfig, reactor=None, **kwar
self._listening_services = []
self.start_time = None

self.instance_id = random_string(5)
self._instance_id = random_string(5)
self._instance_name = config.worker_name or "master"

self.clock = Clock(reactor)
self.distributor = Distributor()
Expand All @@ -254,7 +255,15 @@ def get_instance_id(self):
This is used to distinguish running instances in worker-based
deployments.
"""
return self.instance_id
return self._instance_id

def get_instance_name(self) -> str:
"""A unique name for this synapse process.
Used to identify the process over replication and in config. Does not
change over restarts.
"""
return self._instance_name

def setup(self):
logger.info("Setting up.")
Expand Down
2 changes: 2 additions & 0 deletions synapse/server.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ class HomeServer(object):
pass
def get_instance_id(self) -> str:
pass
def get_instance_name(self) -> str:
pass
def get_event_builder_factory(self) -> EventBuilderFactory:
pass
def get_storage(self) -> synapse.storage.Storage:
Expand Down
1 change: 1 addition & 0 deletions tests/replication/slave/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def prepare(self, reactor, clock, hs):
# We now do some gut wrenching so that we have a client that is based
# off of the slave store rather than the main store.
self.replication_handler = ReplicationCommandHandler(self.hs)
self.replication_handler._instance_name = "worker"
self.replication_handler._replication_data_handler = ReplicationDataHandler(
self.slaved_store
)
Expand Down
6 changes: 4 additions & 2 deletions tests/replication/tcp/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,17 @@ def test_parse_one_word_command(self):
self.assertIsInstance(cmd, ReplicateCommand)

def test_parse_rdata(self):
line = 'RDATA events 6287863 ["ev", ["$eventid", "!roomid", "type", null, null, null]]'
line = 'RDATA events master 6287863 ["ev", ["$eventid", "!roomid", "type", null, null, null]]'
cmd = parse_command_from_line(line)
self.assertIsInstance(cmd, RdataCommand)
self.assertEqual(cmd.stream_name, "events")
self.assertEqual(cmd.instance_name, "master")
self.assertEqual(cmd.token, 6287863)

def test_parse_rdata_batch(self):
line = 'RDATA presence batch ["@foo:example.com", "online"]'
line = 'RDATA presence master batch ["@foo:example.com", "online"]'
cmd = parse_command_from_line(line)
self.assertIsInstance(cmd, RdataCommand)
self.assertEqual(cmd.stream_name, "presence")
self.assertEqual(cmd.instance_name, "master")
self.assertIsNone(cmd.token)

0 comments on commit 37f6823

Please sign in to comment.