diff --git a/scripts/extract_wegener_net_collocations.sh b/scripts/extract_wegener_net_collocations.sh
new file mode 100755
index 0000000..74fb7d4
--- /dev/null
+++ b/scripts/extract_wegener_net_collocations.sh
@@ -0,0 +1,4 @@
+#!/bin/bash
+
+echo "Extracting data for year $1 and month $2."
+speed extract_data gmi wegener_net /xdata/simon/raw/gmi/wegener_net/ $1 $2
diff --git a/speed/cli.py b/speed/cli.py
index 98a5071..878704e 100644
--- a/speed/cli.py
+++ b/speed/cli.py
@@ -19,7 +19,7 @@
 import click
 
 import speed.logging
-from speed.data import cpcir, goes
+from speed.data import cpcir, goes, ancillary
 
 LOGGER = logging.getLogger(__file__)
 
@@ -62,6 +62,7 @@ def extract_data(
     import speed.data.gpm_gv
     import speed.data.combined
     import speed.data.noaa
+    import speed.data.wegener_net
 
     input_dataset = get_input_dataset(input_data)
     if input_dataset is None:
@@ -395,13 +396,13 @@ def extract_evaluation_data(
 
     times_on_swath = {}
     for f_on_swath in files_on_swath:
-        time_str = f_on_swath.name.split("_")[2][:-3]
+        time_str = f_on_swath.name.split("_")[-1][:-3]
         median_time = datetime.strptime(time_str, "%Y%m%d%H%M%S")
         times_on_swath[median_time] = f_on_swath
 
     times_gridded = {}
     for f_gridded in files_gridded:
-        time_str = f_gridded.name.split("_")[2][:-3]
+        time_str = f_gridded.name.split("_")[-1][:-3]
         median_time = datetime.strptime(time_str, "%Y%m%d%H%M%S")
         times_gridded[median_time] = f_gridded
 
@@ -433,3 +434,4 @@ def extract_evaluation_data(
 cli.add_command(extract_evaluation_data, name="extract_evaluation_data")
 cli.add_command(cpcir.cli, name="extract_cpcir_obs")
 cli.add_command(goes.cli, name="extract_goes_obs")
+cli.add_command(ancillary.cli, name="add_ancillary_data")
diff --git a/speed/data/ancillary.py b/speed/data/ancillary.py
new file mode 100644
index 0000000..257153b
--- /dev/null
+++ b/speed/data/ancillary.py
@@ -0,0 +1,815 @@
+"""
+speed.data.ancillary
+====================
+
+Functionality to extract ancillary data for SPEED collocations.
+"""
+from calendar import monthrange
+from concurrent.futures import ProcessPoolExecutor, as_completed
+from datetime import datetime, timedelta
+import logging
+from pathlib import Path
+import struct
+from typing import List, Optional, Tuple
+
+import click
+import numpy as np
+from pansat.time import TimeRange, to_datetime, to_datetime64
+from pansat.geometry import LonLatRect
+from pansat.products.dem.globe import globe
+from rich.progress import track, Progress
+from scipy.ndimage import binary_dilation
+import unlzw3
+import xarray as xr
+
+
+import speed
+
+
+LOGGER = logging.getLogger(__name__)
+
+
+FOOTPRINTS = {
+    "gmi": 32,
+    "atms": 74
+}
+
+
+def find_era5_sfc_files(
+    era5_path: Path,
+    start_time: np.datetime64,
+    end_time: np.datetime64,
+) -> List[Path]:
+    """
+    Find ERA5 surface-data files to load within a given time range.
+
+    Args:
+        era5_path: The path containing the ERA5 files.
+        start_time: The start time of the time range for which to load
+            ERA5 data.
+        end_time: The end time of the time range for which to load
+            ERA5 data.
+
+    Return:
+        A list of path objects pointing to the ERA5 files containing the
+        data covering the given time range.
+ """ + era5_path = Path(era5_path) + start_time = to_datetime(start_time) + end_time = to_datetime(end_time) + + era5_files = [] + time = datetime(start_time.year, start_time.month, day=1) + while time <= end_time: + mpath = era5_path / f"{time.year}{time.month:02}" + era5_files += sorted(list(mpath.glob("ERA5*surf.nc"))) + _, days = monthrange(time.year, time.month) + time += timedelta(days=days) + + filtered = [] + for era5_file in era5_files: + file_start = datetime.strptime(era5_file.name.split("_")[1], "%Y%m%d") + file_end = file_start + timedelta(days=1) + if not (end_time < file_start or start_time > file_end): + filtered.append(era5_file) + + return filtered + + +def find_era5_lai_files( + era5_lai_path: Path, + start_time: np.datetime64, + end_time: np.datetime64, +) -> List[Path]: + """ + Find ERA5 leaf-area index (LAI) files to load within a given time range. + + Args: + era5_lai_path: The path containing the ERA5 files. + start_time: The start time of the time range for which to load + ERA5 data. + end_time: The end time of the time range for which to load + ERA5 data. + + Return: + A list of path object pointing to the ERA5 files containing the + data covering the given time range. + """ + era5_lai_path = Path(era5_lai_path) + start_time = to_datetime(start_time) + end_time = to_datetime(end_time) + + lai_paths = [] + time = datetime(start_time.year, start_time.month, day=1) + while time <= end_time: + mpath = era5_lai_path / f"{time.year}{time.month:02}" + lai_paths += sorted(list(mpath.glob("ERA5*lai.nc"))) + _, days = monthrange(time.year, time.month) + time += timedelta(days=days) + + filtered = [] + for lai_path in lai_paths: + file_start = datetime.strptime(lai_path.name.split("_")[1], "%Y%m%d") + file_end = file_start + timedelta(days=1) + if not (end_time < file_start or start_time > file_end): + filtered.append(lai_path) + + return filtered + + +def load_era5_ancillary_data( + era5_sfc_files: List[Path], + era5_lai_files: List[Path], + roi: Optional[Tuple[float, float, float, float]] = None +) -> xr.Dataset: + """ + Load ancillary data from ERA5 files. + + Args: + era5_sfc_files: A list containing the ERA5 surface-data files from which to load the data. + era5_lai_files: A list containing the ERA5 leaf-area-index files from which to load the data. + + Return: + An xarray.Dataset containing the ERA5 surface ancillary combined with + the leaf-area index. 
+ """ + new_names = { + "tcwv": "total_column_water_vapor", + "t2m": "two_meter_temperature", + "u10": "ten_meter_wind_u", + "v10": "ten_meter_wind_v", + "sst": "sea_surface_temperature", + "skt": "skin_temperature", + "siconc": "sea_ice_concentration", + "sd": "snow_depth", + "cape": "cape" + } + + data = [] + for era5_file in era5_sfc_files: + with xr.open_dataset(era5_file) as inpt: + if roi is not None: + lon_min, lat_min, lon_max, lat_max = roi + lon_min = (lon_min + 360) % 360 + lon_max = (lon_max + 360) % 360 + lons = inpt.longitude.data + lats = inpt.latitude.data + lon_min -= 0.5 + lat_min -= 0.5 + lon_max += 0.5 + lat_max += 0.5 + lon_mask = (lon_min <= lons) * (lons <= lon_max) + lat_mask = (lat_min <= lats) * (lats <= lat_max) + inpt = inpt[{"longitude": lon_mask, "latitude": lat_mask}] + data.append(inpt[list(new_names.keys())].rename(**new_names)) + + data = xr.concat(data, "time") + + lai_data = [] + for era5_file in era5_lai_files: + with xr.open_dataset(era5_file) as inpt: + if roi is not None: + lon_min, lat_min, lon_max, lat_max = roi + lon_min = (lon_min + 360) % 360 + lon_max = (lon_max + 360) % 360 + lons = inpt.longitude.data + lats = inpt.latitude.data + lon_min -= 0.5 + lat_min -= 0.5 + lon_max += 0.5 + lat_max += 0.5 + lon_mask = (lon_min <= lons) * (lons <= lon_max) + lat_mask = (lat_min <= lats) * (lats <= lat_max) + inpt = inpt[{"longitude": lon_mask, "latitude": lat_mask}] + lai_data.append(inpt[["lai_hv", "lai_lv"]]) + + lai_data = xr.concat(lai_data, dim="time") + data["leaf_area_index"] = lai_data["lai_hv"] + lai_data["lai_lv"] + + lons = data.longitude.data + lons[lons > 180] -= 360 + + return data.sortby("longitude") + + +def find_autosnow_file( + autosnow_path: Path, + date: np.datetime64 +) -> Path: + """ + Find autosnow files for a given day. + + Args: + autosnow_path: The root directory of the folder tree containing the autosnow files. + date: A date object specifying the day for which to load the autosnow data. + + Return: + A path pointing to the autosnow file for the given day. + """ + autosnow_path = Path(autosnow_path) + date = to_datetime(date) + path = autosnow_path / date.strftime("autosnowV3/%Y/gmasi_snowice_reproc_v003_%Y%j.Z") + return path + + +def load_autosnow_data(path: Path) -> xr.DataArray: + """ + Load autosnow data into a data array. + + Args: + path: A path object pointing to an AutoSnow file. + + Return: + An xr.DataArray containing the data loaded from the file. + """ + n_lons = 9000 + n_lats = 4500 + with open(path, 'rb') as inpt: + cmpd = inpt.read() + dcmpd = unlzw3.unlzw(cmpd) + data = np.frombuffer(dcmpd, dtype="i1") + data = data.reshape((n_lats, n_lons)) + lons = np.linspace(-180, 180, n_lons + 1) + lons = 0.5 * (lons[1:] + lons[:-1]) + lats = np.linspace(90, -90, n_lats + 1) + lats = 0.5 * (lats[1:] + lats[:-1]) + return xr.DataArray( + data=data, + dims=("latitude", "longitude"), + coords={ + "latitude": lats, + "longitude": lons + } + ) + + +def load_landmask_data( + ancillary_dir: Path, + footprint: str = "32", + resolution: int = 32 +) -> xr.DataArray: + """ + Load GPM landmask file into a data array. + + Args: + path: A path object pointing to the preprocessor ancillary directory. + + Return: + An xr.DataArray containing the data loaded from the file. 
+ """ + n_lons = 360 * resolution + n_lats = 180 * resolution + landmask_file = ancillary_dir / f"landmask{footprint}_{resolution}.bin" + data = np.fromfile(landmask_file, dtype="i1").reshape((n_lats, n_lons)) + lons = np.linspace(-180, 180, n_lons + 1) + lons = 0.5 * (lons[1:] + lons[:-1]) + lats = np.linspace(90, -90, n_lats + 1) + lats = 0.5 * (lats[1:] + lats[:-1]) + return xr.DataArray( + data=data, + dims=("latitude", "longitude"), + coords={ + "latitude": lats, + "longitude": lons + } + ) + + +def load_emissivity_data(ancillary_dir: Path, date: np.datetime64) -> xr.DataArray: + """ + Load GPM emissivity data into file. + + Args: + path: A path object pointing to the preprocessor ancillary directory. + date: A date object defining the month for which to load to emissivity data. + + Return: + An xr.DataArray containing the data loaded from the file. + """ + month = to_datetime(date).month + emiss_file = ancillary_dir / f"emiss_class_{month:02}.dat" + + emiss = [] + n_lons = 720 + n_lats = 359 + + data = np.fromfile(emiss_file, dtype="i2").reshape((n_lats, n_lons)) + + data_c = data[1:-1, 1:-1] + for ind in range(3): + for x_shift in [-1, 1]: + for y_shift in [-1, 1]: + data_shifted = data[1 + y_shift: n_lats - 1 + y_shift, 1 + x_shift : n_lons - 1 + x_shift] + update_mask = (data_c == 0) * (data_shifted != 0) + data_c[update_mask] = data_shifted[update_mask] + + lons = np.linspace(-180, 180, n_lons + 1) + lons = 0.5 * (lons[1:] + lons[:-1]) + lats = np.linspace(90, -90, n_lats + 1) + lats = 0.5 * (lats[1:] + lats[:-1]) + return xr.DataArray( + data=data, + dims=("latitude", "longitude"), + coords={ + "latitude": lats, + "longitude": lons + } + ) + + +def load_emissivity_data_all_months(ancillary_dir: Path) -> xr.DataArray: + """ + Load GPM emissivity data for all months. + + Args: + path: A path object pointing to the preprocessor ancillary directory. + + Return: + An xr.DataArray containing the data loaded from the file. + """ + emiss = [] + n_lons = 720 + n_lats = 359 + + emiss_data = [] + + for month in range(1, 13): + emiss_file = ancillary_dir / f"emiss_class_{month:02}.dat" + data = np.fromfile(emiss_file, dtype="i2").reshape((n_lats, n_lons)) + + data_c = data[1:-1, 1:-1] + for ind in range(3): + for x_shift in [-1, 1]: + for y_shift in [-1, 1]: + data_shifted = data[1 + y_shift: n_lats - 1 + y_shift, 1 + x_shift : n_lons - 1 + x_shift] + update_mask = (data_c == 0) * (data_shifted != 0) + data_c[update_mask] = data_shifted[update_mask] + + lons = np.linspace(-180, 180, n_lons + 1) + lons = 0.5 * (lons[1:] + lons[:-1]) + lats = np.linspace(90, -90, n_lats + 1) + lats = 0.5 * (lats[1:] + lats[:-1]) + emiss_data.append(xr.DataArray( + data=data, + dims=("latitude", "longitude"), + coords={ + "latitude": lats, + "longitude": lons + } + )) + return xr.concat(emiss_data, dim="month") + + +def load_mountain_mask(ancillary_dir: Path) -> xr.DataArray: + """ + Read GPROF mountain mask. + + Args: + ancillary_dir: A pathlib.Path object pointing to the GPROF preprocessor ancillary directory + + Return: + An xarray.DataArray containing the mountain mask. 
+ """ + fname = Path(ancillary_dir) / "k3classes_0.1deg.asc.bin" + with open(fname, "rb") as data: + n_lons = struct.unpack("i", data.read(4))[0] + n_lons = struct.unpack("i", data.read(4))[0] + n_lats = struct.unpack("i", data.read(4))[0] + data = np.fromfile(data, dtype=np.int32, count=n_lons * n_lats) + + data = data.reshape(n_lats, n_lons) + data = np.roll(data, shift=n_lons // 2, axis=-1) + + lons = np.linspace(-180, 180, n_lons + 1) + lons = 0.5 * (lons[1:] + lons[:-1]) + lats = np.linspace(-90, 90, n_lats + 1) + lats = 0.5 * (lats[1:] + lats[:-1]) + + data = xr.DataArray( + data=data, + dims=("latitude", "longitude"), + coords={ + "latitude": lats, + "longitude": lons + } + ) + return data + + +def load_gprof_surface_type_data( + ancillary_dir: Path, + ingest_dir: Path, + date: np.datetime64, + footprint: str = "32", + resolution: int = 32, + roi: Optional[Tuple[float, float, float, float]] = None +) -> xr.DataArray: + """ + Load the CSU surface type classification employed by GPROF. + + Args: + ancillary_dir: Path pointing to the directory containing the GPROF ancillary data. + ingest_dir: Path pointing to the directory containing the GPROF ingest data. + date: A np.datetime64 object specifying the date for which to load the surface + type classes. + roi: An optional tuple ``(lon_min, lat_min, lon_max, lat_max)`` defining a bounding + + + Return: + An xarray.Dataset containing the GPROF surface classification. + """ + ancillary_dir = Path(ancillary_dir) + ingest_dir = Path(ingest_dir) + + date = to_datetime(date) + month = date.month + + landmask_data = load_landmask_data( + ancillary_dir, + footprint=footprint, + resolution=resolution + ) + lons = landmask_data.longitude.data + lats = landmask_data.latitude.data + if roi is not None: + lon_min, lat_min, lon_max, lat_max = roi + lon_mask = (lon_min <= lons) * (lons <= lon_max) + lat_mask = (lat_min <= lats) * (lats <= lat_max) + landmask_data = landmask_data[{"longitude": lon_mask, "latitude": lat_mask}] + + emissivity_data = load_emissivity_data_all_months(ancillary_dir) + if roi is not None: + lons = emissivity_data.longitude.data + lats = emissivity_data.latitude.data + lon_min, lat_min, lon_max, lat_max = roi + lon_min -= 1.0 + lon_max += 1.0 + lat_min -= 1.0 + lat_max += 1.0 + lon_mask = (lon_min <= lons) * (lons <= lon_max) + lat_mask = (lat_min <= lats) * (lats <= lat_max) + emissivity_data = emissivity_data[{"longitude": lon_mask, "latitude": lat_mask}] + emissivity_data_all = emissivity_data.interp( + longitude=landmask_data.longitude.data, + latitude=landmask_data.latitude.data, + method="nearest" + ) + emissivity_data = emissivity_data_all[{"month": month - 1}] + + autosnow_file = find_autosnow_file(ingest_dir, date) + autosnow_data = load_autosnow_data(autosnow_file).copy() + if roi is not None: + lons = autosnow_data.longitude.data + lats = autosnow_data.latitude.data + lon_min, lat_min, lon_max, lat_max = roi + lon_min -= 1.0 + lon_max += 1.0 + lat_min -= 1.0 + lat_max += 1.0 + lon_mask = (lon_min <= lons) * (lons <= lon_max) + lat_mask = (lat_min <= lats) * (lats <= lat_max) + autosnow_data = autosnow_data[{"longitude": lon_mask, "latitude": lat_mask}] + + # Expand sea ice + sea_ice_mask = autosnow_data.data == 3 + sea_ice_mask = binary_dilation(sea_ice_mask, structure=np.ones((10, 10))) + zero_mask = autosnow_data.data == 0 + autosnow_data.data[zero_mask * sea_ice_mask] = 3 + + autosnow_data = autosnow_data.interp( + latitude=landmask_data.latitude.data, + longitude=landmask_data.longitude.data, + method="nearest" + 
) + + mountain_mask = load_mountain_mask(ancillary_dir) + if roi is not None: + lons = mountain_mask.longitude.data + lats = mountain_mask.latitude.data + lon_min, lat_min, lon_max, lat_max = roi + lon_min -= 1.0 + lon_max += 1.0 + lat_min -= 1.0 + lat_max += 1.0 + lon_mask = (lon_min <= lons) * (lons <= lon_max) + lat_mask = (lat_min <= lats) * (lats <= lat_max) + mountain_mask = mountain_mask[{"longitude": lon_mask, "latitude": lat_mask}] + mountain_mask = mountain_mask.interp( + latitude=landmask_data.latitude.data, + longitude=landmask_data.longitude.data + ) + + sfc_type = np.zeros((landmask_data.shape), dtype=np.int8) + + sfc_type[(landmask_data >= 0) * (landmask_data <= 2)] = 10 + sfc_type[(landmask_data > 2) * (landmask_data <= 25)] = 30 + sfc_type[(landmask_data > 25) * (landmask_data <= 75)] = 31 + sfc_type[(landmask_data > 75) * (landmask_data <= 95)] = 32 + sfc_type[(landmask_data > 95)] = 20 + + snow = autosnow_data.data + + sfc_type[sfc_type == 10] = 1 + sfc_type[(sfc_type == 1) * (snow == 2)] = 2 + sfc_type[snow == 3] = 2 + sfc_type[snow == 5] = 16 + + land = sfc_type == 20 + snow_pixels = ((snow == 2) + (snow == 3)) + no_snow_pixels = ~snow_pixels + + mask = (emissivity_data.data >= 6) * (emissivity_data.data <= 9) * snow_pixels * land + sfc_type[mask] = emissivity_data.data[mask] + 2 + + mask = ((emissivity_data.data >= 1) * (emissivity_data.data <= 5) + (emissivity_data.data == 10)) * snow_pixels * land + sfc_type[mask] = 10 + + mask = (emissivity_data.data == 0) * snow_pixels * land + sfc_type[mask] = 8 + + mask = (emissivity_data.data >= 1) * (emissivity_data.data <= 5) * no_snow_pixels * land + sfc_type[mask] = emissivity_data.data[mask] + 2 + + frozen = (emissivity_data.data >= 6) * (emissivity_data.data <= 9) * land * no_snow_pixels + for ind in range(11): + prev_month = (month - ind - 1) % 12 + prev_emiss = emissivity_data_all[{"month": prev_month}] + mask = frozen * (prev_emiss < 6) * (sfc_type > 7) * (sfc_type != 12) + sfc_type[mask] = prev_emiss.data[mask] + 2 + mask = frozen * (prev_emiss == 10) * (sfc_type != 12) * (sfc_type > 8) + sfc_type[mask] = 12 + + mask = (emissivity_data.data == 10) * no_snow_pixels * land + sfc_type[mask] = 12 + + mask = (emissivity_data.data == 0) * land * no_snow_pixels + sfc_type[mask] = 1 + + sfc_type[sfc_type == 20] = 12 + + sfc_type[sfc_type == 30] = 13 + sfc_type[sfc_type == 31] = 14 + sfc_type[sfc_type == 32] = 15 + + mask = (sfc_type >= 13) * (sfc_type <= 15) + sfc_type[mask * (snow == 2)] = 10 + sfc_type[mask * (snow == 3)] = 2 + + mtn_no_snow = (sfc_type >= 3) * (sfc_type <= 7) * (mountain_mask >= 1) + sfc_type[mtn_no_snow] = 17 + mtn_snow = (sfc_type >= 8) * (sfc_type <= 11) * (mountain_mask >= 1) + sfc_type[mtn_snow] = 18 + + return xr.DataArray( + data=sfc_type, + dims=("latitude", "longitude"), + coords={ + "latitude": landmask_data.latitude.data, + "longitude": landmask_data.longitude.data + } + ) + + +def load_elevation_data(roi: Tuple[float, float]) -> xr.Dataset: + """ + Load elevation data from the NOAA GLOBE dataset. 
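For reference, the GLOBE DEM is distributed as 16 tiles named `a` through `p`, and the loop just below derives each tile's position in a 4 by 4 global mosaic from the first letter of its file name. A compact sketch of that mapping, assuming this naming convention (the file names are illustrative):

```python
def globe_tile_position(filename: str) -> tuple:
    """Map a GLOBE tile file name to its (row, column) in a 4x4 mosaic, west to east, north to south."""
    ind = ord(filename[0].lower()) - ord("a")
    return ind // 4, ind % 4

assert globe_tile_position("a10g") == (0, 0)   # hypothetical north-western tile
assert globe_tile_position("p10g") == (3, 3)   # hypothetical south-eastern tile
```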
+ """ + date = np.datetime64("2020-01-01") + roi = LonLatRect(*roi) + recs = globe.get(time_range=TimeRange(date), roi=roi) + tiles = {} + for rec in recs: + ind = ord(rec.filename[0]) - ord('a') + row_ind = ind // 4 + col_ind = ind % 4 + tiles[(row_ind, col_ind)] = globe.open(rec) + + row_inds = np.unique([coord[0] for coord in tiles.keys()]) + col_inds = np.unique([coord[1] for coord in tiles.keys()]) + + combined = [] + for row_ind in row_inds: + nested = [] + for col_ind in col_inds: + nested.append(tiles[row_ind, col_ind]) + nested = xr.concat(nested, dim="longitude") + combined.append(nested) + return xr.concat(combined, dim="latitude") + + +def add_ancillary_data( + path_on_swath: Path, + path_gridded: Path, + era5_path: Path, + era5_lai_path: Path, + gprof_ancillary_dir: Path, + gprof_ingest_dir: Path +) -> None: + """ + Add ancillary data to SPEED collocations. + + The data is added to both the 'on-swath' and 'gridded' files in a separate group + called 'ancillary'. + + Args: + path_on_swath: Path to the file containing the collocations extracted in on_swath format. + path_gridded: Path to the file containing the collocations extract in gridded format. + era5_path: Path pointing to the folder containing the ERA5 files. + era5_lai_path: Path pointing to the folder containing the ERA5 LAI data. + gprof_ancillary_dir: Path pointing to the GPROF ancillary dir. + gprof_ingest_dir: Path pointing to the GPROF ingest dir. + """ + sensor = path_on_swath.name.split("_")[1] + time_str = path_on_swath.name.split("_")[2][:-3] + median_time = to_datetime64(datetime.strptime(time_str, "%Y%m%d%H%M%S")) + + with xr.load_dataset(path_on_swath, group="input_data") as data_on_swath: + lons_os = data_on_swath.longitude + lon_min, lon_max = lons_os.data.min(), lons_os.data.max() + lats_os = data_on_swath.latitude + lat_min, lat_max = lats_os.data.min(), lats_os.data.max() + + roi = (lon_min, lat_min, lon_max, lat_max) + scan_time = data_on_swath.scan_time + data_on_swath.close() + + # Store ancillary data in gridded geometry + with xr.load_dataset(path_gridded, group="reference_data") as data_gridded: + lons_g = data_gridded.longitude.compute() + lats_g = data_gridded.latitude.compute() + time_g = data_gridded.time.compute() + data_gridded.close() + + start_time = scan_time.data.min() + end_time = scan_time.data.max() + print(start_time) + era5_sfc_files = find_era5_sfc_files(era5_path, start_time, end_time) + era5_lai_files = find_era5_lai_files(era5_lai_path, start_time, end_time) + era5_data = load_era5_ancillary_data( + era5_sfc_files, era5_lai_files, roi=roi + ) + surface_type = load_gprof_surface_type_data( + ancillary_dir=gprof_ancillary_dir, + ingest_dir=gprof_ingest_dir, + date=median_time, + footprint=FOOTPRINTS[sensor.lower()], + resolution=32, + roi=roi + ) + elevation = load_elevation_data(roi=roi) + + # Store ancillary data in on-swath geometry + ancillary_data = era5_data.interp( + latitude=lats_os, + longitude=lons_os, + time=scan_time + ).rename(time="scan_time") + surface_type_os = surface_type.interp( + latitude=lats_os, + longitude=lons_os, + method="nearest", + kwargs={"fill_value": -1} + ) + ancillary_data["surface_type"] = surface_type_os + elevation_os = elevation.interp( + latitude=lats_os, + longitude=lons_os, + ) + ancillary_data["elevation"] = elevation_os["elevation"] + encoding = { + var: {"dtype": "float32", "compression": "zstd"} + for var in ancillary_data.variables if var != "surface_type" + } + encoding["surface_type"] = {"dtype": "int8", "compression": "zstd"} + 
print(ancillary_data) + ancillary_data.to_netcdf( + path_on_swath, + group="ancillary_data", + mode="a", + ) + + ancillary_data = era5_data.interp( + latitude=lats_g, + longitude=lons_g, + time=time_g, + ) + surface_type_g = surface_type.interp( + latitude=lats_g, + longitude=lons_g, + method="nearest", + kwargs={"fill_value": -1} + ) + ancillary_data["surface_type"] = surface_type_g + elevation_g = elevation.interp( + latitude=lats_g, + longitude=lons_g, + ) + ancillary_data["elevation"] = elevation_g["elevation"] + encoding = { + var: {"dtype": "float32", "compression": "zstd"} + for var in ancillary_data.variables if var != "surface_type" + } + print(ancillary_data) + encoding["surface_type"] = {"dtype": "int8", "compression": "zstd"} + ancillary_data.to_netcdf( + path_gridded, + group="ancillary_data", + mode="a", + ) + + +@click.command() +@click.argument("collocation_path", type=str) +@click.option("--n_processes", type=int, default=1) +@click.option("--pattern", type=str, default="*.nc") +@click.option("--era5_path", type=str, default="/qdata2/archive/ERA5") +@click.option("--era5_lai_path", type=str, default="/pdata4/pbrown/ERA5") +@click.option("--gprof_ancillary_dir", type=str, default="/qdata1/pbrown/gpm/ppancillary") +@click.option("--gprof_ingest_dir", type=str, default="/qdata1/pbrown/gpm/ppingest") +def cli( + collocation_path: str, + n_processes: int = 1, + pattern: str = "*.nc", + era5_path: str = "/qdata2/archive/ERA5", + era5_lai_path: str = "/pdata4/pbrown/ERA5", + gprof_ancillary_dir: str = "/qdata1/pbrown/gpm/ppancillary", + gprof_ingest_dir: str = "/qdata1/pbrown/gpm/ppingest" +): + """ + Extract ancillary data for GPM collocations. + + speed extract_ancillary_data collocation_path + + Extracts GPROF ancillary data for GPM collocations scenes. 
+ """ + collocation_path = Path(collocation_path) + if not collocation_path.exists(): + LOGGER.error("Provided collocation path must point to an existing directory.") + return 1 + + files_on_swath = sorted(list((collocation_path / "on_swath").glob(pattern))) + files_gridded = sorted(list((collocation_path / "gridded").glob(pattern))) + + times_on_swath = {} + for f_on_swath in files_on_swath: + time_str = f_on_swath.name.split("_")[2][:-3] + median_time = datetime.strptime(time_str, "%Y%m%d%H%M%S") + times_on_swath[median_time] = f_on_swath + + times_gridded = {} + for f_gridded in files_gridded: + time_str = f_gridded.name.split("_")[2][:-3] + median_time = datetime.strptime(time_str, "%Y%m%d%H%M%S") + times_gridded[median_time] = f_gridded + + combined = set(times_gridded.keys()).intersection(set(times_on_swath.keys())) + + LOGGER.info(f"Found {len(combined)} collocations in {collocation_path}.") + + if n_processes < 2: + for median_time in track( + combined, + description="Extracting ancillary data:", + console=speed.logging.get_console() + ): + try: + add_ancillary_data( + times_on_swath[median_time], + times_gridded[median_time], + era5_path=era5_path, + era5_lai_path=era5_lai_path, + gprof_ancillary_dir=gprof_ancillary_dir, + gprof_ingest_dir=gprof_ingest_dir + ) + except Exception: + LOGGER.exception( + "Processing of the collocation with median time %s failed " + "with the following error.", + median_time + ) + else: + pool = ProcessPoolExecutor( + max_workers=n_processes + ) + tasks = [] + for median_time in combined: + tasks.append( + add_ancillary_data, + times_on_swath[median_time], + times_gridded[median_time], + era5_path=era5_path, + era5_lai_path=era5_lai_path, + gprof_ancillary_dir=gprof_ancillary_dir, + gprof_ingest_dir=gprof_ingest_dir + ) + with Progress(console=speed.logging.get_console()) as progress: + extraction = progress.add_task("Extracting GOES observations:", total=len(tasks)) + for task in as_completed(tasks): + try: + task.result() + except Exception: + LOGGER.exception( + "The following error was encountered when processing collocation " + "with median time %s.", + median_time + ) + progress.advance(extraction, advance=1.0) diff --git a/speed/data/combined.py b/speed/data/combined.py index 4b18e81..6c414eb 100644 --- a/speed/data/combined.py +++ b/speed/data/combined.py @@ -11,6 +11,7 @@ from typing import List, Optional from tempfile import TemporaryDirectory +from filelock import FileLock from gprof_nn.data.l1c import L1CFile import numpy as np from pansat import Granule @@ -146,6 +147,7 @@ def load_reference_data( "precip_liq_water_cont", "cloud_ice_water_cont", "cloud_liq_water_cont", + "scan_time" ] cmb_data = granule.open()[cmb_vars].rename( { @@ -154,9 +156,7 @@ def load_reference_data( "precip_liq_water_cont": "rain_water_content", "cloud_ice_water_cont": "ice_water_content", "cloud_liq_water_cont": "liquid_water_content" - } - ) - + }) profiles = [ "total_water_content", "rain_water_content", @@ -178,22 +178,35 @@ def load_reference_data( cmb_data["ice_water_path"] = cmb_data["ice_water_content"].integrate("vertical_bins") cmb_data["liquid_water_path"] = cmb_data["liquid_water_content"].integrate("vertical_bins") + cmb_type = cmb_data["precipitation_type"].data.astype("int64") + precip_type = np.zeros_like(cmb_type, dtype="uint") + precip_type[cmb_type == -1111] = 1 + precip_type[cmb_type // 10_000_000 == 1] = 2 + precip_type[cmb_type // 10_000_000 == 2] = 3 + precip_type[cmb_type // 10_000_000 == 3] = 4 + + cmb_data["precipitation_type"] = ( + 
list(cmb_data["precipitation_type"].dims), + precip_type + ) target_levels = np.concatenate([0.25 * np.arange(20) + 0.25, 10.5 + np.arange(8)]) cmb_data = cmb_data.interp(vertical_bins=target_levels) - slh_recs = l2a_gpm_dpr_slh.get(granule.time_range) - if len(slh_recs) > 0: - scan_start, scan_end = granule.primary_index_range - slh_data = l2a_gpm_dpr_slh.open(slh_recs[0])[{"scans": slice(scan_start, scan_end)}] - slh_data = slh_data[["latent_heating"]].rename({ - "scans": "matched_scans", - "pixels": "matched_pixels" - }) - slh_data["latent_heating"].data[:] = slh_data.latent_heating.data[..., ::-1] - slh_data["vertical_bins"] = (("vertical_bins"), 0.125 + 0.25 * np.arange(80)) - slh = slh_data.latent_heating.interp(vertical_bins=target_levels) - else: - slh = xr.DataArray(np.zeros_like(swc.data), dims=("scans", "pixels", "vertical_bins")) + lock = FileLock("slh.lock") + with lock: + slh_recs = l2a_gpm_dpr_slh.get(granule.time_range) + if len(slh_recs) > 0: + scan_start, scan_end = granule.primary_index_range + slh_data = l2a_gpm_dpr_slh.open(slh_recs[0])[{"scans": slice(scan_start, scan_end)}] + slh_data = slh_data[["latent_heating"]].rename({ + "scans": "matched_scans", + "pixels": "matched_pixels" + }) + slh_data["latent_heating"].data[:] = slh_data.latent_heating.data + slh_data["vertical_bins"] = (("vertical_bins"), 0.125 + 0.25 * np.arange(80)) + slh = slh_data.latent_heating.interp(vertical_bins=target_levels) + else: + slh = xr.DataArray(np.zeros_like(swc.data), dims=("scans", "pixels", "vertical_bins")) cmb_data["latent_heating"] = slh results_cmb.append(cmb_data) @@ -232,9 +245,10 @@ def load_reference_data( ) grid = GLOBAL.grid[row_start: row_end, col_start:col_end] + cmb_data["time"], _ = xr.broadcast(cmb_data.scan_time, cmb_data.latitude) data_r = resample_data(cmb_data, grid, 5e3) - data_r.attrs["cmb_files"] = ",".join(str(cmb_filenames)) + data_r.attrs["input_files"] = ",".join(cmb_filenames) if self.include_mirs: @@ -254,7 +268,31 @@ def load_reference_data( data_r.attrs["lower_left_col"] = col_start data_r.attrs["lower_left_row"] = row_start - return data_r + float_encoding = {"dtype": "float32", "compression": "zstd"} + int_encoding = {"dtype": "int8", "compression": "zstd"} + + float_vars = [ + "surface_precip", + "total_water_content", + "rain_water_content", + "ice_water_content", + "liquid_water_content", + "snow_water_path", + "rain_water_path", + "ice_water_path", + "liquid_water_path", + "latent_heating" + ] + for var in float_vars: + data_r[var].encoding = float_encoding + data_r["precipitation_type"].encoding = int_encoding + + if beam_width is None: + return data_r, None + + raise RuntimeError( + "Calculation of footprint-averaged results not yet supported." + ) gpm_cmb = Combined("cmb", include_mirs=False) diff --git a/speed/data/goes.py b/speed/data/goes.py index 592eff2..99d380b 100644 --- a/speed/data/goes.py +++ b/speed/data/goes.py @@ -90,7 +90,7 @@ def add_goes_obs( path_gridded: Path, n_steps: int = 4, sector: str = "full" -): +) -> None: """ Add GOES observations for extracted collocations. diff --git a/speed/data/gpm.py b/speed/data/gpm.py index c4ab5a5..a842618 100644 --- a/speed/data/gpm.py +++ b/speed/data/gpm.py @@ -216,6 +216,7 @@ def process_match( match: Tuple[Granule, List[Granule]], reference_data: ReferenceData, output_folder: Path, + min_area: float = 0.0 ) -> None: """ Extract collocations from matched input and reference data. @@ -228,6 +229,8 @@ def process_match( reference data for the matched granules. 
output_folder: The base folder to which to write the extracted collocations in on_swath and gridded format. + min_area: A minimum area in square degree below which collocations + are discarded. """ inpt_granule, ref_granules = match ref_granules = list(ref_granules) @@ -263,12 +266,13 @@ def process_match( ref_geom = ref_granules[0].geometry.to_shapely() gpm_geom = inpt_granule.geometry.to_shapely() area = ref_geom.intersection(gpm_geom).area - if area < 25: + if area < min_area: LOGGER.info( "Discarding match between '%s' and '%s' because area " - " is less than 25 square degree.", + " is less than %s square degree.", inpt_granule.file_record.product.name, reference_data.pansat_product.name, + min_area ) return None @@ -337,7 +341,7 @@ def process_match( surface_precip = reference_data_r.surface_precip.data else: surface_precip = reference_data_r.surface_precip_combined.data - if np.isfinite(surface_precip).sum() < 50: + if np.isfinite(surface_precip).sum() < 1: LOGGER.info( "Skipping match because it contains only %s valid " " surface precip pixels.", @@ -395,6 +399,7 @@ def process_match( preprocessor_data_r, ref_data, output_folder, + min_size=256 ) def process_day( @@ -404,6 +409,7 @@ def process_day( day: int, reference_data: ReferenceData, output_folder: Path, + min_area: float = 5 ): """ Process all collocations available on a given day. @@ -416,6 +422,8 @@ def process_day( reference data source. output_folder: The folder to which to write the extracted collocations. + min_area: A minimum area in square degree below which collocations + are discarded. """ start_time = datetime(year, month, day) end_time = start_time + timedelta(days=1) diff --git a/speed/data/mrms.py b/speed/data/mrms.py index a72475f..38985bc 100644 --- a/speed/data/mrms.py +++ b/speed/data/mrms.py @@ -1,8 +1,8 @@ """ -spree.data.mrms +speed.data.mrms =============== -This module provides function to extract collocations with the NOAA +This module provides functionality to extract collocations with the NOAA Multi-Radar Multi-Sensor (MRMS) ground-based radar estimates. 
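The `precipitation_type` recoding added to `speed/data/combined.py` earlier in this patch collapses the raw CMB `typePrecip` codes into small class labels. Assuming the standard DPR convention (leading digit 1 = stratiform, 2 = convective, 3 = other, and -1111 = no precipitation), the mapping is equivalent to this sketch:

```python
import numpy as np

def decode_precip_type(type_precip: np.ndarray) -> np.ndarray:
    """Collapse CMB typePrecip codes into 0: unknown, 1: no precip, 2: stratiform,
    3: convective, 4: other (leading-digit convention assumed)."""
    codes = type_precip.astype("int64")
    out = np.zeros_like(codes, dtype="uint8")
    out[codes == -1111] = 1
    leading = codes // 10_000_000
    out[leading == 1] = 2
    out[leading == 2] = 3
    out[leading == 3] = 4
    return out
```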
""" import logging diff --git a/speed/data/noaa.py b/speed/data/noaa.py index 3a50746..df90c23 100644 --- a/speed/data/noaa.py +++ b/speed/data/noaa.py @@ -11,6 +11,7 @@ from typing import List, Tuple +from filelock import FileLock import pansat from pansat import FileRecord, TimeRange, Granule from pansat.environment import get_index @@ -73,16 +74,43 @@ def load_brightness_temperatures( sc_lon = l1b_data.spacecraft_longitude.data sc_lat = l1b_data.spacecraft_latitude.data sc_alt = l1b_data.spacecraft_altitude.data - tbs = np.concatenate((tbs_s1, tbs_s2, tbs_s3), axis=-1) + scan_time = l1b_data.scan_time.data + + # Upsample fields + n_scans, n_pixels, n_channels = tbs.shape + + lons_hr = np.zeros_like(lons, shape=(2 * n_scans - 1, n_pixels)) + lons_hr[::2] = lons + lons_hr[1::2] = 0.5 * (lons[:-1] + lons[1:]) + lats_hr = np.zeros_like(lats, shape=(2 * n_scans - 1, n_pixels)) + lats_hr[::2] = lats + lats_hr[1::2] = 0.5 * (lats[:-1] + lats[1:]) + tbs_hr = np.zeros_like(tbs, shape=(2 * n_scans - 1, n_pixels, n_channels)) + tbs_hr[::2] = tbs + tbs_hr[1::2] = 0.5 * (tbs[:-1] + tbs[1:]) + scan_time_hr = np.zeros_like(scan_time, shape=(2 * n_scans - 1)) + scan_time_hr[::2] = scan_time + scan_time_hr[1::2] = scan_time[1:] + 0.5 * (scan_time[1:] - scan_time[:-1]) + + sc_lon_hr = np.zeros_like(sc_lon, shape=(2 * n_scans - 1)) + sc_lon_hr[::2] = sc_lon + sc_lon_hr[1::2] = 0.5 * (sc_lon[:-1] + sc_lon[1:]) + sc_lat_hr = np.zeros_like(sc_lat, shape=(2 * n_scans - 1)) + sc_lat_hr[::2] = sc_lat + sc_lat_hr[1::2] = 0.5 * (sc_lat[:-1] + sc_lat[1:]) + sc_alt_hr = np.zeros_like(sc_alt, shape=(2 * n_scans - 1)) + sc_alt_hr[::2] = sc_alt + sc_alt_hr[1::2] = 0.5 * (sc_alt[:-1] + sc_alt[1:]) return xr.Dataset({ - "latitude": (("scan", "pixel"), lats), - "longitude": (("scan", "pixel"), lons), - "tbs": (("scan", "pixel", "channels"), tbs), - "spacecraft_longitude": (("scan"), sc_lon), - "spacecraft_latitude": (("scan"), sc_lat), - "spacecraft_altitude": (("scan"), sc_alt), + "latitude": (("scan", "pixel"), lats_hr), + "longitude": (("scan", "pixel"), lons_hr), + "observations": (("scan", "pixel", "channels"), tbs_hr), + "spacecraft_longitude": (("scan"), sc_lon_hr), + "spacecraft_latitude": (("scan"), sc_lat_hr), + "spacecraft_altitude": (("scan"), sc_alt_hr), + "scan_time": (("scan"), scan_time_hr) }) @@ -206,6 +234,9 @@ def process_match( margin=64 ) + for var in reference_data_r: + reference_data_r[var].encoding = ref_data[var].encoding + inpt_data = inpt_data[{"scan": slice(scan_start, scan_end)}] reference_data_r = reference_data_r[{"scan": slice(scan_start, scan_end)}] inpt_data.attrs["scan_start"] = inpt_granule.primary_index_range[0] + scan_start @@ -311,10 +342,9 @@ def process_day( # Get all available files for given day. inpt_recs = product.get(time_range) inpt_index = Index.index(product, inpt_recs) - print(inpt_recs) LOGGER.info( - f"Found %s files for %s/%s/%s.", - len(inpt_recs), year, month, day + f"Found %s files for %s-%s-%s.", + len(inpt_recs), year, f"{month:02}", f"{day:02}" ) # If reference data has a fixed domain, subset index to files @@ -324,10 +354,12 @@ def process_day( # Collect available reference data. 
reference_recs = [] - for granule in inpt_index.granules: - reference_recs += reference_data.pansat_product.get( - time_range=granule.time_range - ) + lock = FileLock("noaa_ref.lock") + with lock: + for granule in inpt_index.granules: + reference_recs += reference_data.pansat_product.get( + time_range=granule.time_range + ) reference_index = Index.index(reference_data.pansat_product, reference_recs) # Calculate matches between input and reference data. diff --git a/speed/data/utils.py b/speed/data/utils.py index cd25ac5..acf7b8c 100644 --- a/speed/data/utils.py +++ b/speed/data/utils.py @@ -569,7 +569,12 @@ def save_input_data( "observations": {"dtype": "uint16", "_FillValue": uint16_max, "scale_factor": 0.01, "compression": "zstd"}, "earth_incidence_angle": {"dtype": "int16", "_FillValue": -(2e-15), "scale_factor": 0.01, "compression": "zstd"}, } - input_data = input_data[["observations", "earth_incidence_angle"]] + vars = [ + var for var in ["observations", "earth_incidence_angle"] + if var in input_data + ] + encoding = {var: encoding[var] for var in vars} + input_data = input_data[vars] input_data.to_netcdf(output_path / filename, encoding=encoding) @@ -584,7 +589,13 @@ def save_input_data( "hail_fraction", "convective_fraction", "stratiform_fraction", - "time" + "time", + "precipitation_type", + "rain_water_content", + "total_water_content", + "rain_water_path", + "snow_water_path", + "latent_heating" ] @@ -613,7 +624,9 @@ def save_target_data( output_path = path / "target" output_path.mkdir(exist_ok=True, parents=True) - target_variables = copy(TARGET_VARIABLES) + target_variables = [ + var for var in TARGET_VARIABLES if var in reference_data + ] if include_swath_coords: target_variables += [ "scan_index", @@ -630,8 +643,17 @@ def save_target_data( "hail_fraction": {"dtype": "uint8", "_FillValue": 255, "scale_factor": 1.0/254.0, "compression": "zstd"}, "convective_fraction": {"dtype": "uint8", "_FillValue": 255, "scale_factor": 1.0/254.0, "compression": "zstd"}, "stratiform_fraction": {"dtype": "uint8", "_FillValue": 255, "scale_factor": 1.0/254.0, "compression": "zstd"}, + "precipitation_type": {"dtype": "int8", "compression": "zstd"}, + "total_water_content": {"dtype": "float32", "compression": "zstd"}, + "rain_water_content": {"dtype": "float32", "compression": "zstd"}, + "total_water_content": {"dtype": "float32", "compression": "zstd"}, + "rain_water_path": {"dtype": "float32", "compression": "zstd"}, + "snow_water_path": {"dtype": "float32", "compression": "zstd"}, + "latent_heating": {"dtype": "float32"} } + target_data = reference_data[target_variables] + encoding = {var: encoding[var] for var in target_variables if var in encoding} target_data.to_netcdf(output_path / filename, encoding=encoding) @@ -796,7 +818,7 @@ def extract_scenes( time = scan_times[0] + np.median(scan_times - scan_times[0]) time = to_datetime(time) - save_ancillary_data(inpt, time, output_folder) + #save_ancillary_data(inpt, time, output_folder) save_input_data(sensor_name, inpt, time, output_folder) save_target_data(ref, time, output_folder) if geo_data is not None: @@ -849,8 +871,8 @@ def extract_evaluation_data( min_size: Scene below that size will be discarded. 
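The encoding entries added to `save_target_data` above follow the pattern of the existing ones: fraction fields are packed into `uint8` with a scale factor of 1/254 and 255 as fill value, while the new profile and path variables are stored as zstd-compressed `float32`. A short sketch of what such an encoding does when writing with xarray (variable names and values are illustrative):

```python
import numpy as np
import xarray as xr

data = xr.Dataset({
    "convective_fraction": (("y", "x"), np.random.rand(4, 4).astype("float32")),
    "rain_water_path": (("y", "x"), np.random.rand(4, 4).astype("float32")),
})
encoding = {
    # 0..1 fractions quantized to 8 bit: stored value = round(value / (1/254)); 255 marks missing.
    "convective_fraction": {"dtype": "uint8", "_FillValue": 255, "scale_factor": 1.0 / 254.0, "compression": "zstd"},
    "rain_water_path": {"dtype": "float32", "compression": "zstd"},
}
data.to_netcdf("example_target.nc", encoding=encoding)
```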
""" parts = collocation_file_on_swath.name.split("_") - sensor_name = parts[1] - time_str = parts[2][:-3] + sensor_name = parts[-2] + time_str = parts[-1][:-3] time = datetime.strptime(time_str, "%Y%m%d%H%M%S") @@ -1154,9 +1176,10 @@ def calculate_footprint_averages( sensor_altitudes, _ = xr.broadcast(sensor_altitudes, latitudes) sensor_dims = sensor_latitudes.dims[:2] + spatial_dims = data.latitude.ndim results = { var: ( - (sensor_dims + data[var].dims[:2])[:data[var].data.ndim], + (sensor_dims + data[var].dims[:2])[:2 + data[var].data.ndim - spatial_dims], np.nan * np.zeros(latitudes.shape + data[var].shape[2:], dtype=data[var].dtype) ) for var in data if var != "time" } @@ -1197,7 +1220,10 @@ def calculate_footprint_averages( lon_range = np.abs(lons_data - lon_p) < 0.5 * area_of_influence lat_range = np.abs(lats_data - lat_p) < 0.5 * area_of_influence - data_fp = data[{"longitude": lon_range, "latitude": lat_range}] + if "longitude" in data.dims: + data_fp = data[{"longitude": lon_range, "latitude": lat_range}] + else: + data_fp = data[{data.surface_precip.dims[0]: lon_range * lat_range}] lons_d = data_fp.longitude.data lats_d = data_fp.latitude.data diff --git a/speed/data/wegener_net.py b/speed/data/wegener_net.py new file mode 100644 index 0000000..5bb320e --- /dev/null +++ b/speed/data/wegener_net.py @@ -0,0 +1,144 @@ +""" +speed.data.wegener_net +====================== + +This module provides functionality to extract collocations with the WegenerNet +ground-station data. +""" +import logging +from typing import List, Optional, Union, Tuple + +import numpy as np +from pansat.products.stations.wegener_net import station_data, get_station_data +from pansat.geometry import LonLatRect +from pansat.granule import Granule +from scipy.stats import binned_statistic_2d + +from speed.data.reference import ReferenceData +from speed.data.utils import calculate_footprint_averages +from speed.grids import GLOBAL + +import xarray as xr + + +def get_domain() -> LonLatRect: + """ + Return a lat/lon bounding box defining the spatial coverage of the WegenerNet data. + """ + station_data = get_station_data() + lons = station_data.longitude.data + lats = station_data.latitude.data + lon_min, lon_max = lons.min(), lons.max() + lat_min, lat_max = lats.min(), lats.max() + return LonLatRect(lon_min, lat_min, lon_max, lat_max) + + +LOGGER = logging.getLogger(__name__) + + +class WegenerNet(ReferenceData): + """ + Reference data class for processing MRMS data. + + Combines MRMS precip rate, flag and radar quality index into + a DataArray. + """ + + def __init__(self): + super().__init__("wegener_net", get_domain(), station_data) + + def load_reference_data( + self, + input_granule: Granule, + granules: List[Granule], + radius_of_influence: float, + beam_width: float + ) -> Optional[xr.Dataset]: + """ + Load reference data for a given granule of input data. + + Args: + input_granule: The matched input data granules. + granules: A list containing the matched reference data granules. + radius_of_influence: The radius of influence to use for the resampling + of the scan times. + beam_width: The beam width to use for the calculation of footprint-averaged + precipitation. + + Return: + An xarray.Dataset containing the MRMS reference data. 
+ """ + coords = input_granule.geometry.bounding_box_corners + lon_min, lat_min, lon_max, lat_max = coords + + wegener_data = [granule.open() for granule in granules] + wegener_data = xr.concat(wegener_data, dim="station") + + lons_g = GLOBAL.lons.copy() + lats_g = GLOBAL.lats.copy() + lons_g = lons_g + lats_g = lats_g + + lon_inds = (lons_g > lon_min) * (lons_g < lon_max) + lons_g = lons_g[lon_inds] + lat_inds = (lats_g > lat_min) * (lats_g < lat_max) + lats_g = lats_g[lat_inds] + lower_left_col = np.where(lon_inds)[0][0] + lower_left_row = np.where(lat_inds)[0][0] + + lon_bins = 0.5 * (lons_g[1:] + lons_g[:-1]) + lat_bins = 0.5 * (lats_g[1:] + lats_g[:-1]) + + input_data = input_granule.open() + scan_time = input_data.scan_time.data + start_time = np.nanmin(scan_time) + end_time = np.nanmax(scan_time) + time = start_time + 0.5 * (end_time - start_time) + + wegener_data = wegener_data.interp(time=time, method="nearest") + + surface_precip = binned_statistic_2d( + wegener_data.latitude.data, + wegener_data.longitude.data, + wegener_data.surface_precip.data, + bins=(lat_bins[::-1], lon_bins) + )[0][::-1] + + lons = lons_g[1:-1] + lats = lats_g[1:-1] + + time = np.broadcast_to(time[None, None], (lats.size, lons.size)) + + results = xr.Dataset({ + "longitude": (("longitude",), lons), + "latitude": (("latitude",), lats), + "surface_precip": (("latitude", "longitude"), surface_precip), + "time": (("latitude", "longitude"), time) + }) + # Account for shrinkage caused by grid-based resampling + results.attrs["lower_left_row"] = lower_left_row + 1 + results.attrs["lower_left_col"] = lower_left_col + 1 + + input_data = input_granule.open() + lats_fp = input_data.latitude if "latitude" in input_data else input_data.latitude_s1 + lons_fp = input_data.longitude if "longitude" in input_data else input_data.longitude_s1 + sensor_latitude = input_data.spacecraft_latitude + sensor_longitude = input_data.spacecraft_longitude + sensor_altitude = input_data.spacecraft_altitude + + results_fpavg = calculate_footprint_averages( + wegener_data, + lons_fp, + lats_fp, + sensor_longitude, + sensor_latitude, + sensor_altitude, + beam_width, + 0.5 + ) + + + return results, results_fpavg + + +wegener_net = WegenerNet() diff --git a/test/conftest.py b/test/conftest.py index 001b344..9e990ff 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -14,9 +14,11 @@ from pansat.products.ground_based import mrms, gpm_gv from pansat.products.satellite.gpm import ( l1c_r_gpm_gmi, + l1c_noaa20_atms, l1c_gcomw1_amsr2, l2b_gpm_cmb ) +from pansat.products.stations import wegener_net from speed.data.gpm import run_preprocessor @@ -80,8 +82,8 @@ def cmb_match(scope="session"): end_time = "2020-01-02T00:12:00" time_range = TimeRange(start_time, end_time) cmb_index = get_index(l2b_gpm_cmb).subset(time_range) - gmi_index = get_index(l1c_r_gpm_gmi).subset(time_range) - matches = find_matches(gmi_index, cmb_index) + atms_index = get_index(l1c_noaa20_atms).subset(time_range) + matches = find_matches(atms_index, cmb_index) return matches[0] @@ -130,3 +132,18 @@ def mhs_conus_granule(): def preprocessor_data(mhs_conus_granule): preprocessor_data = run_preprocessor(mhs_conus_granule) return preprocessor_data + + +@pytest.fixture +def wegener_net_match(scope="session"): + """ + Returns a tuple describing a matche between a GMI granule and Wegener net + data. 
+ """ + start_time = "2023-07-02T00:00:00" + end_time = "2023-07-03T00:00:00" + time_range = TimeRange(start_time, end_time) + gmi_index = get_index(l1c_r_gpm_gmi).subset(time_range) + wn_index = get_index(wegener_net.station_data).subset(time_range) + matches = find_matches(gmi_index, wn_index) + return matches[0] diff --git a/test/data/test_ancillary.py b/test/data/test_ancillary.py new file mode 100644 index 0000000..17add38 --- /dev/null +++ b/test/data/test_ancillary.py @@ -0,0 +1,142 @@ +""" +Tests for the speed.data.ancillary module. +""" +from pathlib import Path +import pytest + +import numpy as np + +from speed.data.ancillary import ( + find_era5_sfc_files, + find_era5_lai_files, + load_era5_ancillary_data, + find_autosnow_file, + load_autosnow_data, + load_landmask_data, + load_emissivity_data, + load_gprof_surface_type_data +) + + +ERA5_SFC_PATH = Path("/qdata2/archive/ERA5") +ERA5_LAI_PATH = Path("/pdata4/pbrown/ERA5") +HAS_ERA5_DATA = ERA5_SFC_PATH.exists() and ERA5_LAI_PATH.exists() +NEEDS_ERA5_DATA = pytest.mark.skipif( + not HAS_ERA5_DATA, + reason="Needs ERA5 data." +) + +INGEST_DIR = Path("/qdata1/pbrown/gpm/ppingest") +HAS_INGEST_DIR = INGEST_DIR.exists() +NEEDS_INGEST_DATA = pytest.mark.skipif( + not HAS_INGEST_DIR, + reason="Needs preprocessor ingest dir." +) + +ANCILLARY_DIR = Path("/qdata1/pbrown/gpm/ppancillary") +HAS_ANCILLARY_DIR = ANCILLARY_DIR.exists() +NEEDS_ANCILLARY_DATA = pytest.mark.skipif( + not HAS_ANCILLARY_DIR, + reason="Needs preprocessor ancillary dir." +) + +@NEEDS_ERA5_DATA +def test_find_era5_sfc_files(): + """ + Test finding of ERA5 files. + """ + start_time = np.datetime64("2020-01-01T01:15:00") + end_time = np.datetime64("2020-01-01T01:45:00") + + era5_files = find_era5_sfc_files(ERA5_SFC_PATH, start_time, end_time) + assert len(era5_files) == 1 + + start_time = np.datetime64("2020-01-01T01:15:00") + end_time = np.datetime64("2020-01-01T02:45:00") + era5_files = find_era5_sfc_files(ERA5_SFC_PATH, start_time, end_time) + assert len(era5_files) == 1 + + start_time = np.datetime64("2019-12-31T23:45:00") + end_time = np.datetime64("2020-01-01T00:15:00") + era5_files = find_era5_sfc_files(ERA5_SFC_PATH, start_time, end_time) + assert len(era5_files) == 2 + + start_time = np.datetime64("2020-01-01T23:45:00") + end_time = np.datetime64("2020-01-02T00:15:00") + era5_files = find_era5_sfc_files(ERA5_SFC_PATH, start_time, end_time) + assert len(era5_files) == 2 + + +@NEEDS_ERA5_DATA +def test_find_era5_lai_files(): + """ + Test finding of ERA5 files. + """ + start_time = np.datetime64("2020-01-01T01:15:00") + end_time = np.datetime64("2020-01-01T01:45:00") + + era5_files = find_era5_lai_files(ERA5_LAI_PATH, start_time, end_time) + assert len(era5_files) == 1 + + start_time = np.datetime64("2020-01-01T01:15:00") + end_time = np.datetime64("2020-01-01T02:45:00") + era5_files = find_era5_lai_files(ERA5_LAI_PATH, start_time, end_time) + assert len(era5_files) == 1 + + start_time = np.datetime64("2019-12-31T23:45:00") + end_time = np.datetime64("2020-01-01T00:15:00") + era5_files = find_era5_lai_files(ERA5_LAI_PATH, start_time, end_time) + assert len(era5_files) == 2 + + start_time = np.datetime64("2020-01-01T23:45:00") + end_time = np.datetime64("2020-01-02T00:15:00") + era5_files = find_era5_lai_files(ERA5_LAI_PATH, start_time, end_time) + assert len(era5_files) == 2 + + +@NEEDS_ERA5_DATA +def test_load_era5_ancillary_data(): + """ + Test loading of ERA5 data. 
+ """ + start_time = np.datetime64("2020-01-01T01:15:00") + end_time = np.datetime64("2020-01-01T01:45:00") + era5_sfc_files = find_era5_sfc_files(ERA5_SFC_PATH, start_time, end_time) + era5_lai_files = find_era5_lai_files(ERA5_LAI_PATH, start_time, end_time) + era5_data = load_era5_ancillary_data(era5_sfc_files, era5_lai_files) + + assert len(era5_data.variables) == 12 + + +@NEEDS_INGEST_DATA +def test_find_autosnow_file(): + date = np.datetime64("2020-01-01T01:15:00") + autosnow_file = find_autosnow_file(INGEST_DIR, date) + assert autosnow_file.exists() + + +@NEEDS_INGEST_DATA +def test_load_autosnow_file(): + date = np.datetime64("2020-01-01T01:15:00") + autosnow_file = find_autosnow_file(INGEST_DIR, date) + autosnow_data = load_autosnow_data(autosnow_file) + + +@NEEDS_ANCILLARY_DATA +def test_load_landmask_data(): + ancillary_data = load_landmask_data(ANCILLARY_DIR) + + +@NEEDS_ANCILLARY_DATA +def test_load_emissivity_data(): + ancillary_data = load_emissivity_data(ANCILLARY_DIR, np.datetime64("2020-01-01T00:00:00")) + + +@NEEDS_ANCILLARY_DATA +@NEEDS_INGEST_DATA +def test_load_gprof_surface_type_data(): + ancillary_data = load_gprof_surface_type_data( + ANCILLARY_DIR, + INGEST_DIR, + np.datetime64("2020-01-01T00:00:00") + ) diff --git a/test/data/test_combined.py b/test/data/test_combined.py index c06fe7b..91e2ba5 100644 --- a/test/data/test_combined.py +++ b/test/data/test_combined.py @@ -1,7 +1,6 @@ """ Tests for the speed.data.combined module. ========================================= - """ import os @@ -45,11 +44,11 @@ def test_load_reference_data(cmb_match): variables. """ input_granule, cmb_granules = cmb_match - ref_data = gpm_cmb.load_reference_data( + ref_data, _ = gpm_cmb.load_reference_data( input_granule, cmb_granules, - 5e3, - 5e3 + None, + None ) assert "surface_precip" in ref_data diff --git a/test/data/test_wegener_net.py b/test/data/test_wegener_net.py new file mode 100644 index 0000000..2f88a36 --- /dev/null +++ b/test/data/test_wegener_net.py @@ -0,0 +1,35 @@ +""" +Tests for the speed.data.combined module. +========================================= +""" +import os + +import pytest + +import numpy as np + +from speed.data.wegener_net import wegener_net + + +if "PANSAT_PASSWORD" in os.environ: + HAS_PANSAT_PASSWORD = True +else: + HAS_PANSAT_PASSWORD = False + + +@pytest.mark.skipif(not HAS_PANSAT_PASSWORD, reason="pansat password not set.") +def test_load_reference_data(wegener_net_match): + """ + Test extraction of reference data from WegenerNet data and ensure that + the precipitation field contains valid pixels. + """ + input_granule, wn_granules = wegener_net_match + ref_data, _ = wegener_net.load_reference_data( + input_granule, + wn_granules, + 5e3, + 0.98 + ) + + assert "surface_precip" in ref_data + assert np.any(np.isfinite(ref_data.surface_precip))