Skip to content

Commit

Permalink
Fix formatting leftovers (apache#27750)
Browse files Browse the repository at this point in the history
PR 27540 left some formattng issues which weren't caught
  • Loading branch information
bolkedebruin authored Nov 17, 2022
1 parent 2bba98f commit de99cd9
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 29 deletions.
2 changes: 1 addition & 1 deletion airflow/models/xcom.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ def serialize_value(
" then you need to enable pickle support for XCom"
" in your airflow config or make sure to decorate your"
" object with attr.",
ex
ex,
)
raise

Expand Down
59 changes: 31 additions & 28 deletions docs/apache-airflow/concepts/taskflow.rst
Original file line number Diff line number Diff line change
Expand Up @@ -104,39 +104,42 @@ a ``Dataset``, which is ``@attr.define`` decorated, together with TaskFlow.
from airflow import Dataset
from airflow.decorators import dag, task
SRC = Dataset("https://www.ncei.noaa.gov/access/monitoring/climate-at-a-glance/global/time-series/globe/land_ocean/ytd/12/1880-2022.json")
SRC = Dataset(
"https://www.ncei.noaa.gov/access/monitoring/climate-at-a-glance/global/time-series/globe/land_ocean/ytd/12/1880-2022.json"
)
now = pendulum.now()
@dag(start_date=now, schedule="@daily", catchup=False)
def etl():
@task()
def retrieve(src: Dataset) -> dict:
resp = requests.get(url=src.uri)
data = resp.json()
return data["data"]
@task()
def to_fahrenheit(temps: dict[int, float]) -> dict[int, float]:
ret: dict[int, float] = {}
for year, celsius in temps.items():
ret[year] = float(celsius)*1.8+32
return ret
@task()
def load(fahrenheit: dict[int, float]) -> Dataset:
filename = "/tmp/fahrenheit.json"
s = json.dumps(fahrenheit)
f = open(filename, "w")
f.write(s)
f.close()
return Dataset(f"file:///{filename}")
data = retrieve(SRC)
fahrenheit = to_fahrenheit(data)
load(fahrenheit)
@task()
def retrieve(src: Dataset) -> dict:
resp = requests.get(url=src.uri)
data = resp.json()
return data["data"]
@task()
def to_fahrenheit(temps: dict[int, float]) -> dict[int, float]:
ret: dict[int, float] = {}
for year, celsius in temps.items():
ret[year] = float(celsius) * 1.8 + 32
return ret
@task()
def load(fahrenheit: dict[int, float]) -> Dataset:
filename = "/tmp/fahrenheit.json"
s = json.dumps(fahrenheit)
f = open(filename, "w")
f.write(s)
f.close()
return Dataset(f"file:///{filename}")
data = retrieve(SRC)
fahrenheit = to_fahrenheit(data)
load(fahrenheit)
etl()
Expand Down

0 comments on commit de99cd9

Please sign in to comment.