forked from sinaptik-ai/pandas-ai
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbase_logic_unit.py
32 lines (25 loc) · 1.04 KB
/
base_logic_unit.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from abc import ABC, abstractmethod
from typing import Any
from pandasai.pipelines.logic_unit_output import LogicUnitOutput
class BaseLogicUnit(ABC):
    """
    Abstract base class for the logic units of a pipeline.

    Every logic unit should inherit from this class and supply its own
    ``execute`` implementation.
    """

    def __init__(self, skip_if=None, on_execution=None, before_execution=None):
        """
        Store the optional hooks on the instance.

        :param skip_if: Kept as ``self.skip_if``; defaults to ``None``.
        :param on_execution: Kept as ``self.on_execution``; defaults to ``None``.
        :param before_execution: Kept as ``self.before_execution``; defaults
            to ``None``.

        The values are only stored here; this class never calls or inspects
        them itself.
        """
        super().__init__()
        # Single tuple assignment; targets are still bound left to right.
        self.skip_if, self.on_execution, self.before_execution = (
            skip_if,
            on_execution,
            before_execution,
        )

    @abstractmethod
    def execute(self, input: Any, **kwargs) -> LogicUnitOutput:
        """
        Run this logic unit and return its output.

        Subclasses must override this method; what is returned depends on
        the concrete implementation.

        :param input: Your input data.
        :param kwargs: A dictionary of keyword arguments.
            - 'logger' (any): The logger for logging.
            - 'config' (Config): Global configurations.
            - 'context' (any): The execution context.
        :return: The result of the execution.
        :raises NotImplementedError: Always, when this base implementation
            is invoked directly.
        """
        message = "execute method is not implemented."
        raise NotImplementedError(message)