#!/usr/bin/env python3
# Basic Python3 script to retrieve ERA5 data from the CDS, replacing the old Bash scripts.
# Parallel retrieval is done as a function of the years (set nprocs).
# A single variable at a time can be retrieved.
# Data are downloaded in GRIB and then archived as zip-compressed NetCDF4 using the CDO bindings.
# Both monthly means and hourly data can be downloaded.
# Multiple grids are supported.
# Both surface variables and pressure levels are supported.
# Support for area selection is included.
# @ Author: Paolo Davini, CNR-ISAC, Jun 2022
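#
# For reference, an illustrative sketch of the YAML configuration this script
# expects. The key names are taken from the reads in main() below; all values
# are hypothetical examples, not documented defaults:
#
#   tmpdir: /scratch/era5_tmp         # temporary download directory
#   storedir: /archive/era5           # final storage directory
#   dataset: reanalysis-era5-single-levels
#   varlist: [2t, msl]                # one variable is retrieved at a time
#   year:
#     begin: 1990
#     end: 2020
#     update: false                   # if true, only missing years are fetched
#   freq: mon
#   levelout: sfc
#   grid: r360x180
#   area: global
#   nprocs: 4                         # number of parallel processes
#   download_request: ...             # passed through to year_retrieve (see CDS_retriever)
#   do_retrieve: true
#   do_postproc: true
#   do_align: false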

import glob
import os
import shutil
import sys
from multiprocessing import Process
from pathlib import Path

from cdo import Cdo

from CDS_retriever import year_retrieve, year_convert, create_filename, first_last_year, which_new_years_download
from config import parser, load_config, print_config

cdo = Cdo()


def main():

    # Call the argument parser
    args = parser()

    if args.config:
        # Load the YAML file
        config = load_config(args.config)
        # Print a description of the loaded configuration
        print_config(config)

        # Translate the configuration into local variables
        tmpdir = config['tmpdir']
        storedir = config['storedir']
        dataset = config['dataset']
        varlist = config['varlist']
        year1 = config['year']['begin']
        year2 = config['year']['end']
        update = config['year']['update']
        freq = config['freq']
        levelout = config['levelout']
        grid = config['grid']
        area = config['area']
        nprocs = config['nprocs']
        download_request = config['download_request']
        do_retrieve = config['do_retrieve']
        do_postproc = config['do_postproc']
        do_align = config['do_align']

        # Override the config with command-line args
        if args.nprocs:
            print(f"Overriding YAML nprocs ({config['nprocs']}) with command-line arg ({args.nprocs})")
            nprocs = args.nprocs
        if args.update:
            print(f"Overriding YAML update ({config['year']['update']}) with command-line arg ({args.update})")
            update = args.update

        # Safety check: a single variable may be passed as a plain string
        if isinstance(varlist, str):
            varlist = [varlist]

        for var in varlist:

            if update:
                print("Update flag is true, detecting which years are missing...")
                year1, year2 = which_new_years_download(storedir, dataset, var, freq, grid, levelout, area)
                print(year1, year2)
                if year1 > year2:
                    print('Everything you want has already been downloaded, disabling retrieve...')
                    do_retrieve = False
                    if freq == 'mon':
                        print('Everything you want has already been postprocessed, disabling postproc...')
                        do_postproc = False

            # create the list of years
            years = [str(i) for i in range(year1, year2 + 1)]

            # define the output directory and file
            savedir = Path(tmpdir, var)
            print(f'Creating directory {savedir} if it does not exist')
            Path(savedir).mkdir(parents=True, exist_ok=True)
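
            # The two parallel blocks below split the year list into chunks of
            # nprocs; each chunk of Processes is started and joined before the
            # next one begins, so at most nprocs tasks run concurrently.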
            # retrieve block
            if do_retrieve:
                # loop on the years, creating one parallel process per year
                processes = []
                yearlist = [years[i:i + nprocs] for i in range(0, len(years), nprocs)]
                for lyears in yearlist:
                    print(f"Working on years {lyears}\n")
                    for year in lyears:
                        p = Process(target=year_retrieve, args=(dataset, var, freq, year, grid, levelout,
                                                                area, savedir, download_request))
                        p.start()
                        processes.append(p)
                    # wait for all the processes in the chunk to end
                    for process in processes:
                        process.join()

            if do_postproc:
                cdo.debug = True
                print('Running postproc...')
                destdir = Path(storedir, freq)
                Path(destdir).mkdir(parents=True, exist_ok=True)

                # loop on the years, creating the parallel processes for a fast conversion
                processes = []
                yearlist = [years[i:i + nprocs] for i in range(0, len(years), nprocs)]
                for lyears in yearlist:
                    for year in lyears:
                        print('Conversion of ' + year)
                        filename = create_filename(dataset, var, freq, grid, levelout, area, year)
                        infile = Path(savedir, filename + '.grib')
                        outfile = Path(destdir, filename + '.nc')
                        p = Process(target=year_convert, args=(infile, outfile))
                        # p = Process(target=cdo.copy, args=(infile, outfile, '-f nc4 -z zip'))
                        p.start()
                        processes.append(p)
                    # wait for all the processes in the chunk to end
                    for process in processes:
                        process.join()
                print('Conversion complete!')
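
                # The monthly branch below concatenates the yearly NetCDF files
                # (matching the '????' year glob) into one file named with the
                # 'firstyear-lastyear' range; in update mode the pre-existing
                # multi-year file is prepended so the merge spans the full record.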
                # extra processing for monthly data
                if freq == "mon":
                    print('Extra processing for monthly...')
                    filepattern = str(Path(destdir, create_filename(dataset, var, freq, grid, levelout, area, '????') + '.nc'))
                    first_year, last_year = first_last_year(filepattern)
                    if update:
                        # check if the big multi-year file already exists
                        bigfile = str(Path(destdir, create_filename(dataset, var, freq,
                                                                    grid, levelout, area, '????', '????') + '.nc'))
                        filebase = glob.glob(bigfile)
                        first_year, _ = first_last_year(bigfile)
                        filepattern = filebase + glob.glob(filepattern)
                    mergefile = str(Path(destdir, create_filename(dataset, var, freq, grid,
                                                                  levelout, area, first_year + '-' + last_year) + '.nc'))
                    print(mergefile)
                    if os.path.exists(mergefile):
                        print(f'Removing existing file {mergefile}...')
                        os.remove(mergefile)
                    print(f'Merging together into {mergefile}...')
                    cdo.cat(input=filepattern, output=mergefile, options='-f nc4 -z zip')
                    if isinstance(filepattern, str):
                        loop = glob.glob(filepattern)
                        for f in loop:
                            os.remove(f)

                    # HACK: set a common time axis for monthly data (roll back
                    # cumulated fields by 6 hours); useful for catalog/xarray loading
                    if do_align:
                        print('Alignment required...')
                        first_time = cdo.showtime(input=f'-seltimestep,1 {mergefile}')[0]
                        if first_time != '00:00:00':
                            tempfile = str(Path(tmpdir, 'temp_align.nc'))
                            shutil.move(mergefile, tempfile)
                            cdo.shifttime('-6hours', input=tempfile, output=mergefile, options='-f nc4 -z zip')
                            os.remove(tempfile)
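                            # For reference, the shifttime call above corresponds
                            # roughly to the shell command (illustrative, file
                            # names hypothetical):
                            #   cdo -f nc4 -z zip shifttime,-6hours temp_align.nc merged.nc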

                # extra processing for daily data
                else:
                    print('Extra processing for daily and 6-hourly data...')
                    daydir, mondir = [Path(storedir, var, x) for x in ['day', 'mon']]
                    Path(daydir).mkdir(parents=True, exist_ok=True)
                    Path(mondir).mkdir(parents=True, exist_ok=True)
                    filepattern = Path(destdir, create_filename(dataset, var, freq, grid, levelout, area, '????') + '.nc')
                    first_year, last_year = first_last_year(filepattern)
                    dayfile = str(Path(daydir, create_filename(dataset, var, 'day', grid,
                                                               levelout, area, first_year + '-' + last_year) + '.nc'))
                    # monfile = str(Path(mondir, create_filename(var, 'mon', grid, levelout, area, first_year + '-' + last_year) + '.nc'))
                    if os.path.exists(dayfile):
                        os.remove(dayfile)
                    cdo.daymean(input='-cat ' + str(filepattern),
                                output=dayfile, options='-f nc4 -z zip')
                    # cdo.monmean(input=dayfile, output=monfile, options='-f nc4 -z zip')

    else:
        sys.exit('Error in loading the configuration!')

    return
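

# Example invocation (assuming the parser in config.py defines flags matching
# the attributes read above, i.e. --config, --nprocs and --update; the config
# file name is hypothetical):
#   ./ERA5_retrieve_postproc.py --config era5_config.yml --nprocs 4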
if __name__ == "__main__":
    main()