-
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* create levels for the poc * improve ci * fix module path * keep flexible the script * fix task file name * fix task file name * fix import
- Loading branch information
Showing
21 changed files
with
334 additions
and
62 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -160,3 +160,4 @@ cython_debug/ | |
#.idea/ | ||
|
||
/data | ||
*.db |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,3 +19,4 @@ dependencies: | |
- vulture | ||
- bandit | ||
- mccabe | ||
- sqlalchemy |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +0,0 @@ | ||
from poc_celery.celery_app import app as celery_app | ||
|
||
__all__ = ("celery_app",) | ||
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
from __future__ import annotations | ||
|
||
from sqlalchemy import Column, Integer, String, create_engine | ||
from sqlalchemy.ext.declarative import declarative_base | ||
from sqlalchemy.orm import sessionmaker | ||
|
||
# Base class for declarative class definitions | ||
Base = declarative_base() | ||
|
||
|
||
class SearchModel(Base): | ||
__tablename__ = "search" | ||
id = Column(Integer, primary_key=True) | ||
query = Column(String, nullable=False) | ||
|
||
|
||
class ArticleModel(Base): | ||
__tablename__ = "article" | ||
id = Column(Integer, primary_key=True) | ||
search_id = Column(Integer, nullable=False) | ||
meta = Column(String, nullable=True) | ||
|
||
|
||
class SimpleORM: | ||
context = {"session": None} | ||
|
||
@classmethod | ||
def create(cls, **kwargs) -> SimpleORM: | ||
""" | ||
Inserts sample data into the database. | ||
Parameters: | ||
session (Session): The SQLAlchemy session object. | ||
""" | ||
# Creating a new search record | ||
new_object = cls.model(**kwargs) | ||
cls.context["session"].add(new_object) | ||
cls.context["session"].commit() | ||
return new_object | ||
|
||
@classmethod | ||
def filter(cls, **kwargs) -> SimpleORM: | ||
# Filtering data based on a condition | ||
query = cls.context["session"].query(cls.model) | ||
|
||
# Apply filters based on kwargs | ||
for key, value in kwargs.items(): | ||
if not hasattr(cls.model, key): | ||
print(f"Warning: '{key}' is not a valid attribute of Article") | ||
continue | ||
|
||
# Construct a filter using the 'like' operator if the value | ||
# contains a wildcard character | ||
if "%" in value: | ||
query = query.filter(getattr(cls.model, key).like(value)) | ||
else: | ||
query = query.filter(getattr(cls.model, key) == value) | ||
|
||
return query.all() | ||
|
||
@classmethod | ||
def setup(cls, url: str = "sqlite:///example.db"): | ||
""" | ||
Setup the database by creating tables and initializing the session. | ||
Parameters: | ||
url (str): The database URL. | ||
Returns: | ||
session (Session): A SQLAlchemy Session object. | ||
""" | ||
engine = create_engine( | ||
url, echo=False | ||
) # Set echo=False to turn off verbose logging | ||
Base.metadata.create_all(engine) # Create all tables | ||
Session = sessionmaker(bind=engine) | ||
cls.context["session"] = Session() | ||
cls.reset() | ||
return cls.context["session"] | ||
|
||
@classmethod | ||
def reset(cls): | ||
""" | ||
Resets the database by dropping all tables and recreating them. | ||
""" | ||
# Get the engine from the current session | ||
engine = cls.context["session"].get_bind() | ||
# Drop all tables | ||
Base.metadata.drop_all(engine) | ||
# Create all tables | ||
Base.metadata.create_all(engine) | ||
print("Database has been reset.") | ||
|
||
|
||
class Search(SimpleORM): | ||
model = SearchModel | ||
|
||
|
||
class Article(SimpleORM): | ||
model = ArticleModel |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
from poc_celery.poc1.celery_app import app as celery_app | ||
|
||
__all__ = ("celery_app",) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
from celery import Celery | ||
|
||
from poc_celery.get_container_ip import get_amqp_ip, get_redis_ip | ||
|
||
# Get the Rabbitmq container IP address | ||
AMQP_IP = get_amqp_ip() | ||
REDIS_IP = get_redis_ip() | ||
|
||
# Create a Celery instance with Rabbitmq as the broker and result backend | ||
app = Celery( | ||
"poc-celery", | ||
broker=f"amqp://guest:guest@{AMQP_IP}:5672", | ||
backend=f"redis://{REDIS_IP}:6379/0", | ||
include=[ | ||
"poc_celery.poc1.tasks_async", | ||
"poc_celery.poc1.tasks_collectors", | ||
], | ||
) | ||
|
||
# Set broker_connection_retry_on_startup to True to suppress the warning | ||
app.conf.broker_connection_retry_on_startup = True | ||
|
||
app.autodiscover_tasks() |
2 changes: 1 addition & 1 deletion
2
src/poc_celery/tasks_async.py → src/poc_celery/poc1/tasks_async.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
from poc_celery.poc2.celery_app import app as celery_app | ||
|
||
__all__ = ("celery_app",) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
from __future__ import annotations | ||
|
||
from celery import group, shared_task | ||
|
||
from poc_celery.db import Article, Search, SimpleORM | ||
|
||
SimpleORM.setup() | ||
|
||
|
||
@shared_task | ||
def search_task(query: str): | ||
""" | ||
Start the pipeline. | ||
Initial task that receives a user's request and triggers collector tasks. | ||
""" | ||
# with transaction.atomic(): | ||
search_obj = Search.create(query=query) | ||
search_id = search_obj.id | ||
|
||
collectors = [ | ||
collector_1.s(search_id), | ||
collector_2.s(search_id), | ||
collector_3.s(search_id), | ||
] | ||
callback = clean_up.s(search_id=search_id).set( | ||
link_error=clean_up.s(search_id=search_id) | ||
) | ||
group(collectors) | callback.delay() | ||
|
||
|
||
@shared_task(bind=True, max_retries=0) | ||
def collector_1(self, search_id: int): | ||
"""Collect data for collector 1.""" | ||
return execute_collector_tasks(search_id, "collector_1") | ||
|
||
|
||
@shared_task(bind=True, max_retries=0) | ||
def collector_2(self, search_id: int): | ||
"""Collect data for collector 2.""" | ||
return execute_collector_tasks(search_id, "collector_2") | ||
|
||
|
||
@shared_task(bind=True, max_retries=0) | ||
def collector_3(self, search_id: int): | ||
"""Collect data for collector 3.""" | ||
return execute_collector_tasks(search_id, "collector_3") | ||
|
||
|
||
def execute_collector_tasks(search_id: int, collector_name: str): | ||
""" | ||
Execute collector tasks. | ||
Helper function to execute get_list and get_article tasks for a collector. | ||
""" | ||
# Assuming `get_list` generates a list of article IDs for simplicity | ||
article_ids = get_list(search_id, collector_name) | ||
for article_id in article_ids: | ||
get_article.delay(search_id, article_id, collector_name) | ||
return {"status": "Completed", "collector": collector_name} | ||
|
||
|
||
@shared_task | ||
def get_list(search_id: int, collector_name: str): | ||
"""Simulated task to get a list of articles.""" | ||
# Simulate getting a list of article IDs | ||
return [1, 2, 3] # Example article IDs | ||
|
||
|
||
@shared_task | ||
def get_article(search_id: int, article_id: int, collector_name: str): | ||
"""Task to fetch and save article metadata.""" | ||
# Simulate fetching article metadata | ||
metadata = f"Metadata for article {article_id} from {collector_name}" | ||
# with transaction.atomic(): | ||
Article.objects.create(search_id=search_id, meta=metadata) | ||
|
||
|
||
@shared_task | ||
def clean_up(search_id: int): | ||
""" | ||
Clean up temporary storage. | ||
Cleanup task to be triggered when all articles from all collectors | ||
for a specific search are done. | ||
""" | ||
# Implement cleanup logic here, e.g., removing duplicate articles | ||
pass |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
import pytest | ||
|
||
|
||
@pytest.fixture(scope="module") | ||
def celery_config(): | ||
""" | ||
Provide Celery app configuration for testing. | ||
This fixture is responsible for setting up the Celery app with a specific | ||
configuration suitable for test runs. It defines the broker and result backend | ||
to use Rabbitmq and sets the task execution mode to always eager, which means | ||
tasks will be executed locally and synchronously. | ||
Yields | ||
------ | ||
dict | ||
A dictionary containing configuration settings for the Celery application. | ||
""" | ||
return { | ||
"broker_url": "amqp://guest:guest@rabbitmq3:5672", | ||
"result_backend": "redis://localhost:6379/0", | ||
"task_always_eager": True, | ||
} | ||
|
||
|
||
@pytest.fixture(scope="module") | ||
def celery_enable_logging(): | ||
""" | ||
Activate logging for Celery tasks during testing. | ||
This fixture ensures that Celery task logs are visible during test execution, | ||
aiding in debugging and verifying task behavior. | ||
Returns | ||
------- | ||
bool | ||
True to enable Celery task logging, False otherwise. | ||
""" | ||
return True |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.