db: Retry on additional failure scenarios
There are a number of additional failure scenarios for database queries
that we regularly encounter. One is the server closing the connection,
e.g. because it is restarting. The other is the server not yet being
reachable because it has not finished restarting. This change adds
handling for both by checking the connection_invalidated property of
SQLAlchemy statement-related exceptions and by adding a back-off retry
for the MySQL-specific connection-failed error code. This makes
aggressive connection pool recycling unnecessary, so we can remove it.

Closes scVENUS#171.
michaelweiser committed Aug 4, 2020
1 parent 1b8976c commit 95bd13d
Showing 1 changed file with 30 additions and 7 deletions.
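For context on the diff below: the retry decision now hinges on two checks, SQLAlchemy's connection_invalidated flag on DBAPIError and the MySQL client's "can't connect" error code 2002, the latter combined with an exponential, jittered back-off. A minimal standalone sketch of that back-off scheme (the 100 ms base value is taken from the patch; the function name and demo loop are illustrative only):

    import random
    import time

    CONNECT_BACKOFF_BASE = 100  # milliseconds, as introduced by this patch

    def connect_backoff(attempt):
        """Sleep for a randomised, exponentially growing interval and
        return the chosen delay in milliseconds."""
        maxmsecs = CONNECT_BACKOFF_BASE * 2**attempt
        backoff = random.randint(maxmsecs // 2, maxmsecs)
        time.sleep(backoff / 1000)
        return backoff

    # attempt 1: 100 * 2**1 == 100-200 msecs
    # attempt 2: 100 * 2**2 == 200-400 msecs
    # attempt 3: 100 * 2**3 == 400-800 msecs
    for attempt in range(1, 4):
        print('attempt %d slept %d msecs' % (attempt, connect_backoff(attempt)))
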
37 changes: 30 additions & 7 deletions peekaboo/db.py
@@ -35,7 +35,8 @@
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.engine import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.exc import SQLAlchemyError, IntegrityError, OperationalError
from sqlalchemy.exc import SQLAlchemyError, IntegrityError, OperationalError, \
DBAPIError
from peekaboo import __version__
from peekaboo.ruleset import Result
from peekaboo.exceptions import PeekabooDatabaseError
@@ -147,7 +148,7 @@ def __init__(self, db_url, instance_id=0,
"""
logging.getLogger('sqlalchemy.engine').setLevel(log_level)

self.__engine = create_engine(db_url, pool_recycle=1)
self.__engine = create_engine(db_url)
session_factory = sessionmaker(bind=self.__engine)
self.__session = scoped_session(session_factory)
self.__lock = threading.RLock()
@@ -160,6 +161,7 @@ def __init__(self, db_url, instance_id=0,
# attempt 3: 10 * 2**(3) == 40-80msecs
# attempt 4: 10 * 2**(4) == 80-160msecs
self.deadlock_backoff_base = 10
self.connect_backoff_base = 100

Base.metadata.create_all(self.__engine)

@@ -176,6 +178,11 @@ def was_transient_error(self, error, attempt, action):
if attempt >= self.retries:
return -1

# only DBAPIError has connection_invalidated
if getattr(error, 'connection_invalidated', False):
logger.debug('Connection invalidated %s. Retrying.', action)
return attempt + 1

# Access the original DBAPI exception anonymously.
# We intentionally do some crude duck-typing here to avoid
# imports of otherwise optional RDBMS modules. False-positive
@@ -187,6 +194,17 @@ def was_transient_error(self, error, attempt, action):

args = error.orig.args

# (MySQLdb._exceptions.OperationalError) (2002, "Can't connect to local
# MySQL server through socket '/var/run/mysqld/mysqld.sock' (2)")
if (isinstance(args, tuple) and len(args) > 0 and args[0] in [2002]):
# sleep some millisecs
maxmsecs = self.connect_backoff_base * 2**attempt
backoff = random.randint(maxmsecs/2, maxmsecs)
logger.debug('Connection failed %s, backing off for %d '
'milliseconds before retrying', action, backoff)
time.sleep(backoff / 1000)
return attempt + 1

# (MySQLdb._exceptions.OperationalError) (1213, 'Deadlock
# found when trying to get lock; try restarting transaction')
if (isinstance(args, tuple) and len(args) > 0 and args[0] in [1213]):
@@ -220,7 +238,8 @@ def analysis_save(self, sample):
session.add(sample_info)
try:
session.commit()
except (OperationalError, SQLAlchemyError) as error:
except (OperationalError, DBAPIError,
SQLAlchemyError) as error:
session.rollback()

attempt = self.was_transient_error(
@@ -301,7 +320,8 @@ def mark_sample_in_flight(self, sample, instance_id=None, start_time=None):
session.rollback()
logger.debug('Sample %s is already in flight on another '
'instance', sha256sum)
except (OperationalError, SQLAlchemyError) as error:
except (OperationalError, DBAPIError,
SQLAlchemyError) as error:
session.rollback()

attempt = self.was_transient_error(
@@ -351,7 +371,8 @@ def clear_sample_in_flight(self, sample, instance_id=None):
# delete() is not queued and goes to the DB before commit()
cleared = query.delete()
session.commit()
except (OperationalError, SQLAlchemyError) as error:
except (OperationalError, DBAPIError,
SQLAlchemyError) as error:
session.rollback()

attempt = self.was_transient_error(
@@ -418,7 +439,8 @@ def clear_in_flight_samples(self, instance_id=None):
# delete() is not queued and goes to the DB before commit()
query.delete()
session.commit()
except (OperationalError, SQLAlchemyError) as error:
except (OperationalError, DBAPIError,
SQLAlchemyError) as error:
session.rollback()

attempt = self.was_transient_error(
@@ -473,7 +495,8 @@ def clear_stale_in_flight_samples(self):
if cleared > 0:
logger.warning(
'%d stale in-flight samples cleared.', cleared)
except (OperationalError, SQLAlchemyError) as error:
except (OperationalError, DBAPIError,
SQLAlchemyError) as error:
session.rollback()

attempt = self.was_transient_error(
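The call sites truncated above all follow the same shape: attempt the statement, roll back on error, ask was_transient_error() whether (and after what sleep) to try again, and give up once it signals no further attempt. A hedged sketch of that pattern, assembled from the visible hunks rather than copied verbatim from the file (run_with_retry and operation are illustrative names):

    from sqlalchemy.exc import DBAPIError, OperationalError, SQLAlchemyError

    from peekaboo.exceptions import PeekabooDatabaseError

    def run_with_retry(db, session, operation, action):
        """Illustrative retry loop around a unit of database work.

        operation(session) must re-apply its changes on every attempt,
        because rollback() discards whatever was pending."""
        attempt = 1
        while attempt > 0:
            try:
                operation(session)
                session.commit()
                return
            except (OperationalError, DBAPIError, SQLAlchemyError) as error:
                session.rollback()
                # was_transient_error() sleeps as appropriate and returns
                # the next attempt number, or a negative value once the
                # error is deemed permanent or retries are exhausted.
                attempt = db.was_transient_error(error, attempt, action)

        raise PeekabooDatabaseError('Giving up on %s' % action)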
