Skip to content

Commit

Permalink
Merge pull request matrix-org#5589 from matrix-org/erikj/admin_exfilt…
Browse files Browse the repository at this point in the history
…rate_data

Add basic function to get all data for a user out of synapse
  • Loading branch information
erikjohnston authored Jul 15, 2019
2 parents d336b51 + db0a50b commit d863213
Show file tree
Hide file tree
Showing 6 changed files with 423 additions and 7 deletions.
1 change: 1 addition & 0 deletions changelog.d/5589.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add ability to pull all locally stored events out of synapse that a particular user can see.
183 changes: 183 additions & 0 deletions synapse/handlers/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

from twisted.internet import defer

from synapse.api.constants import Membership
from synapse.types import RoomStreamToken
from synapse.visibility import filter_events_for_client

from ._base import BaseHandler

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -89,3 +93,182 @@ def search_users(self, term):
ret = yield self.store.search_users(term)

defer.returnValue(ret)

@defer.inlineCallbacks
def export_user_data(self, user_id, writer):
"""Write all data we have on the user to the given writer.
Args:
user_id (str)
writer (ExfiltrationWriter)
Returns:
defer.Deferred: Resolves when all data for a user has been written.
The returned value is that returned by `writer.finished()`.
"""
# Get all rooms the user is in or has been in
rooms = yield self.store.get_rooms_for_user_where_membership_is(
user_id,
membership_list=(
Membership.JOIN,
Membership.LEAVE,
Membership.BAN,
Membership.INVITE,
),
)

# We only try and fetch events for rooms the user has been in. If
# they've been e.g. invited to a room without joining then we handle
# those seperately.
rooms_user_has_been_in = yield self.store.get_rooms_user_has_been_in(user_id)

for index, room in enumerate(rooms):
room_id = room.room_id

logger.info(
"[%s] Handling room %s, %d/%d", user_id, room_id, index + 1, len(rooms)
)

forgotten = yield self.store.did_forget(user_id, room_id)
if forgotten:
logger.info("[%s] User forgot room %d, ignoring", user_id, room_id)
continue

if room_id not in rooms_user_has_been_in:
# If we haven't been in the rooms then the filtering code below
# won't return anything, so we need to handle these cases
# explicitly.

if room.membership == Membership.INVITE:
event_id = room.event_id
invite = yield self.store.get_event(event_id, allow_none=True)
if invite:
invited_state = invite.unsigned["invite_room_state"]
writer.write_invite(room_id, invite, invited_state)

continue

# We only want to bother fetching events up to the last time they
# were joined. We estimate that point by looking at the
# stream_ordering of the last membership if it wasn't a join.
if room.membership == Membership.JOIN:
stream_ordering = yield self.store.get_room_max_stream_ordering()
else:
stream_ordering = room.stream_ordering

from_key = str(RoomStreamToken(0, 0))
to_key = str(RoomStreamToken(None, stream_ordering))

written_events = set() # Events that we've processed in this room

# We need to track gaps in the events stream so that we can then
# write out the state at those events. We do this by keeping track
# of events whose prev events we haven't seen.

# Map from event ID to prev events that haven't been processed,
# dict[str, set[str]].
event_to_unseen_prevs = {}

# The reverse mapping to above, i.e. map from unseen event to events
# that have the unseen event in their prev_events, i.e. the unseen
# events "children". dict[str, set[str]]
unseen_to_child_events = {}

# We fetch events in the room the user could see by fetching *all*
# events that we have and then filtering, this isn't the most
# efficient method perhaps but it does guarantee we get everything.
while True:
events, _ = yield self.store.paginate_room_events(
room_id, from_key, to_key, limit=100, direction="f"
)
if not events:
break

from_key = events[-1].internal_metadata.after

events = yield filter_events_for_client(self.store, user_id, events)

writer.write_events(room_id, events)

# Update the extremity tracking dicts
for event in events:
# Check if we have any prev events that haven't been
# processed yet, and add those to the appropriate dicts.
unseen_events = set(event.prev_event_ids()) - written_events
if unseen_events:
event_to_unseen_prevs[event.event_id] = unseen_events
for unseen in unseen_events:
unseen_to_child_events.setdefault(unseen, set()).add(
event.event_id
)

# Now check if this event is an unseen prev event, if so
# then we remove this event from the appropriate dicts.
for child_id in unseen_to_child_events.pop(event.event_id, []):
event_to_unseen_prevs[child_id].discard(event.event_id)

written_events.add(event.event_id)

logger.info(
"Written %d events in room %s", len(written_events), room_id
)

# Extremities are the events who have at least one unseen prev event.
extremities = (
event_id
for event_id, unseen_prevs in event_to_unseen_prevs.items()
if unseen_prevs
)
for event_id in extremities:
if not event_to_unseen_prevs[event_id]:
continue
state = yield self.store.get_state_for_event(event_id)
writer.write_state(room_id, event_id, state)

defer.returnValue(writer.finished())


class ExfiltrationWriter(object):
"""Interface used to specify how to write exported data.
"""

def write_events(self, room_id, events):
"""Write a batch of events for a room.
Args:
room_id (str)
events (list[FrozenEvent])
"""
pass

def write_state(self, room_id, event_id, state):
"""Write the state at the given event in the room.
This only gets called for backward extremities rather than for each
event.
Args:
room_id (str)
event_id (str)
state (dict[tuple[str, str], FrozenEvent])
"""
pass

def write_invite(self, room_id, event, state):
"""Write an invite for the room, with associated invite state.
Args:
room_id (str)
event (FrozenEvent)
state (dict[tuple[str, str], dict]): A subset of the state at the
invite, with a subset of the event keys (type, state_key
content and sender)
"""

def finished(self):
"""Called when all data has succesfully been exported and written.
This functions return value is passed to the caller of
`export_user_data`.
"""
pass
20 changes: 20 additions & 0 deletions synapse/storage/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,26 @@ def f(txn):
count = yield self.runInteraction("did_forget_membership", f)
defer.returnValue(count == 0)

@defer.inlineCallbacks
def get_rooms_user_has_been_in(self, user_id):
"""Get all rooms that the user has ever been in.
Args:
user_id (str)
Returns:
Deferred[set[str]]: Set of room IDs.
"""

room_ids = yield self._simple_select_onecol(
table="room_memberships",
keyvalues={"membership": Membership.JOIN, "user_id": user_id},
retcol="room_id",
desc="get_rooms_user_has_been_in",
)

return set(room_ids)


class RoomMemberStore(RoomMemberWorkerStore):
def __init__(self, db_conn, hs):
Expand Down
14 changes: 8 additions & 6 deletions synapse/storage/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,9 @@ def _paginate_room_events_txn(
Returns:
Deferred[tuple[list[_EventDictReturn], str]]: Returns the results
as a list of _EventDictReturn and a token that points to the end
of the result set.
of the result set. If no events are returned then the end of the
stream has been reached (i.e. there are no events between
`from_token` and `to_token`), or `limit` is zero.
"""

assert int(limit) >= 0
Expand Down Expand Up @@ -905,15 +907,15 @@ def paginate_room_events(
only those before
direction(char): Either 'b' or 'f' to indicate whether we are
paginating forwards or backwards from `from_key`.
limit (int): The maximum number of events to return. Zero or less
means no limit.
limit (int): The maximum number of events to return.
event_filter (Filter|None): If provided filters the events to
those that match the filter.
Returns:
tuple[list[dict], str]: Returns the results as a list of dicts and
a token that points to the end of the result set. The dicts have
the keys "event_id", "topological_ordering" and "stream_orderign".
tuple[list[FrozenEvent], str]: Returns the results as a list of
events and a token that points to the end of the result set. If no
events are returned then the end of the stream has been reached
(i.e. there are no events between `from_key` and `to_key`).
"""

from_key = RoomStreamToken.parse(from_key)
Expand Down
Loading

0 comments on commit d863213

Please sign in to comment.