Run FMU in parallel and avoid initialization for unscented Kalman Filter? #565
Comments
If the FMU supports it, you can get the FMU state after the initialization and then set it before every iteration (see also https://github.com/CATIA-Systems/FMPy/blob/main/tests/test_fmu_state.py).
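For reference, a minimal sketch of that pattern with FMPy's low-level FMU2Slave API (using the Rectifier FMU from the example below; the FMU must declare canGetAndSetFMUstate):

```python
from fmpy import read_model_description, extract
from fmpy.fmi2 import FMU2Slave

filename = 'Rectifier.fmu'
model_description = read_model_description(filename)
unzipdir = extract(filename)

fmu = FMU2Slave(guid=model_description.guid,
                unzipDirectory=unzipdir,
                modelIdentifier=model_description.coSimulation.modelIdentifier,
                instanceName='instance1')
fmu.instantiate()
fmu.setupExperiment()
fmu.enterInitializationMode()
fmu.exitInitializationMode()

# capture the state once, right after initialization
state = fmu.getFMUstate()

for _ in range(5):
    # rewind to the initialized state instead of re-instantiating
    fmu.setFMUstate(state)
    time, step_size = 0.0, 1e-4
    while time < 0.1:
        fmu.doStep(currentCommunicationPoint=time, communicationStepSize=step_size)
        time += step_size

fmu.freeFMUstate(state)
fmu.terminate()
fmu.freeInstance()
```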
Sure, that works in the example I stated above as well. The problem is that every call to simulate_fmu() instantiates the FMU again. I haven't found a way to pass an already instantiated FMU as an input to the function because, if I understand correctly, the FMU can't be pickled. Is there a workaround?
It sounds like you want to create your … Why don't you use the Dymola Python interface, set the start values of all states as initial equations with the values of the current operating point, and linearize the system with the …?
Dear @m-kormann, I am not sure I understand your suggestion. For the extended Kalman filter this makes sense, but for the UKF, where the FMU has to be simulated multiple times at every time step, it should be advantageous to parallelize the simulations at each time point. Could you elaborate a little?
You can pass an FMU instance to simulate_fmu().
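For example, something along these lines should work (a sketch, assuming a recent FMPy where simulate_fmu() accepts the model_description, fmu_instance, and start_values keyword arguments; the explicit reset() between runs may or may not be necessary depending on the FMPy version, and keyword arguments also avoid the long positional tuples used further down):

```python
from fmpy import read_model_description, extract, instantiate_fmu, simulate_fmu

filename = 'Rectifier.fmu'
model_description = read_model_description(filename)
unzipdir = extract(filename)

# create the instance once ...
fmu = instantiate_fmu(unzipdir=unzipdir, model_description=model_description)

# ... and reuse it for several simulations
for vac in [100, 200, 300]:
    fmu.reset()  # rewind the instance between runs
    result = simulate_fmu(unzipdir,
                          model_description=model_description,
                          fmu_instance=fmu,
                          start_values={'VAC': vac, 'IDC': 20},
                          stop_time=0.1)

fmu.freeInstance()
```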
@t-sommer I see that, but when I create a list of tuples with the necessary inputs for the function and try to use the starmap function of multiprocessing.Pool(), it still says that the function can't be pickled.
Can you share some code and an FMU to reproduce the problem?
Here is a modified version of this example that I was hopeful would work:

```python
""" Parallel example modified
"""
import dask
from dask import bag, array
from dask.diagnostics import ProgressBar
import numpy as np
import fmpy
from fmpy.fmi2 import FMU2Slave
from fmpy import read_model_description, platform
from fmpy.util import download_test_file
import shutil

fmu_filename = 'Rectifier.fmu'

# define the parameter space for the variation
v_ac = [100, 150, 200, 250, 300, 350, 400, 450, 500]  # AC voltage [V]
i_dc = [20, 30, 40, 50, 60, 70, 80, 90, 100]          # DC current [A]

sync = False  # synchronized execution (for debugging)
stop_time = 0.1
dll_handle = None
n_chunks = 10  # number of chunks to divide the workload

# create the parameters
V_AC, I_DC = np.meshgrid(v_ac, i_dc, indexing='ij')


def simulate_fmu(*args):
    """ Worker function that simulates the FMU

    Parameters:
        args = [indices, fmu, state_before, start_vrs, result_vrs]

        indices       a list of indices into the parameter arrays
        fmu           the already initialized FMU2Slave instance
        state_before  the FMU state captured after initialization
        start_vrs     value references of the start variables
        result_vrs    value references of the result variables

    Returns:
        a list of tuples (i, [u_dc, losses]) that contain the index 'i' and the averaged results of the
        'uDC' and 'Losses' variables
    """

    indices, fmu, state_before, start_vrs, result_vrs = args

    zipped = []

    # global fmu_args, start_vrs, result_vrs, dll_handle

    # iterate over all the indices in this batch
    for i in indices:

        fmu.setFMUstate(state_before)

        # get the start values for the current index
        start_values = [V_AC[i], I_DC[i]]

        # set the start values
        fmu.setReal(vr=start_vrs, value=start_values)

        time = 0.0
        step_size = 1e-4

        results = []

        # simulation loop
        while time < stop_time:
            fmu.doStep(currentCommunicationPoint=time, communicationStepSize=step_size)
            time += step_size

            if time > 0.02:
                result = fmu.getReal(result_vrs)
                results.append(result)

        u_dc, losses = zip(*results)

        # store the index and the averaged signals
        zipped.append((i, [np.average(u_dc), np.average(losses)]))

    fmu.freeFMUstate(state_before)
    fmu.terminate()

    # call the FMI API directly to avoid unloading the shared library
    fmu.fmi2FreeInstance(fmu.component)

    if sync:
        # remember the shared library handle so we can unload it later
        global dll_handle
        dll_handle = fmu.dll._handle
    else:
        # unload the shared library directly
        fmpy.freeLibrary(fmu.dll._handle)

    return zipped


def run_experiment(show_plot=True):

    if platform not in ['win32', 'win64']:
        raise Exception("Rectifier.fmu is only available for Windows")

    print("Parameter variation on %s:" % fmu_filename)
    print(" VAC", v_ac)
    print(" IDC", i_dc)

    if sync:
        dask.config.set(scheduler='synchronous')  # synchronized scheduler

    # download the FMU
    download_test_file('2.0', 'CoSimulation', 'Dymola', '2017', 'Rectifier', fmu_filename)

    # read the model description
    model_description = read_model_description(fmu_filename)

    # collect the value references for the variables to read / write
    vrs = {}
    for variable in model_description.modelVariables:
        vrs[variable.name] = variable.valueReference

    # extract the FMU
    unzipdir = fmpy.extract(fmu_filename)

    fmu_args = {'guid': model_description.guid,
                'modelIdentifier': model_description.coSimulation.modelIdentifier,
                'unzipDirectory': unzipdir}

    # get the value references for the start and output values
    start_vrs = [vrs['VAC'], vrs['IDC']]
    result_vrs = [vrs['uDC'], vrs['Losses']]

    indices = list(np.ndindex(I_DC.shape))

    fmu = FMU2Slave(**fmu_args)
    fmu.instantiate()
    fmu.setupExperiment()
    fmu.enterInitializationMode()
    fmu.exitInitializationMode()
    state_before = fmu.getFMUstate()

    print("Running %d simulations (%d chunks)..." % (V_AC.size, n_chunks))

    with ProgressBar():
        # calculate the losses for every chunk
        b = bag.from_sequence(indices, npartitions=n_chunks)
        results = b.map_partitions(simulate_fmu, fmu, state_before, start_vrs, result_vrs).compute()

    print(results)

    LOSSES = np.zeros_like(V_AC)

    # put the results together
    for i, res in results:
        LOSSES[i] = res[1]

    print(LOSSES)

    # unload the shared library
    if sync:
        while True:
            try:
                fmpy.freeLibrary(dll_handle)
            except:
                break

    # clean up
    shutil.rmtree(unzipdir, ignore_errors=True)

    if show_plot:
        print("Plotting results...")

        import matplotlib.pyplot as plt

        figure = plt.figure()
        figure.patch.set_facecolor('white')
        ax = figure.add_subplot(1, 1, 1)

        CS = plt.contourf(V_AC, I_DC, LOSSES, 10)
        plt.colorbar(CS, aspect=30)

        CS = ax.contour(V_AC, I_DC, LOSSES, 10, colors='k', linewidths=0.8)
        ax.clabel(CS=CS, fmt='%.0f', fontsize=9, inline=1)

        ax.set_title('Losses / W')
        ax.set_xlabel('AC Voltage / V')
        ax.set_ylabel('DC Current / A')

        plt.show()
    else:
        print("Plotting disabled")

    print("Done.")

    return LOSSES


if __name__ == '__main__':

    run_experiment()
```
And here is another example, this time trying to use the recommended simulate_fmu() function:

```python
import numpy as np
import fmpy
from fmpy import read_model_description, extract, simulate_fmu, instantiate_fmu
from fmpy.fmi2 import FMU2Slave
from fmpy.util import plot_result, download_test_file
import os
from numba import njit, jit
import multiprocessing as mp

n_fmu = 2
fmu_filename = 'Rectifier.fmu'
x0 = [100, 20]
input_names = ['VAC', 'IDC']

download_test_file('2.0', 'CoSimulation', 'Dymola', '2017', 'Rectifier', fmu_filename)
model_description = read_model_description(fmu_filename)

# Creating input dict
input_dict = {}
for i in range(len(input_names)):
    input_dict[input_names[i]] = x0[i]

# extract the FMU
unzipdir = fmpy.extract(fmu_filename)

"""fmu_args = {'guid': model_description.guid,
             'modelIdentifier': model_description.coSimulation.modelIdentifier,
             'unzipDirectory': unzipdir}"""

fmu_instances = []
state_before = []

for i in range(n_fmu):
    name = f"instance{i}"
    """fmu_instances.append(FMU2Slave(guid=model_description.guid,
                                      unzipDirectory=unzipdir,
                                      modelIdentifier=model_description.coSimulation.modelIdentifier,
                                      instanceName=name))"""
    fmu_instances.append(instantiate_fmu(unzipdir=unzipdir, model_description=model_description))
    fmu_instances[i].setupExperiment()
    fmu_instances[i].enterInitializationMode()
    fmu_instances[i].exitInitializationMode()
    state_before.append(fmu_instances[i].getFMUstate())

inputs1 = (unzipdir, False, None, 1.0, 'CVode', None, None,
           None,
           True,
           None,
           input_dict,
           False,
           None,
           None,
           None,
           False,
           False,
           None,
           None,
           None,
           model_description,
           fmu_instances[0],
           False,
           'auto',
           False,
           False,
           False,
           True,
           state_before[0])

inputs2 = (unzipdir, False, None, 1.0, 'CVode', None, None,
           None,
           True,
           None,
           input_dict,
           False,
           None,
           None,
           None,
           False,
           False,
           None,
           None,
           None,
           model_description,
           fmu_instances[1],
           False,
           'auto',
           False,
           False,
           False,
           True,
           state_before[1])

input_tuple_list = [inputs1, inputs2]

if __name__ == '__main__':
    #mp.freeze_support()
    pool = mp.Pool()
    results = pool.starmap(simulate_fmu, input_tuple_list)
```
You should be able to adapt this example for your use case. It stores the FMU instance as an attribute of the function to be executed, so there is exactly one instance per process that does not need to be pickled.
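A rough sketch of that pattern (not the linked example itself): the worker instantiates the FMU lazily on first use and caches it as an attribute of the function, so every process ends up with its own instance and no FMU object is ever pickled. It assumes Rectifier.fmu is already in the working directory and that this FMPy version's simulate_fmu() accepts the model_description, fmu_instance, and output keyword arguments; the variable names VAC, IDC, and uDC are taken from the Rectifier example above.

```python
import multiprocessing as mp

from fmpy import read_model_description, extract, instantiate_fmu, simulate_fmu

filename = 'Rectifier.fmu'


def run_one(vac, idc):
    """Worker: simulate one operating point, reusing a per-process FMU instance."""
    if not hasattr(run_one, 'fmu'):
        # runs once in every worker process
        run_one.model_description = read_model_description(filename)
        run_one.unzipdir = extract(filename)
        run_one.fmu = instantiate_fmu(unzipdir=run_one.unzipdir,
                                      model_description=run_one.model_description)

    run_one.fmu.reset()  # rewind the cached instance before each run
    result = simulate_fmu(run_one.unzipdir,
                          model_description=run_one.model_description,
                          fmu_instance=run_one.fmu,
                          start_values={'VAC': vac, 'IDC': idc},
                          output=['uDC'],
                          stop_time=0.1)
    return result['uDC'][-1]


if __name__ == '__main__':
    # e.g. the 2n+1 sigma-point evaluations of one UKF step
    operating_points = [(100, 20), (150, 30), (200, 40)]
    with mp.Pool() as pool:
        u_dc = pool.starmap(run_one, operating_points)
    print(u_dc)
```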
Interesting. I have a very similar application where I need to linearize a model for a Kalman filter. Using the library pyfmi, I can do this with the function get_state_space_representation. I am porting my code to fmpy. Is there an easy way to do the same? @brazilbruno, could you tell me more about your application?
I have tried looking for an answer, but haven't found anything, so if this question has already been asked, I apologize.
I am trying to run the UKF on a Co-Simulation FMU exported from Dymola. The algorithm is starting to become a little slow, since there are many states "n", and therefore "2n+1" simulations need to be done for every time step. I want to run these simulations in parallel, but this is my first time trying parallelization. I saw the example parameter_variation.py, but there the FMU is initialized for every run. Is it possible to do this with an already initialized FMU, maybe by initializing "2n+1" FMUs beforehand and passing them to the function?
Thanks for the help.