Skip to content

Commit

Permalink
Add a job to allow backfilling of the elasticsearch index.
Browse files Browse the repository at this point in the history
Also add monitoring for posts in Elasticsearch so that the index diverging
from the database can be detected.
  • Loading branch information
ReneHollander committed Sep 3, 2023
1 parent 9bb8de9 commit 958a233
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 10 deletions.
3 changes: 2 additions & 1 deletion rep0st/db/feature.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from injector import Module, ProviderOf, inject
from sqlalchemy import Column, ForeignKey, Integer, LargeBinary, String
from sqlalchemy import Column, ForeignKey, Integer, LargeBinary, String, func
from sqlalchemy.orm import Session, relationship

from rep0st.config.rep0st_database import Rep0stDatabaseModule
from rep0st.db import Base
from rep0st.framework.data.repository import CompoundKey, Repository
from rep0st.framework.data.transaction import transactional


class FeatureRepositoryModule(Module):
Expand Down
14 changes: 14 additions & 0 deletions rep0st/db/post.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,3 +226,17 @@ def get_posts_missing_features(self, type: Optional[Type] = None):
def post_count(self) -> int:
    """Return the number of posts counted by `Post.id` in the database."""
    count_query = self._get_session().query(func.count(Post.id))
    return count_query.scalar()

@transactional()
def get_latest_post_id_with_features(self) -> int:
    """Return the highest ID among indexed image posts.

    Only posts with status `INDEXED` and type `IMAGE` are considered.

    Returns:
        The maximum matching post ID, or 0 when the query yields no value.
    """
    session = self._get_session()
    indexed_images = and_(Post.status == Status.INDEXED,
                          Post.type == Type.IMAGE)
    latest_id = session.query(func.max(Post.id)).filter(indexed_images).scalar()
    # MAX() evaluates to NULL when no row matches the filter.
    if latest_id is None:
        return 0
    return latest_id

@transactional()
def post_count_with_features(self) -> int:
    """Return the number of indexed image posts.

    Only posts with status `INDEXED` and type `IMAGE` are counted.

    Returns:
        The number of matching posts, or 0 when the query yields no value.
    """
    session = self._get_session()
    indexed_images = and_(Post.status == Status.INDEXED,
                          Post.type == Type.IMAGE)
    total = session.query(func.count(Post.id)).filter(indexed_images).scalar()
    if total is None:
        return 0
    return total
11 changes: 9 additions & 2 deletions rep0st/framework/data/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Generic, Iterable, List, Type, TypeVar

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from elasticsearch.helpers import bulk, BulkIndexError
from elasticsearch_dsl import Float, Search
from elasticsearch_dsl.query import Query
from injector import Module, provider, singleton
Expand Down Expand Up @@ -66,10 +66,17 @@ def _it():
for value in values:
yield value.to_dict(include_meta=True)

return bulk(self.elasticsearch, _it())
try:
return bulk(self.elasticsearch, _it())
except BulkIndexError as e:
raise BulkIndexError(
f"{len(e.errors)} document(s) failed to index: {e.errors}", e.errors)

def search(self) -> Search:
return self._k_type.search(using=self.elasticsearch)

def count(self) -> int:
    """Return the number of documents matched by the default search.

    The count is obtained from the unfiltered search object returned by
    `self.search()`, so it is a number rather than a `Search` instance.
    """
    return self.search().count()

def delete(self, value: K):
return value.delete(using=self.elasticsearch)
10 changes: 9 additions & 1 deletion rep0st/index/post.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Iterable, NamedTuple

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Date, Document, InnerDoc, Integer, Keyword, Nested
from elasticsearch_dsl import Date, Document, InnerDoc, Integer, Keyword, Nested, Search
from injector import Module, inject

from rep0st.analyze.feature_vector_analyzer import TYPE_NAME as FEATURE_VECTOR_TYPE
Expand All @@ -23,6 +23,7 @@ class Frame(InnerDoc):


class Post(Document):
id = Integer()
created = Date()
flags = Keyword()
type = Keyword()
Expand Down Expand Up @@ -55,6 +56,7 @@ def __init__(self, elasticsearch: Elasticsearch):
def _index_post_from_post(self, post: DBPost) -> Post:
index_post = Post()
index_post.meta.id = post.id
index_post.id = post.id
index_post.created = post.created
index_post.type = post.type.value
index_post.flags = [flag.value for flag in post.get_flags()]
Expand Down Expand Up @@ -104,3 +106,9 @@ def find_posts(self, feature_vector):

for post in response:
yield SearchResult(post.meta.score, post.meta.id)

def get_post_with_highest_id(self) -> float:
    """Return the highest value of the `id` field in the post index.

    Returns:
        The maximum `id` as reported by the `max` aggregation, or 0 when the
        aggregation has no value (for example when the index is empty).
    """
    # Start from the search bound to this index instead of a bare Search,
    # which would aggregate over every index on the cluster.
    # NOTE(review): assumes `self.search()` is inherited from the framework
    # index class - confirm against the class definition.
    # size=0: only the aggregation result is needed, not the hits.
    s = self.search().extra(size=0)
    s.aggs.metric('max_post_id', 'max', field='id')
    res = s.execute()
    max_id = res.aggregations['max_post_id']['value']
    # The max aggregation yields a null value when no document matched;
    # report 0 so callers always receive a number.
    return 0 if max_id is None else max_id
45 changes: 45 additions & 0 deletions rep0st/job/backfill_features_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import logging
from typing import Any, List

from absl import flags
from injector import Binder, Module, inject, singleton

from rep0st.framework import app
from rep0st.framework.execute import execute
from rep0st.service.feature_service import FeatureService, FeatureServiceModule
from rep0st.db.post import Type as PostType

# Module-level logger named after this module.
log = logging.getLogger(__name__)
FLAGS = flags.FLAGS

# Flag selecting the post type handed to the backfill; defaults to images.
flags.DEFINE_enum_class(
    name='rep0st_backfill_features_post_type',
    default=PostType.IMAGE,
    enum_class=PostType,
    help='The post type (image, video, ...) this job should index.')


class BackfillFeaturesJobModule(Module):
    """Injector module that wires up the feature backfill job."""

    def configure(self, binder: Binder):
        """Install the feature service module and bind the job class."""
        binder.install(FeatureServiceModule)
        binder.bind(BackfillFeaturesJob)


@singleton
class BackfillFeaturesJob:
    """Job that asks the feature service to backfill features."""

    # Service that performs the actual backfill.
    feature_service: FeatureService

    @inject
    def __init__(self, feature_service: FeatureService):
        """Keep a reference to the injected feature service."""
        self.feature_service = feature_service

    @execute()
    def backfill_feature_job(self):
        """Backfill features for the post type chosen via the command-line flag."""
        post_type = FLAGS.rep0st_backfill_features_post_type
        self.feature_service.backfill_features(post_type)


def modules() -> List[Any]:
    """Return the injector modules required by this job."""
    job_modules: List[Any] = [BackfillFeaturesJobModule]
    return job_modules


if __name__ == "__main__":
    # The function itself, not its result, is handed to the runner.
    app.run(modules)
28 changes: 24 additions & 4 deletions rep0st/service/feature_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,22 @@

feature_service_features_added_z = Counter(
'rep0st_feature_service_features_added',
'Number of features added to the index')
'Number of features added to the index.')
feature_service_latest_processed_post_z = Gauge(
'rep0st_feature_service_latest_processed_post',
'ID of the latest post where the feaures where processed')
'ID of the latest post where the features where processed.')
# Gauges exposing the same two figures (latest post ID and post count) for the
# database and for the Elasticsearch index, so the two can be compared.
# The help texts spell out "with features" to match the metric names and to
# stay distinguishable from the generic "latest post in the database" gauge.
feature_service_latest_post_with_features_in_database_z = Gauge(
    'rep0st_feature_service_latest_post_with_features_in_database',
    'ID of the latest post with features in the database.')
feature_service_latest_post_with_features_in_index_z = Gauge(
    'rep0st_feature_service_latest_post_with_features_in_index',
    'ID of the latest post with features in the Elasticsearch index.')
feature_service_post_count_with_features_in_database_z = Gauge(
    'rep0st_feature_service_post_count_with_features_in_database',
    'Number of posts with features in the database.')
feature_service_post_count_with_features_in_index_z = Gauge(
    'rep0st_feature_service_post_count_with_features_in_index',
    'Number of posts with features in the Elasticsearch index.')


class FeatureServiceModule(Module):
Expand Down Expand Up @@ -83,6 +95,14 @@ def __init__(self, read_media_service: ReadMediaService,
self.feature_repository = feature_repository
self.analyze_service = analyze_service
self.post_index = post_index
feature_service_latest_post_with_features_in_database_z.set_function(
self.post_repository.get_latest_post_id_with_features)
feature_service_latest_post_with_features_in_index_z.set_function(
self.post_index.get_post_with_highest_id)
feature_service_post_count_with_features_in_database_z.set_function(
self.post_repository.post_count_with_features)
feature_service_post_count_with_features_in_index_z.set_function(
self.post_index.count)

def _process_work_post(self, work_post: WorkPost) -> WorkPost:
try:
Expand Down Expand Up @@ -170,10 +190,10 @@ def update_features(self, post_type: PostType):
f'Finished updating features. {feature_counter} features for {post_counter} posts were added to the database'
)

def backfill_features(self):
def backfill_features(self, post_type: PostType):
log.info('Starting feature backfill')
it = self.post_repository.query().filter(
and_(Post.status == Status.INDEXED, Post.type == PostType.IMAGE,
and_(Post.status == Status.INDEXED, Post.type == post_type,
Post.deleted == False))
it = it.yield_per(1000)
it = util.iterator_every(
Expand Down
4 changes: 2 additions & 2 deletions rep0st/service/post_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
post_service_posts_added_z = Counter('rep0st_post_service_posts_added',
'Number of posts added to database')
post_service_latest_post_id_z = Gauge(
'rep0st_post_service_latest_processed_post', 'ID of the latest post seen')
'rep0st_post_service_latest_processed_post', 'ID of the latest post seen.')
post_service_latest_post_in_database_z = Gauge(
'rep0st_post_service_latest_post_in_database',
'ID of the latest post in the database')
'ID of the latest post in the database.')


class PostServiceModule(Module):
Expand Down

0 comments on commit 958a233

Please sign in to comment.