-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy path: validate-stac-geoparquet
executable file
·72 lines (65 loc) · 2.46 KB
/
validate-stac-geoparquet
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#!/usr/bin/env python
import json
import sys
import shutil
import subprocess
import tempfile
from typing import Any
from deepdiff import DeepDiff
from pathlib import Path
import pyarrow.parquet
import stac_geoparquet.arrow
import pyarrow
# Two levels up from this file: the parent of the directory that holds the script.
root = Path(__file__).parents[1]
# JSON item used as the fixture for both directions of the round trip.
path = root.joinpath("spec-examples", "v1.1.0", "extended-item.json")
# Scratch directory for the intermediate parquet file; the script removes it
# again in its `finally` clause.
directory = tempfile.mkdtemp()
parquet_path = Path(directory, "extended-item.parquet")
def clean_item(item: dict[str, Any]) -> None:
    """Normalize *item* in place so it can be diffed against the source JSON.

    Two adjustments are made:

    * a missing ``"type"`` member is filled in with ``"Feature"``;
    * a ``MultiPolygon`` geometry holding exactly one polygon is collapsed
      into a plain ``Polygon`` with that polygon's coordinates.

    An item whose geometry is ``None`` or absent is left untouched apart from
    the ``"type"`` default, instead of raising while indexing into it.

    Args:
        item: The item dictionary to modify in place.
    """
    item.setdefault("type", "Feature")

    geometry = item.get("geometry")
    if geometry is None:
        # Nothing to collapse; avoid subscripting None.
        return

    if geometry["type"] == "MultiPolygon" and len(geometry["coordinates"]) == 1:
        geometry["type"] = "Polygon"
        geometry["coordinates"] = geometry["coordinates"][0]
def clean_report(report: dict[str, Any]) -> dict[str, Any]:
    """Drop the one datetime difference that is expected from the report.

    If ``values_changed`` contains exactly the known change for
    ``root['properties']['datetime']`` it is removed, and ``values_changed``
    itself is removed once it is empty. The report is modified in place and
    also returned.
    """
    datetime_key = "root['properties']['datetime']"
    expected_change = {
        "new_value": "2020-12-14T18:02:31.437Z",
        "old_value": "2020-12-14T18:02:31.437000Z",
    }

    changed = report.get("values_changed")
    if not changed:
        return report

    if changed.get(datetime_key) == expected_change:
        del changed[datetime_key]
    if not changed:
        del report["values_changed"]
    return report
def _exit_on_unexpected_diff(expected: dict[str, Any], actual: dict[str, Any]) -> None:
    """Diff two items; print the cleaned report and exit with status 1 if any difference remains."""
    report = clean_report(DeepDiff(expected, actual).to_dict())
    if report:
        print(json.dumps(report, indent=2))
        sys.exit(1)


try:
    # Writing: `translate` (run through cargo) turns the JSON item into a
    # parquet file, which is then read back with pyarrow / stac_geoparquet.
    subprocess.check_call(
        ["cargo", "run", "--no-default-features", "-F", "geoparquet", "--", "translate", path, parquet_path]
    )
    table = pyarrow.parquet.read_table(parquet_path)
    after = next(stac_geoparquet.arrow.stac_table_to_items(table))
    clean_item(after)
    # JSON text is UTF-8; do not depend on the locale's default encoding.
    with open(path, encoding="utf-8") as f:
        before = json.load(f)
    _exit_on_unexpected_diff(before, after)
    # The first half passed; free the path for the file written below.
    parquet_path.unlink()

    # Reading: stac_geoparquet writes the parquet file and `translate` reads
    # it, emitting JSON with a "features" list on stdout.
    table = stac_geoparquet.arrow.parse_stac_items_to_arrow([before])
    stac_geoparquet.arrow.to_parquet(table, parquet_path)
    item_collection = json.loads(subprocess.check_output(
        ["cargo", "run", "--no-default-features", "-F", "geoparquet", "--", "translate", parquet_path]
    ))
    features = item_collection["features"]
    # Explicit check rather than `assert`, which is stripped under `python -O`.
    if len(features) != 1:
        print(f"expected exactly one feature, got {len(features)}", file=sys.stderr)
        sys.exit(1)
    clean_item(features[0])  # stac-geoparquet writes as a multi-polygon
    _exit_on_unexpected_diff(before, features[0])
finally:
    # Always remove the scratch directory, including on sys.exit().
    shutil.rmtree(directory)