Skip to content

Commit

Permalink
Reintroduce a flag indicating whether a post has been indexed or not.
Browse files Browse the repository at this point in the history
The outer join is just too slow (>10s) and causes big problems on the
DB. While the flag is not as nice, it makes the query trivial by using
an index.
  • Loading branch information
ReneHollander committed Apr 27, 2024
1 parent e83da81 commit 1b96a25
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 9 deletions.
19 changes: 12 additions & 7 deletions rep0st/db/post.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ class Post(Base):
deleted = Column(Boolean(), nullable=False, default=False)
# List of features associated with this post.
features = relationship(Feature)
# True if features are indexed for this post.
features_indexed = Column(
Boolean(), nullable=False, index=True, default=False)
# List of tags associated with this post.
tags = relationship(Tag)

Expand Down Expand Up @@ -194,9 +197,9 @@ def __repr__(self):
class PostRepository(Repository[int, Post]):

indices = [
# Index on error_status, type and deleted for fast missing feature lookups.
Index('post_error_status_type_deleted_index', Post.error_status,
Post.type, Post.deleted),
# Index on error_status, type, deleted and features_indexed for fast missing-feature lookups.
Index('post_error_status_type_deleted_features_indexed_index',
Post.error_status, Post.type, Post.deleted, Post.features_indexed),
]

@inject
Expand All @@ -220,12 +223,12 @@ def get_posts(self, type: Optional[str] = None) -> Query[Post]:
@transactional()
def get_posts_missing_features(self, type: Optional[Type] = None):
session = self._get_session()
q = session.query(Post).join(Post.features, isouter=True)
q = session.query(Post)
if type:
q = q.filter(Post.type == type)
return q.filter(
and_(Post.error_status == None, Post.deleted == False,
Feature.id == None))
Post.features_indexed == False))

@transactional()
def post_count(self) -> int:
Expand All @@ -235,11 +238,13 @@ def post_count(self) -> int:
@transactional()
def get_latest_post_id_with_features(self) -> int:
session = self._get_session()
id = session.query(func.max(Feature.post_id)).scalar()
id = session.query(func.max(Post.id)).filter(
and_(Post.features_indexed == True)).scalar()
return 0 if id is None else id

@transactional()
def post_count_with_features(self) -> int:
session = self._get_session()
id = session.query(func.count(Feature.post_id)).group_by(Feature.post_id).scalar()
id = session.query(func.count(Post.id)).filter(
and_(Post.features_indexed == True)).scalar()
return 0 if id is None else id
6 changes: 4 additions & 2 deletions rep0st/service/feature_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ def add_features_to_posts(self,
feature.type = type
feature.data = data
work_post.post.features.append(feature)
work_post.post.features_indexed = True

@transactional()
def _process_features(
Expand Down Expand Up @@ -205,8 +206,9 @@ def update_features(self, post_type: PostType):

def backfill_features(self, post_type: PostType):
log.info('Starting feature backfill')
it = self.post_repository.query().join(Post.features).filter(
and_(Post.type == post_type, Post.deleted == False))
it = self.post_repository.query().filter(
and_(Post.type == post_type, Post.deleted == False,
Post.features_indexed == True))
it = it.yield_per(1000)
it = util.iterator_every(
it, every=10000, msg='Backfilled {current} features')
Expand Down
8 changes: 8 additions & 0 deletions rep0st/service/post_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ def _process_batch(self, batch_start_id: int, batch_end_id: int,
f'Marking post deleted since it is no longer in the API: {post_from_db}'
)
post_from_db.deleted = True
# Remove the features from the DB for good measure.
post_from_db.features = []
post_from_db.features_indexed = False
to_save.append(post_from_db)
continue
# Post is in both DB and API. Potentially update it.
Expand All @@ -121,8 +124,13 @@ def _process_batch(self, batch_start_id: int, batch_end_id: int,
f'Updating flags of post since they changed: {post_from_db}. post_from_db.flags={post_from_db.flags}, post_from_api.flags={post_from_api.flags}'
)
post_from_db.flags = post_from_api.flags
old_error_status = post_from_db.error_status
# Download media if not exists or broken.
self._download_media(post_from_db)
if old_error_status != post_from_db.error_status:
# Remove the features. The update feature job will try to index the media again on the next run.
post_from_db.features = []
post_from_db.features_indexed = False
to_save.append(post_from_db)
# TODO(https://github.com/ReneHollander/rep0st/issues/42): Sync post updates to the elasticsearch index.
self.post_repository.persist_all(to_save)
Expand Down

0 comments on commit 1b96a25

Please sign in to comment.