forked from iterative/dvc
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
render: vega: data processing outside of renderers
Fixes: iterative#6943
- Loading branch information
Showing
11 changed files
with
494 additions
and
271 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,194 @@ | ||
from copy import deepcopy | ||
from functools import partial | ||
from typing import Dict, List, Optional, Set, Union | ||
|
||
from funcy import first, project | ||
|
||
from dvc.exceptions import DvcException | ||
from dvc.render.base import INDEX_FIELD, REVISION_FIELD | ||
|
||
|
||
class FieldsNotFoundError(DvcException):
    """Raised when requested fields are missing from the data fields.

    Args:
        expected_fields: iterable of the field names that were requested.
        found_fields: iterable of the field names present in the data.
    """

    def __init__(self, expected_fields, found_fields):
        # Field names are stringified so that non-string keys cannot make
        # `join` raise TypeError and hide the real problem; they are sorted
        # so that the message is deterministic even when sets are passed.
        expected_str = ", ".join(sorted(map(str, expected_fields)))
        found_str = ", ".join(sorted(map(str, found_fields)))
        super().__init__(
            f"Could not find all provided fields ('{expected_str}') "
            f"in data fields ('{found_str}')."
        )
|
||
|
||
class PlotDataStructureError(DvcException):
    """Raised when plot data extraction fails."""

    def __init__(self):
        message = (
            "Plot data extraction failed. Please see "
            "https://man.dvc.org/plots for supported data formats."
        )
        super().__init__(message)
|
||
|
||
def _filter_fields(datapoints: List[Dict], fields: Set) -> List[Dict]: | ||
if not fields: | ||
return datapoints | ||
assert isinstance(fields, set) | ||
|
||
new_data = [] | ||
for data_point in datapoints: | ||
keys = set(data_point.keys()) | ||
if not fields <= keys: | ||
raise FieldsNotFoundError(fields, keys) | ||
|
||
new_data.append(project(data_point, fields)) | ||
|
||
return new_data | ||
|
||
|
||
def _lists(dictionary: Dict): | ||
for _, value in dictionary.items(): | ||
if isinstance(value, dict): | ||
yield from _lists(value) | ||
elif isinstance(value, list): | ||
yield value | ||
|
||
|
||
def _find_first_list(data: Union[Dict, List], fields: Set) -> List[Dict]: | ||
fields = fields or set() | ||
|
||
if not isinstance(data, dict): | ||
return data | ||
|
||
for lst in _lists(data): | ||
if ( | ||
all(isinstance(dp, dict) for dp in lst) | ||
# if fields is empty, it will match any set | ||
and set(first(lst).keys()) & fields == fields | ||
): | ||
return lst | ||
|
||
raise PlotDataStructureError() | ||
|
||
|
||
def _append_index(datapoints: List[Dict]) -> List[Dict]: | ||
if INDEX_FIELD in first(datapoints).keys(): | ||
return datapoints | ||
|
||
for index, data_point in enumerate(datapoints): | ||
data_point[INDEX_FIELD] = index | ||
return datapoints | ||
|
||
|
||
class Converter:
    """
    Class that takes care of converting unspecified data blob
    (Dict or List[Dict]) into datapoints (List[Dict]).
    If some properties that are required by Template class are missing
    ('x', 'y') it will attempt to fill in the blanks.

    The conversion is a list of named steps (``self.steps``) built in
    ``__init__`` and applied in order by ``convert``. A step can be
    removed by name with ``skip_step``.
    """

    @staticmethod
    def update(datapoints: List[Dict], update_dict: Dict):
        """Merge ``update_dict`` into every datapoint, in place.

        Returns the same ``datapoints`` list.
        """
        for data_point in datapoints:
            data_point.update(update_dict)
        return datapoints

    def __init__(self, plot_properties: Optional[Dict] = None):
        """Prepare the conversion steps for the given plot properties.

        Args:
            plot_properties: plot properties; the keys read here are
                'x', 'y' and 'fields'. The dict is deep-copied, so the
                caller's object is never modified.
        """
        plot_properties = plot_properties or {}
        # Properties as given by the caller (private copy).
        self.props = deepcopy(plot_properties)
        # Properties filled in by this class ('x', 'fields', 'y').
        self.inferred_props: Dict = {}

        # Ordered (name, callable) pairs applied by `convert`.
        self.steps: List = []

        self._infer_x()
        self._infer_fields()

        fields = self.inferred_props.get("fields", set())

        # The index field does not exist in the raw data yet, so it must
        # not take part in locating the datapoints list.
        self.steps.append(
            (
                "find_data",
                partial(_find_first_list, fields=fields - {INDEX_FIELD}),
            )
        )

        if not self.props.get("x", None):
            self.steps.append(("append_index", _append_index))

        self.steps.append(
            ("filter_fields", partial(_filter_fields, fields=fields))
        )

    def _infer_x(self):
        """Use ``INDEX_FIELD`` as 'x' when the caller did not set one."""
        if not self.props.get("x", None):
            self.inferred_props["x"] = INDEX_FIELD

    def skip_step(self, name: str):
        """Remove every conversion step registered under ``name``."""
        self.steps = [(_name, fn) for _name, fn in self.steps if _name != name]

    def _infer_fields(self):
        """Store the set of fields to keep in ``inferred_props['fields']``.

        When the caller requested specific fields, 'x' and 'y' (given or
        inferred) are added so that filtering cannot drop them. Without
        requested fields the result is an empty set.
        """
        # 'fields' may be missing, None or any iterable (e.g. a list);
        # normalise it to a set because set arithmetic is applied to the
        # stored value later on.
        fields = set(self.props.get("fields") or ())
        if fields:
            fields |= {
                self.props.get("x", None),
                self.props.get("y", None),
                self.inferred_props.get("x", None),
            }
            fields.discard(None)
        self.inferred_props["fields"] = fields

    def _infer_y(self, datapoints: List[Dict]):
        """Infer 'y' from the datapoints when the caller did not set it.

        The last key of the first datapoint that is neither the revision
        field nor 'x' is used. The inferred value is remembered and has
        to be the same for every subsequent call.

        Raises:
            DvcException: if the inferred 'y' differs from the one
                inferred by a previous call.
        """
        # Without datapoints there is nothing to infer from.
        if "y" in self.props or not datapoints:
            return

        data_fields = list(first(datapoints))
        skip = (
            REVISION_FIELD,
            self.props.get("x", None) or self.inferred_props.get("x"),
        )
        inferred_y = first(f for f in reversed(data_fields) if f not in skip)

        if "y" not in self.inferred_props:
            self.inferred_props["y"] = inferred_y
            return

        previous_y = self.inferred_props["y"]
        if previous_y != inferred_y:
            raise DvcException(
                f"Inferred y ('{inferred_y}') value does not match "
                f"previously matched one ('{previous_y}')."
            )

    def convert(self, data):
        """
        Convert the data. Fill necessary fields ('x', 'y') and return both
        generated datapoints and updated properties.
        """
        # Steps modify datapoints in place; work on a copy of the input.
        processed = deepcopy(data)

        for _, step in self.steps:
            processed = step(processed)

        self._infer_y(processed)

        # Inferred values take precedence over the given ones.
        return processed, {**self.props, **self.inferred_props}
|
||
|
||
def to_datapoints(data: Dict, props: Dict):
    """Flatten per-revision plot data into one list of datapoints.

    ``data`` is read as ``{revision: {"data": {file: {"data": blob}}}}``;
    file entries without a "data" key are skipped. Every blob is converted
    with a single shared ``Converter`` and each resulting datapoint is
    tagged with its revision under ``REVISION_FIELD``.

    Args:
        data: mapping of revision to its plot data, as described above.
        props: plot properties handed to ``Converter``.

    Returns:
        Tuple ``(datapoints, final_props)``: the concatenated datapoints
        and the properties returned by the last conversion. When nothing
        was converted, the properties known to the converter so far.
    """
    converter = Converter(props)

    datapoints = []
    # Defined up front so that input without any data does not end in
    # an UnboundLocalError at the return statement.
    final_props = {**converter.props, **converter.inferred_props}

    for revision, rev_data in data.items():
        for file_data in rev_data.get("data", {}).values():
            if "data" not in file_data:
                continue

            processed, final_props = converter.convert(file_data.get("data"))
            Converter.update(processed, {REVISION_FIELD: revision})
            datapoints.extend(processed)

    return datapoints, final_props
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.