Skip to content

Commit

Permalink
Merge pull request matrix-org#1725 from matrix-org/erikj/timeout_conn
Browse files Browse the repository at this point in the history
Wrap connections in an N minute timeout to ensure they get reaped correctly
  • Loading branch information
NegativeMjark authored Dec 29, 2016
2 parents b9b6d17 + 97ffc56 commit 828c585
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 7 deletions.
70 changes: 66 additions & 4 deletions synapse/http/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.

from twisted.internet.endpoints import SSL4ClientEndpoint, TCP4ClientEndpoint
from twisted.internet import defer
from twisted.internet import defer, reactor
from twisted.internet.error import ConnectError
from twisted.names import client, dns
from twisted.names.error import DNSNameError, DomainError
Expand Down Expand Up @@ -66,13 +66,75 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None,
default_port = 8448

if port is None:
return SRVClientEndpoint(
return _WrappingEndpointFac(SRVClientEndpoint(
reactor, "matrix", domain, protocol="tcp",
default_port=default_port, endpoint=transport_endpoint,
endpoint_kw_args=endpoint_kw_args
)
))
else:
return transport_endpoint(reactor, domain, port, **endpoint_kw_args)
return _WrappingEndpointFac(transport_endpoint(
reactor, domain, port, **endpoint_kw_args
))


class _WrappingEndpointFac(object):
def __init__(self, endpoint_fac):
self.endpoint_fac = endpoint_fac

@defer.inlineCallbacks
def connect(self, protocolFactory):
conn = yield self.endpoint_fac.connect(protocolFactory)
conn = _WrappedConnection(conn)
defer.returnValue(conn)


class _WrappedConnection(object):
"""Wraps a connection and calls abort on it if it hasn't seen any action
for 2.5-3 minutes.
"""
__slots__ = ["conn", "last_request"]

def __init__(self, conn):
object.__setattr__(self, "conn", conn)
object.__setattr__(self, "last_request", time.time())

def __getattr__(self, name):
return getattr(self.conn, name)

def __setattr__(self, name, value):
setattr(self.conn, name, value)

def _time_things_out_maybe(self):
# We use a slightly shorter timeout here just in case the callLater is
# triggered early. Paranoia ftw.
# TODO: Cancel the previous callLater rather than comparing time.time()?
if time.time() - self.last_request >= 2.5 * 60:
self.abort()
# Abort the underlying TLS connection. The abort() method calls
# loseConnection() on the underlying TLS connection which tries to
# shutdown the connection cleanly. We call abortConnection()
# since that will promptly close the underlying TCP connection.
self.transport.abortConnection()

def request(self, request):
self.last_request = time.time()

# Time this connection out if we haven't send a request in the last
# N minutes
# TODO: Cancel the previous callLater?
reactor.callLater(3 * 60, self._time_things_out_maybe)

d = self.conn.request(request)

def update_request_time(res):
self.last_request = time.time()
# TODO: Cancel the previous callLater?
reactor.callLater(3 * 60, self._time_things_out_maybe)
return res

d.addCallback(update_request_time)

return d


class SpiderEndpoint(object):
Expand Down
7 changes: 4 additions & 3 deletions synapse/http/matrixfederationclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ def __init__(self, hs):
self.signing_key = hs.config.signing_key[0]
self.server_name = hs.hostname
pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 10
pool.maxPersistentPerHost = 5
pool.cachedConnectionTimeout = 2 * 60
self.agent = Agent.usingEndpointFactory(
reactor, MatrixFederationEndpointFactory(hs), pool=pool
)
Expand Down Expand Up @@ -299,7 +300,7 @@ def body_callback(method, url_bytes, headers_dict):
defer.returnValue(json.loads(body))

@defer.inlineCallbacks
def post_json(self, destination, path, data={}, long_retries=True,
def post_json(self, destination, path, data={}, long_retries=False,
timeout=None):
""" Sends the specifed json data using POST
Expand Down Expand Up @@ -332,7 +333,7 @@ def body_callback(method, url_bytes, headers_dict):
path.encode("ascii"),
body_callback=body_callback,
headers_dict={"Content-Type": ["application/json"]},
long_retries=True,
long_retries=long_retries,
timeout=timeout,
)

Expand Down

0 comments on commit 828c585

Please sign in to comment.