Skip to content

Commit

Permalink
Fix: Improve inference of upstream dependencies for python models (To…
Browse files Browse the repository at this point in the history
  • Loading branch information
izeigerman authored Apr 2, 2024
1 parent 440eb55 commit 14ba624
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 35 deletions.
6 changes: 5 additions & 1 deletion examples/sushi/models/order_items.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
ITEMS = "sushi.items"


def get_items_table(context: ExecutionContext) -> str:
return context.table(ITEMS)


@model(
"sushi.order_items",
kind=IncrementalByTimeRangeKind(
Expand Down Expand Up @@ -45,7 +49,7 @@ def execute(
**kwargs: t.Any,
) -> t.Generator[pd.DataFrame, None, None]:
orders_table = context.table("sushi.orders")
items_table = context.table(ITEMS)
items_table = get_items_table(context)

rng = random.Random()

Expand Down
57 changes: 28 additions & 29 deletions sqlmesh/core/model/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -1671,7 +1671,7 @@ def create_python_model(
# Find dependencies for python models by parsing code if they are not explicitly defined
# Also remove self-references that are found
depends_on = (
_parse_depends_on(entrypoint, python_env) - {name}
_parse_depends_on(python_env) - {name}
if depends_on is None and python_env is not None
else depends_on
)
Expand Down Expand Up @@ -1848,42 +1848,41 @@ def _python_env(
return serialized_env


def _parse_depends_on(
model_func: str,
python_env: t.Dict[str, Executable],
) -> t.Set[str]:
def _parse_depends_on(python_env: t.Dict[str, Executable]) -> t.Set[str]:
"""Parses the source of a model function and finds upstream dependencies based on calls to context."""
env = prepare_env(python_env)
depends_on = set()
executable = python_env[model_func]

for node in ast.walk(ast.parse(executable.payload)):
if not isinstance(node, ast.Call):
for executable in python_env.values():
if not executable.is_definition:
continue
for node in ast.walk(ast.parse(executable.payload)):
if not isinstance(node, ast.Call):
continue

func = node.func
func = node.func

if (
isinstance(func, ast.Attribute)
and isinstance(func.value, ast.Name)
and func.value.id == "context"
and func.attr == "table"
):
if node.args:
table: t.Optional[ast.expr] = node.args[0]
else:
table = next(
(keyword.value for keyword in node.keywords if keyword.arg == "model_name"),
None,
)
if (
isinstance(func, ast.Attribute)
and isinstance(func.value, ast.Name)
and func.value.id == "context"
and func.attr == "table"
):
if node.args:
table: t.Optional[ast.expr] = node.args[0]
else:
table = next(
(keyword.value for keyword in node.keywords if keyword.arg == "model_name"),
None,
)

try:
expression = to_source(table)
depends_on.add(eval(expression, env))
except Exception:
raise ConfigError(
f"Error resolving dependencies for '{executable.path}'. References to context must be resolvable at parse time.\n\n{expression}"
)
try:
expression = to_source(table)
depends_on.add(eval(expression, env))
except Exception:
raise ConfigError(
f"Error resolving dependencies for '{executable.path}'. References to context must be resolvable at parse time.\n\n{expression}"
)

return depends_on

Expand Down
5 changes: 0 additions & 5 deletions sqlmesh/utils/metaprogramming.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,6 @@ class ExecutableKind(str, Enum):
IMPORT = "import"
VALUE = "value"
DEFINITION = "definition"
STATEMENT = "statement"

def __lt__(self, other: t.Any) -> bool:
if not isinstance(other, ExecutableKind):
Expand All @@ -321,10 +320,6 @@ def is_definition(self) -> bool:
def is_import(self) -> bool:
return self.kind == ExecutableKind.IMPORT

@property
def is_statement(self) -> bool:
return self.kind == ExecutableKind.STATEMENT

@property
def is_value(self) -> bool:
return self.kind == ExecutableKind.VALUE
Expand Down

0 comments on commit 14ba624

Please sign in to comment.