forked from open-atmos/PyPartMC
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_run_part.py
164 lines (145 loc) · 5.87 KB
/
test_run_part.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
####################################################################################################
# This file is a part of PyPartMC licensed under the GNU General Public License v3 (LICENSE file) #
# Copyright (C) 2022 University of Illinois Urbana-Champaign #
# Authors: https://github.com/open-atmos/PyPartMC/graphs/contributors #
####################################################################################################
import platform
import numpy as np
import pytest
import PyPartMC as ppmc
from .test_aero_data import AERO_DATA_CTOR_ARG_FULL, AERO_DATA_CTOR_ARG_MINIMAL
from .test_aero_dist import AERO_DIST_CTOR_ARG_FULL
from .test_aero_state import AERO_STATE_CTOR_ARG_MINIMAL
from .test_env_state import ENV_STATE_CTOR_ARG_HIGH_RH, ENV_STATE_CTOR_ARG_MINIMAL
from .test_gas_data import GAS_DATA_CTOR_ARG_MINIMAL
from .test_run_part_opt import RUN_PART_OPT_CTOR_ARG_SIMULATION
from .test_scenario import SCENARIO_CTOR_ARG_MINIMAL
@pytest.fixture(name="common_args")
def common_args_fixture(tmp_path):
    """Assemble the positional arguments shared by the ``run_part*`` tests.

    The returned tuple is, in order: scenario, env_state, aero_data,
    aero_state, gas_data, gas_state, run_part_opt, camp_core, photolysis.
    The run options are the simulation defaults with ``output_prefix``
    redirected to ``tmp_path / "test"``.
    """
    aero_data = ppmc.AeroData(AERO_DATA_CTOR_ARG_MINIMAL)
    gas_data = ppmc.GasData(GAS_DATA_CTOR_ARG_MINIMAL)
    gas_state = ppmc.GasState(gas_data)
    scenario = ppmc.Scenario(gas_data, aero_data, SCENARIO_CTOR_ARG_MINIMAL)

    env_state = ppmc.EnvState(ENV_STATE_CTOR_ARG_MINIMAL)
    scenario.init_env_state(env_state, 0.0)

    # Copy the shared defaults so the module-level constant is never mutated.
    opt_settings = dict(RUN_PART_OPT_CTOR_ARG_SIMULATION)
    opt_settings["output_prefix"] = str(tmp_path / "test")
    run_part_opt = ppmc.RunPartOpt(opt_settings)

    aero_state = ppmc.AeroState(aero_data, *AERO_STATE_CTOR_ARG_MINIMAL)
    camp_core = ppmc.CampCore()
    photolysis = ppmc.Photolysis()

    return (
        scenario,
        env_state,
        aero_data,
        aero_state,
        gas_data,
        gas_state,
        run_part_opt,
        camp_core,
        photolysis,
    )
class TestRunPart:
    """Tests of the ``run_part`` family of PyPartMC entry points."""

    @staticmethod
    def test_run_part(common_args):
        """A full run leaves ``elapsed_time`` equal to the configured ``t_max``."""
        ppmc.run_part(*common_args)

        env_state = common_args[1]
        expected_time = RUN_PART_OPT_CTOR_ARG_SIMULATION["t_max"]
        assert env_state.elapsed_time == expected_time

    @staticmethod
    def test_run_part_timestep(common_args):
        """One ``run_part_timestep`` call advances ``elapsed_time`` by ``del_t``.

        Also checks the returned triple
        ``(last_output_time, last_progress_time, i_output)``.
        """
        outcome = ppmc.run_part_timestep(*common_args, 1, 0, 0, 0, 1)
        last_output_time, last_progress_time, i_output = outcome

        env_state = common_args[1]
        expected_time = RUN_PART_OPT_CTOR_ARG_SIMULATION["del_t"]
        assert env_state.elapsed_time == expected_time
        assert last_output_time == 0.0
        assert last_progress_time == 0.0
        assert i_output == 1

    @staticmethod
    def test_run_part_timeblock(common_args):
        """``run_part_timeblock`` over ``t_output / del_t`` steps reaches ``t_output``."""
        # arrange
        t_output = RUN_PART_OPT_CTOR_ARG_SIMULATION["t_output"]
        del_t = RUN_PART_OPT_CTOR_ARG_SIMULATION["del_t"]
        num_times = int(t_output / del_t)

        # act
        outcome = ppmc.run_part_timeblock(*common_args, 1, num_times, 0, 0, 0, 1)
        last_output_time, last_progress_time, i_output = outcome

        # assert
        assert last_output_time == RUN_PART_OPT_CTOR_ARG_SIMULATION["t_output"]
        assert last_progress_time == 0.0
        assert i_output == 2

    @staticmethod
    def test_run_part_do_condensation(common_args, tmp_path):
        """Run with ``do_condensation`` enabled and expect a positive H2O mass sum.

        The environment, aerosol data and aerosol state supplied by the
        fixture are swapped for ones built from the high-RH / full
        constructor arguments before running.
        """
        env_state = ppmc.EnvState(ENV_STATE_CTOR_ARG_HIGH_RH)
        aero_data = ppmc.AeroData(AERO_DATA_CTOR_ARG_FULL)
        aero_dist = ppmc.AeroDist(aero_data, AERO_DIST_CTOR_ARG_FULL)
        aero_state = ppmc.AeroState(aero_data, *AERO_STATE_CTOR_ARG_MINIMAL)

        call_args = list(common_args)
        # Index 0 is the scenario; initialise the replacement env state with it.
        call_args[0].init_env_state(env_state, 0.0)
        # Slots 1..3 hold env_state, aero_data and aero_state respectively.
        call_args[1:4] = (env_state, aero_data, aero_state)

        opt_settings = dict(RUN_PART_OPT_CTOR_ARG_SIMULATION)
        opt_settings.update(
            {"output_prefix": str(tmp_path / "test"), "do_condensation": True}
        )
        call_args[6] = ppmc.RunPartOpt(opt_settings)

        aero_state.dist_sample(aero_dist, 1.0, 0.0, False, False)
        ppmc.condense_equilib_particles(env_state, aero_data, aero_state)
        ppmc.run_part(*call_args)

        water_masses = aero_state.masses(include=["H2O"])
        assert np.sum(water_masses) > 0.0

    @staticmethod
    @pytest.mark.parametrize(
        "flags",
        (
            ((True, True), (True, False)),
            ((True, True), (False, True)),
            ((True, True), (False, False)),
            ((False, False), (True, False)),
            ((False, False), (False, True)),
            ((False, False), (True, True)),
            ((True, False), (False, False)),
            ((True, False), (False, True)),
            ((False, True), (False, False)),
            ((False, True), (True, False)),
        ),
    )
    @pytest.mark.parametrize(
        "fun_args",
        (
            ("run_part", []),
            ("run_part_timestep", [0, 0, 0, 0, 0]),
            ("run_part_timeblock", [0, 0, 0, 0, 0, 0]),
        ),
    )
    @pytest.mark.skipif(platform.machine() == "arm64", reason="TODO #348")
    def test_run_part_allow_flag_mismatch(common_args, tmp_path, fun_args, flags):
        """Mismatched doubling/halving flags must raise ``RuntimeError``.

        ``flags[0]`` supplies ``allow_doubling`` / ``allow_halving`` for the
        run options, while ``flags[1]`` supplies the last two arguments of
        ``dist_sample``; every parametrized pair differs between the two.
        ``fun_args`` names the ``ppmc`` function under test together with the
        extra positional arguments appended after the common ones.
        """
        # arrange
        opt_flags, sample_flags = flags
        opt_doubling, opt_halving = opt_flags
        fun_name, extra_args = fun_args

        env_state = ppmc.EnvState(ENV_STATE_CTOR_ARG_HIGH_RH)
        aero_data = ppmc.AeroData(AERO_DATA_CTOR_ARG_FULL)
        aero_dist = ppmc.AeroDist(aero_data, AERO_DIST_CTOR_ARG_FULL)
        aero_state = ppmc.AeroState(aero_data, *AERO_STATE_CTOR_ARG_MINIMAL)

        call_args = list(common_args)
        call_args[0].init_env_state(env_state, 0.0)
        call_args[1:4] = (env_state, aero_data, aero_state)

        opt_settings = dict(RUN_PART_OPT_CTOR_ARG_SIMULATION)
        opt_settings.update(
            {
                "output_prefix": str(tmp_path / "test"),
                "allow_doubling": opt_doubling,
                "allow_halving": opt_halving,
            }
        )
        call_args[6] = ppmc.RunPartOpt(opt_settings)

        aero_state.dist_sample(aero_dist, 1.0, 0.0, *sample_flags)
        runner = getattr(ppmc, fun_name)

        # act
        with pytest.raises(RuntimeError) as excinfo:
            runner(*call_args, *extra_args)

        # assert
        # The message is compared verbatim against the text raised by the library.
        expected_message = (
            "allow halving/doubling flags set differently then while sampling"
        )
        assert str(excinfo.value) == expected_message