"""Functions to test the affine coregistrations."""
from __future__ import annotations

import copy

import geopandas as gpd
import numpy as np
import pytest
import rasterio as rio
from geoutils import Raster, Vector
from geoutils.raster import RasterType

import xdem
from xdem import coreg, examples
from xdem.coreg.affine import AffineCoreg, CoregDict
def load_examples() -> tuple[RasterType, RasterType, Vector]:
"""Load example files to try coregistration methods with."""
reference_raster = Raster(examples.get_path("longyearbyen_ref_dem"))
to_be_aligned_raster = Raster(examples.get_path("longyearbyen_tba_dem"))
glacier_mask = Vector(examples.get_path("longyearbyen_glacier_outlines"))
return reference_raster, to_be_aligned_raster, glacier_mask
class TestAffineCoreg:
ref, tba, outlines = load_examples() # Load example reference, to-be-aligned and mask.
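    # Inliers are all pixels outside the glacier outlines, so unstable glacier terrain is excluded from the fits.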
inlier_mask = ~outlines.create_mask(ref)
fit_params = dict(
reference_elev=ref.data,
to_be_aligned_elev=tba.data,
inlier_mask=inlier_mask,
transform=ref.transform,
crs=ref.crs,
verbose=False,
)
    # Create some 3D coordinates with Z coordinates set to 0 to test the apply functions.
points_arr = np.array([[1, 2, 3, 4], [1, 2, 3, 4], [0, 0, 0, 0]], dtype="float64").T
points = gpd.GeoDataFrame(
geometry=gpd.points_from_xy(x=points_arr[:, 0], y=points_arr[:, 1], crs=ref.crs), data={"z": points_arr[:, 2]}
)
def test_from_classmethods(self) -> None:
# Check that the from_matrix function works as expected.
vshift = 5
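        # Build a 4x4 identity matrix and set its vertical (Z) translation component to the vertical shift.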
matrix = np.diag(np.ones(4, dtype=float))
matrix[2, 3] = vshift
coreg_obj = AffineCoreg.from_matrix(matrix)
transformed_points = coreg_obj.apply(self.points)
assert all(transformed_points["z"].values == vshift)
# Check that the from_translation function works as expected.
x_offset = 5
coreg_obj2 = AffineCoreg.from_translation(x_off=x_offset)
transformed_points2 = coreg_obj2.apply(self.points)
assert np.array_equal(self.points.geometry.x.values + x_offset, transformed_points2.geometry.x.values)
        # Trying to make a Coreg object from a NaN translation should raise an informative error.
        with pytest.raises(ValueError, match="non-finite values"):
            AffineCoreg.from_translation(np.nan)
def test_vertical_shift(self) -> None:
# Create a vertical shift correction instance
vshiftcorr = coreg.VerticalShift()
# Fit the vertical shift model to the data
vshiftcorr.fit(**self.fit_params)
# Check that a vertical shift was found.
assert vshiftcorr._meta.get("vshift") is not None
assert vshiftcorr._meta["vshift"] != 0.0
# Copy the vertical shift to see if it changes in the test (it shouldn't)
vshift = copy.copy(vshiftcorr._meta["vshift"])
# Check that the to_matrix function works as it should
matrix = vshiftcorr.to_matrix()
assert matrix[2, 3] == vshift, matrix
        # Check that the z coordinates (all initially zero) now equal the vertical shift
assert all(vshiftcorr.apply(self.points)["z"].values == vshiftcorr._meta["vshift"])
# Apply the model to correct the DEM
tba_unshifted, _ = vshiftcorr.apply(self.tba.data, transform=self.ref.transform, crs=self.ref.crs)
# Create a new vertical shift correction model
vshiftcorr2 = coreg.VerticalShift()
# Check that this is indeed a new object
assert vshiftcorr is not vshiftcorr2
# Fit the corrected DEM to see if the vertical shift will be close to or at zero
vshiftcorr2.fit(
reference_elev=self.ref.data,
to_be_aligned_elev=tba_unshifted,
transform=self.ref.transform,
crs=self.ref.crs,
inlier_mask=self.inlier_mask,
)
# Test the vertical shift
newmeta: CoregDict = vshiftcorr2._meta
new_vshift = newmeta["vshift"]
assert np.abs(new_vshift) < 0.01
# Check that the original model's vertical shift has not changed
# (that the _meta dicts are two different objects)
assert vshiftcorr._meta["vshift"] == vshift
def test_all_nans(self) -> None:
"""Check that the coregistration approaches fail gracefully when given only nans."""
dem1 = np.ones((50, 50), dtype=float)
dem2 = dem1.copy() + np.nan
affine = rio.transform.from_origin(0, 0, 1, 1)
crs = rio.crs.CRS.from_epsg(4326)
vshiftcorr = coreg.VerticalShift()
icp = coreg.ICP()
pytest.raises(ValueError, vshiftcorr.fit, dem1, dem2, transform=affine)
pytest.raises(ValueError, icp.fit, dem1, dem2, transform=affine)
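        # Add a few finite values: the vertical shift fit can then succeed, while ICP is still expected to fail.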
dem2[[3, 20, 40], [2, 21, 41]] = 1.2
vshiftcorr.fit(dem1, dem2, transform=affine, crs=crs)
pytest.raises(ValueError, icp.fit, dem1, dem2, transform=affine)
def test_coreg_example(self, verbose: bool = False) -> None:
"""
        Test that the co-registration outputs computed on the example data are always the same. This overlaps with
        the test in test_examples.py, but helps pinpoint where any differences arise.
"""
# Run co-registration
nuth_kaab = xdem.coreg.NuthKaab()
nuth_kaab.fit(self.ref, self.tba, inlier_mask=self.inlier_mask, verbose=verbose, random_state=42)
# Check the output metadata is always the same
shifts = (nuth_kaab._meta["offset_east_px"], nuth_kaab._meta["offset_north_px"], nuth_kaab._meta["vshift"])
assert shifts == pytest.approx((-0.463, -0.133, -1.9876264671765433))
def test_gradientdescending(self, subsample: int = 10000, inlier_mask: bool = True, verbose: bool = False) -> None:
"""
        Test that the co-registration outputs computed on the example data are always the same. This overlaps with
        the test in test_examples.py, but helps pinpoint where any differences arise.
        It also implicitly tests the z_name kwarg and whether a geometry column can be provided instead of E/N columns.
"""
if inlier_mask:
inlier_mask = self.inlier_mask
# Run co-registration
gds = xdem.coreg.GradientDescending(subsample=subsample)
gds.fit(
self.ref.to_pointcloud(data_column_name="z").ds,
self.tba,
inlier_mask=inlier_mask,
verbose=verbose,
random_state=42,
)
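        # Check the estimated shifts against reference values obtained for this example and random state.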
shifts = (gds._meta["offset_east_px"], gds._meta["offset_north_px"], gds._meta["vshift"])
assert shifts == pytest.approx((0.03525, -0.59775, -2.39144), abs=10e-5)
@pytest.mark.parametrize("shift_px", [(1, 1), (2, 2)]) # type: ignore
@pytest.mark.parametrize("coreg_class", [coreg.NuthKaab, coreg.GradientDescending, coreg.ICP]) # type: ignore
@pytest.mark.parametrize("points_or_raster", ["raster", "points"])
def test_coreg_example_shift(self, shift_px, coreg_class, points_or_raster, verbose=False, subsample=5000):
"""
        For comparison of coreg algorithms:
        Shift a reference DEM on purpose, e.g. by shift_px = (1, 1), then apply the coregistration and check that it
        shifts the DEM back.
"""
res = self.ref.res[0]
# shift DEM by shift_px
shifted_ref = self.ref.copy()
shifted_ref.shift(shift_px[0] * res, shift_px[1] * res, inplace=True)
shifted_ref_points = shifted_ref.to_pointcloud(
subsample=subsample, force_pixel_offset="center", random_state=42
).ds
shifted_ref_points["E"] = shifted_ref_points.geometry.x
shifted_ref_points["N"] = shifted_ref_points.geometry.y
shifted_ref_points.rename(columns={"b1": "z"}, inplace=True)
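        # Only GradientDescending is given an explicit subsample size here; the other classes use their defaults.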
kwargs = {} if coreg_class.__name__ != "GradientDescending" else {"subsample": subsample}
coreg_obj = coreg_class(**kwargs)
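        # Keep track of the residual offsets so they can be reported if the assertion at the end fails.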
best_east_diff = 1e5
best_north_diff = 1e5
if points_or_raster == "raster":
coreg_obj.fit(shifted_ref, self.ref, verbose=verbose, random_state=42)
elif points_or_raster == "points":
coreg_obj.fit(shifted_ref_points, self.ref, verbose=verbose, random_state=42)
if coreg_class.__name__ == "ICP":
matrix = coreg_obj.to_matrix()
            # The ICP fit only creates a matrix and doesn't normally expose the alignment in pixels.
            # Since the test is designed to validate pixel shifts, these lines extract the approximate pixel shift
            # from the matrix (it's not perfect, since rotation/scale can change it).
coreg_obj._meta["offset_east_px"] = -matrix[0][3] / res
coreg_obj._meta["offset_north_px"] = -matrix[1][3] / res
# ICP can never be expected to be much better than 1px on structured data, as its implementation often finds a
        # minimum between two grid points. This is clearly warned about in the documentation.
precision = 1e-2 if coreg_class.__name__ != "ICP" else 1
        if coreg_obj._meta["offset_east_px"] == pytest.approx(-shift_px[0], rel=precision) and coreg_obj._meta[
            "offset_north_px"
        ] == pytest.approx(-shift_px[1], rel=precision):
return
best_east_diff = coreg_obj._meta["offset_east_px"] - shift_px[0]
best_north_diff = coreg_obj._meta["offset_north_px"] - shift_px[1]
raise AssertionError(f"Diffs are too big. east: {best_east_diff:.2f} px, north: {best_north_diff:.2f} px")
def test_nuth_kaab(self) -> None:
nuth_kaab = coreg.NuthKaab(max_iterations=10)
# Synthesize a shifted and vertically offset DEM
pixel_shift = 2
vshift = 5
shifted_dem = self.ref.data.squeeze().copy()
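        # Shift the DEM eastward by `pixel_shift` pixels, fill the exposed edge with NaNs, then add a vertical offset.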
shifted_dem[:, pixel_shift:] = shifted_dem[:, :-pixel_shift]
shifted_dem[:, :pixel_shift] = np.nan
shifted_dem += vshift
# Fit the synthesized shifted DEM to the original
nuth_kaab.fit(
self.ref.data.squeeze(),
shifted_dem,
transform=self.ref.transform,
crs=self.ref.crs,
verbose=self.fit_params["verbose"],
)
# Make sure that the estimated offsets are similar to what was synthesized.
assert nuth_kaab._meta["offset_east_px"] == pytest.approx(pixel_shift, abs=0.03)
assert nuth_kaab._meta["offset_north_px"] == pytest.approx(0, abs=0.03)
assert nuth_kaab._meta["vshift"] == pytest.approx(-vshift, 0.03)
# Apply the estimated shift to "revert the DEM" to its original state.
unshifted_dem, _ = nuth_kaab.apply(shifted_dem, transform=self.ref.transform, crs=self.ref.crs)
# Measure the difference (should be more or less zero)
diff = self.ref.data.squeeze() - unshifted_dem
diff = diff.compressed() # turn into a 1D array with only unmasked values
# Check that the median is very close to zero
assert np.abs(np.median(diff)) < 0.01
# Check that the RMSE is low
assert np.sqrt(np.mean(np.square(diff))) < 1
# Transform some arbitrary points.
transformed_points = nuth_kaab.apply(self.points)
# Check that the x shift is close to the pixel_shift * image resolution
assert all(
abs((transformed_points.geometry.x.values - self.points.geometry.x.values) - pixel_shift * self.ref.res[0])
< 0.1
)
# Check that the z shift is close to the original vertical shift.
assert all(abs((transformed_points["z"].values - self.points["z"].values) + vshift) < 0.1)
def test_tilt(self) -> None:
# Try a 1st degree deramping.
tilt = coreg.Tilt()
# Fit the data
tilt.fit(**self.fit_params, random_state=42)
# Apply the deramping to a DEM
tilted_dem = tilt.apply(self.tba)
# Get the periglacial offset after deramping
periglacial_offset = (self.ref - tilted_dem)[self.inlier_mask]
# Get the periglacial offset before deramping
pre_offset = (self.ref - self.tba)[self.inlier_mask]
# Check that the error improved
assert np.abs(np.mean(periglacial_offset)) < np.abs(np.mean(pre_offset))
# Check that the mean periglacial offset is low
assert np.abs(np.mean(periglacial_offset)) < 0.02
def test_icp_opencv(self) -> None:
        # Do a fast-and-dirty, 3-iteration ICP just to make sure it doesn't error out.
icp = coreg.ICP(max_iterations=3)
icp.fit(**self.fit_params)
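        # Apply the estimated transform and simply check that the output keeps the expected shape.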
aligned_dem, _ = icp.apply(self.tba.data, transform=self.ref.transform, crs=self.ref.crs)
assert aligned_dem.shape == self.ref.data.squeeze().shape