forked from PythonOT/POT
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_weak.py
54 lines (37 loc) · 1.22 KB
/
test_weak.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
"""Tests for main module ot.weak """
# Author: Remi Flamary <[email protected]>
#
# License: MIT License
import ot
import numpy as np
def test_weak_ot():
# test weak ot solver and identity stationary point
n = 50
rng = np.random.RandomState(0)
xs = rng.randn(n, 2)
xt = rng.randn(n, 2)
u = ot.utils.unif(n)
G, log = ot.weak_optimal_transport(xs, xt, u, u, log=True)
# check constraints
np.testing.assert_allclose(u, G.sum(1))
np.testing.assert_allclose(u, G.sum(0))
# chaeck that identity is recovered
G = ot.weak_optimal_transport(xs, xs, G0=np.eye(n) / n)
# check G is identity
np.testing.assert_allclose(G, np.eye(n) / n)
# check constraints
np.testing.assert_allclose(u, G.sum(1))
np.testing.assert_allclose(u, G.sum(0))
def test_weak_ot_bakends(nx):
# test weak ot solver for different backends
n = 50
rng = np.random.RandomState(0)
xs = rng.randn(n, 2)
xt = rng.randn(n, 2)
u = ot.utils.unif(n)
G = ot.weak_optimal_transport(xs, xt, u, u)
xs2 = nx.from_numpy(xs)
xt2 = nx.from_numpy(xt)
u2 = nx.from_numpy(u)
G2 = ot.weak_optimal_transport(xs2, xt2, u2, u2)
np.testing.assert_allclose(nx.to_numpy(G2), G)