Skip to content

Commit

Permalink
Merge pull request projectmesa#924 from projectmesa/NewBatchRunner
Browse files Browse the repository at this point in the history
Re-Implementation of BatchRunner
  • Loading branch information
jackiekazil authored Dec 19, 2021
2 parents 0c23349 + f9ada45 commit ea7bcb3
Show file tree
Hide file tree
Showing 3 changed files with 333 additions and 12 deletions.
31 changes: 21 additions & 10 deletions examples/bank_reserves/batch_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from bank_reserves.agents import Bank, Person
import itertools
from mesa import Model
from mesa.batchrunner import BatchRunner
from mesa.batchrunner import batch_run
from mesa.space import MultiGrid
from mesa.datacollection import DataCollector
from mesa.time import RandomActivation
Expand Down Expand Up @@ -148,7 +148,7 @@ def __init__(
"Model Params": track_params,
"Run": track_run,
},
agent_reporters={"Wealth": lambda x: x.wealth},
agent_reporters={"Wealth": "wealth"},
)

# create a single bank for the model
Expand Down Expand Up @@ -181,25 +181,36 @@ def run_model(self):

# parameter lists for each parameter to be tested in batch run
br_params = {
"init_people": [25, 100, 150, 200],
"rich_threshold": [5, 10, 15, 20],
"reserve_percent": [0, 50, 100],
"init_people": [25, 100],
"rich_threshold": [5, 10],
"reserve_percent": 5,
}

br = BatchRunner(
"""br = BatchRunner(
BankReservesModel,
br_params,
iterations=1,
iterations=2,
max_steps=1000,
model_reporters={"Data Collector": lambda m: m.datacollector},
)
nr_processes=None,
# model_reporters={"Data Collector": lambda m: m.datacollector},
)"""

if __name__ == "__main__":
br.run_all()
data = batch_run(
BankReservesModel,
br_params,
model_reporters={"Rich": get_num_rich_agents},
agent_reporters={"Wealth": "wealth"},
)
br_df = pd.DataFrame(data)
br_df.to_csv("BankReservesModel_Data.csv")
# br.run_all()
"""
br_df = br.get_model_vars_dataframe()
br_step_data = pd.DataFrame()
for i in range(len(br_df["Data Collector"])):
if isinstance(br_df["Data Collector"][i], DataCollector):
i_run_data = br_df["Data Collector"][i].get_model_vars_dataframe()
br_step_data = br_step_data.append(i_run_data, ignore_index=True)
br_step_data.to_csv("BankReservesModel_Step_Data.csv")
"""
189 changes: 187 additions & 2 deletions mesa/batchrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,197 @@
"""
import copy
import itertools
import random
from itertools import product, count
from collections import OrderedDict
from functools import partial
from itertools import count, product
from multiprocessing import Pool, cpu_count
from typing import (
Any,
Counter,
Dict,
Iterable,
List,
Mapping,
Optional,
Tuple,
Type,
Union,
)

import pandas as pd
from tqdm import tqdm
from collections import OrderedDict

from mesa.model import Model


def batch_run(
    model_cls: Type[Model],
    parameters: Mapping[str, Union[Any, Iterable[Any]]],
    nr_processes: Optional[int] = None,
    iterations: int = 1,
    i_steps: int = -1,
    max_steps: int = 1000,
    display_progress: bool = True,
) -> List[Dict[str, Any]]:
    """Batch run a mesa model with a set of parameter values.
    Parameters
    ----------
    model_cls : Type[Model]
        The model class to batch-run
    parameters : Mapping[str, Union[Any, Iterable[Any]]],
        Dictionary with model parameters over which to run the model. You can either pass single values or iterables.
    nr_processes : int, optional
        Number of processes used. Set to None (default) to use all available processors; set to 1 to run sequentially in this process.
    iterations : int, optional
        Number of iterations for each parameter combination, by default 1
    i_steps : int, optional
        Collect data every i-th step; a negative value (default -1) collects only the final step
    max_steps : int, optional
        Maximum number of model steps after which the model halts, by default 1000
    display_progress : bool, optional
        Display batch run process, by default True
    Returns
    -------
    List[Dict[str, Any]]
        One flat record per agent per collected step, carrying the run id,
        0-based iteration number, step, model kwargs and reporter values.
    """

    # One kwargs dict per parameter combination.  Iterations are applied
    # separately in each branch below: multiplying kwargs_list by
    # `iterations` here AND looping over `range(iterations)` would run every
    # combination iterations**2 times.
    kwargs_list = _make_model_kwargs(parameters)
    process_func = partial(
        _model_run_func,
        model_cls,
        max_steps=max_steps,
        i_steps=i_steps,
    )

    # Total number of model runs, used to size the progress bar.
    total_iterations = len(kwargs_list) * iterations
    run_counter = count()

    results: List[Dict[str, Any]] = []

    with tqdm(total_iterations, disable=not display_progress) as pbar:
        if nr_processes == 1:
            # Sequential path: deterministic ordering, no process pool.
            for iteration in range(iterations):
                for kwargs in kwargs_list:
                    _, rawdata = process_func(kwargs)
                    run_id = next(run_counter)
                    data = []
                    for run_data in rawdata:
                        # `iteration` is already 0-based here, matching the
                        # (counter - 1) labelling used by the parallel path.
                        out = {"RunId": run_id, "iteration": iteration}
                        out.update(run_data)
                        data.append(out)
                    results.extend(data)
                    pbar.update()

        else:
            # Parallel path: results arrive in arbitrary order, so the
            # iteration number is recovered per parameter combination.
            iteration_counter: Counter[Tuple[Any, ...]] = Counter()
            with Pool(nr_processes) as p:
                for paramValues, rawdata in p.imap_unordered(
                    process_func, kwargs_list * iterations
                ):
                    iteration_counter[paramValues] += 1
                    iteration = iteration_counter[paramValues]
                    run_id = next(run_counter)
                    data = []
                    for run_data in rawdata:
                        out = {"RunId": run_id, "iteration": iteration - 1}
                        out.update(run_data)
                        data.append(out)
                    results.extend(data)
                    pbar.update()

    return results


def _make_model_kwargs(
parameters: Mapping[str, Union[Any, Iterable[Any]]]
) -> List[Dict[str, Any]]:
"""Create model kwargs from parameters dictionary.
Parameters
----------
parameters : Mapping[str, Union[Any, Iterable[Any]]]
Single or multiple values for each model parameter name
Returns
-------
List[Dict[str, Any]]
A list of all kwargs combinations.
"""
parameter_list = []
for param, values in parameters.items():
try:
all_values = [(param, value) for value in values]
except TypeError:
all_values = [(param, values)]
parameter_list.append(all_values)
all_kwargs = itertools.product(*parameter_list)
kwargs_list = [dict(kwargs) for kwargs in all_kwargs]
return kwargs_list


def _model_run_func(
    model_cls: Type[Model],
    kwargs: Dict[str, Any],
    max_steps: int,
    i_steps: int,
) -> Tuple[Tuple[Any, ...], List[Dict[str, Any]]]:
    """Run a single model run and collect model and agent data.
    Parameters
    ----------
    model_cls : Type[Model]
        The model class to batch-run
    kwargs : Dict[str, Any]
        model kwargs used for this run
    max_steps : int
        Maximum number of model steps after which the model halts, by default 1000
    i_steps : int
        Collect data every ith step; a negative value (the default -1)
        collects only the final step
    Returns
    -------
    Tuple[Tuple[Any, ...], List[Dict[str, Any]]]
        Return model_data, agent_data from the reporters: the tuple of kwargs
        values identifies the parameter combination, and the list holds one
        flat record per agent per collected step
    """
    model = model_cls(**kwargs)
    # NOTE(review): `<=` lets the model advance one step past max_steps
    # (schedule.steps ends at max_steps + 1 unless the model stops itself) --
    # presumably so the step labelled max_steps gets collected; confirm
    # against the scheduler's step-counting convention.
    while model.running and model.schedule.steps <= max_steps:
        model.step()

    data = []

    # With a negative i_steps the range is empty, so only the final step is
    # collected (via the append below).  i_steps == 0 would raise ValueError.
    steps = list(range(0, model.schedule.steps, i_steps))
    # Always include the last executed step exactly once.
    if not steps or steps[-1] != model.schedule.steps - 1:
        steps.append(model.schedule.steps - 1)

    for step in steps:
        model_data, all_agents_data = _collect_data(model, step)

        # One flat record per agent: step number, model kwargs, model-level
        # reporter values, then that agent's reporter values.
        stepdata = [
            {**{"Step": step}, **kwargs, **model_data, **agent_data}
            for agent_data in all_agents_data
        ]
        data.extend(stepdata)

    return tuple(kwargs.values()), data


def _collect_data(
model: Model,
step: int,
) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
"""Collect model and agent data from a model using mesas datacollector."""
dc = model.datacollector

model_data = {param: values[step] for param, values in dc.model_vars.items()}

all_agents_data = []
raw_agent_data = dc._agent_records.get(step, [])
for data in raw_agent_data:
agent_dict = {"AgentID": data[1]}
agent_dict.update(zip(dc.agent_reporters, data[2:]))
all_agents_data.append(agent_dict)
return model_data, all_agents_data


class ParameterError(TypeError):
Expand Down
125 changes: 125 additions & 0 deletions tests/test_batch_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from mesa.agent import Agent
from mesa.batchrunner import _make_model_kwargs, batch_run
from mesa.datacollection import DataCollector
from mesa.model import Model
from mesa.time import BaseScheduler


def test_make_model_kwargs():
    """_make_model_kwargs expands iterable values and holds scalars fixed."""
    # A mapping of scalars yields exactly one kwargs dict.
    assert _make_model_kwargs({"a": 3, "b": 5}) == [{"a": 3, "b": 5}]
    # An iterable value is expanded while the scalar stays fixed.
    expected = [{"a": 3, "b": i} for i in range(3)]
    assert _make_model_kwargs({"a": 3, "b": range(3)}) == expected
    # Two iterables produce their cartesian product, first key varying slowest.
    expected = [{"a": i, "b": j} for i in range(2) for j in range(2)]
    assert _make_model_kwargs({"a": range(2), "b": range(2)}) == expected


class MockAgent(Agent):
    """
    Bare-bones agent for the batch-run tests: carries a seeded value and a
    locally accumulated counter, both advanced on every step.
    """

    def __init__(self, unique_id, model, val):
        super().__init__(unique_id, model)
        self.unique_id = unique_id
        self.local = 0
        self.val = val

    def step(self):
        """Bump the seeded value by 1 and the local accumulator by 0.25."""
        self.val = self.val + 1
        self.local = self.local + 0.25


class MockModel(Model):
    """
    Minimal model used to exercise batch_run: three MockAgents, one stub
    model-level reporter and two agent-level reporters.
    """

    def __init__(
        self,
        variable_model_param=None,
        variable_agent_param=None,
        fixed_model_param=None,
        schedule=None,
        **kwargs
    ):
        super().__init__()
        # Default to a BaseScheduler unless the caller supplies a scheduler.
        if schedule is None:
            self.schedule = BaseScheduler(self)
        else:
            self.schedule = schedule
        self.variable_model_param = variable_model_param
        self.variable_agent_param = variable_agent_param
        self.fixed_model_param = fixed_model_param
        self.n_agents = 3
        self.datacollector = DataCollector(
            model_reporters={"reported_model_param": self.get_local_model_param},
            agent_reporters={"agent_id": "unique_id", "agent_local": "local"},
        )
        self.running = True
        self.init_agents()

    def init_agents(self):
        """Create n_agents MockAgents, seeding each with the variable value."""
        agent_val = (
            1 if self.variable_agent_param is None else self.variable_agent_param
        )
        for unique_id in range(self.n_agents):
            self.schedule.add(MockAgent(unique_id, self, agent_val))

    def get_local_model_param(self):
        """Constant model-level reporter value."""
        return 42

    def step(self):
        """Collect data first, then advance every agent."""
        self.datacollector.collect(self)
        self.schedule.step()


def test_batch_run():
    """With default settings a single run produces one record per agent,
    collected at the final step only."""
    result = batch_run(MockModel, {})
    # All records share the run metadata; only the agent fields differ.
    template = {
        "RunId": 0,
        "iteration": 0,
        "Step": 1000,
        "reported_model_param": 42,
    }
    expected = [
        {
            **template,
            "AgentID": agent_id,
            "agent_id": agent_id,
            "agent_local": 250.0,
        }
        for agent_id in range(3)
    ]
    assert result == expected


def test_batch_run_with_params():
    """batch_run sweeps iterable parameters: each of the 5 x 5 combinations
    yields one record per agent at the final step."""
    results = batch_run(
        MockModel,
        {
            # NOTE: names must match MockModel.__init__ (singular) -- the
            # plural spellings previously used here vanished into **kwargs,
            # so nothing was actually varied.
            "variable_model_param": range(5),
            "variable_agent_param": ["H", "E", "L", "L", "O"],
        },
    )
    # 5 * 5 parameter combinations, 3 agents each, final step only.
    assert len(results) == 25 * 3


def test_batch_run_single_core():
    # Smoke test: the sequential (nr_processes=1) code path and repeated
    # iterations run without raising.  No result assertions here.
    batch_run(MockModel, {}, nr_processes=1, iterations=10)

0 comments on commit ea7bcb3

Please sign in to comment.