Skip to content

Commit

Permalink
Improved graph analysis and graph optimizer
Browse files Browse the repository at this point in the history
- added colorbar to edge importance and task view modes
- changed edge importance calculation to show the maximum (absolute) task length increase rather than the average task length increase.
- changed default size of edges to LEGO 1x4 brick dimensions (3.2x0.8 instead of 4x1)
- locations should now be sorted alphabetically in task names and internal variables.
  • Loading branch information
simulatedScience committed Aug 29, 2023
1 parent 10d1a25 commit f5aef90
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 37 deletions.
51 changes: 46 additions & 5 deletions coding/ttr_map_maker/graph_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,10 @@ def get_random_shortest_task_paths_edge_counts(self, n_random_paths: int = 1000)
connection_index = self.get_shortest_connection_index(loc1, loc2)
edge = (loc1, loc2, connection_index)
if edge in task_edge_counts:
task_edge_counts[edge] += 1
task_edge_counts[edge] += 1/n_random_paths
else:
task_edge_counts[edge] = 1
task_edge_counts[edge] = 1/n_random_paths

return task_edge_counts

def get_shortest_connection_index(self, loc1: str, loc2: str):
Expand Down Expand Up @@ -267,23 +268,63 @@ def get_node_importance(self) -> dict[str, float]:
node_importance[node] = self.get_average_task_length(new_graph) - average_task_length
return node_importance

# def get_edge_importance(self) -> dict[Tuple[str, str, int], float]:
# """
# Calculate the importance of each edge in the graph: how much the average task length is increased if that edge is removed.

# Returns:
# dict[Tuple[str, str, int], float]: dictionary of edges and the avergae increase in task length if that edge is removed
# The edge is represented as a tuple of the two locations and the connection index.
# """
# edge_importance: dict[Tuple[str, str, int], float] = {}
# average_task_length = self.get_average_task_length()
# for edge in self.networkx_graph.edges:
# # copy graph and remove edge
# new_graph = self.networkx_graph.copy()
# new_graph.remove_edge(*edge)
# connection_index = self.networkx_graph.edges[edge]['key']
# edge_importance[(edge[0], edge[1], connection_index)] = self.get_average_task_length(new_graph) - average_task_length
# return edge_importance

def get_edge_importance(self) -> dict[Tuple[str, str, int], float]:
"""
Calculate the importance of each edge in the graph: how much the average task length is increased if that edge is removed.
Calculate the importance of each edge in the graph: how much a task length is increased at most if that edge is removed.
Returns:
dict[Tuple[str, str, int], float]: dictionary of edges and the avergae increase in task length if that edge is removed
The edge is represented as a tuple of the two locations and the connection index.
"""
edge_importance: dict[Tuple[str, str, int], float] = {}
average_task_length = self.get_average_task_length()
original_task_lengths = self.get_task_lengths()
for edge in self.networkx_graph.edges:
# copy graph and remove edge
new_graph = self.networkx_graph.copy()
new_graph.remove_edge(*edge)
connection_index = self.networkx_graph.edges[edge]['key']
edge_importance[(edge[0], edge[1], connection_index)] = self.get_average_task_length(new_graph) - average_task_length
task_length_increase = self.get_maximum_task_length_increase(new_graph, original_task_lengths)
edge_importance[(edge[0], edge[1], connection_index)] = task_length_increase
return edge_importance

def get_maximum_task_length_increase(self, new_graph, original_task_lengths) -> float:
"""
Calculate the maximum task length increase caused by removing an edge from the graph.
Args:
graph: The graph from which the edge will be removed.
Returns:
float: The maximum task length increase.
"""
new_task_lengths = self.get_task_lengths(new_graph)

max_increase = 0
for task_id, original_task_length in original_task_lengths.items():
new_task_length = new_task_lengths[task_id]
increase = new_task_length - original_task_length
if increase > max_increase:
max_increase = increase

return max_increase

def get_average_task_length(self, graph: nx.Graph = None) -> float:
"""
Expand Down
2 changes: 1 addition & 1 deletion coding/ttr_map_maker/graph_optimizer_gui.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ def run_smulation_frame(self):
"""
if not self.simulation_running:
return
print("running simulation frame")
print(f"running {self.iterations_per_frame.get()} simulation frame(s)")
self.particle_graph.optimize_layout(
iterations = self.iterations_per_frame.get(),
dt = self.time_step.get())
Expand Down
22 changes: 13 additions & 9 deletions coding/ttr_map_maker/graph_particle.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def __init__(self,
self.rotation = rotation # rotation of particle in radians
self.angular_velocity = 0
self.angular_acceleration = 0
self.inertia = mass * (bounding_box_size[0] ** 2 + bounding_box_size[1] ** 2) / 12 # moment of inertia
self.inertia: float = mass * (bounding_box_size[0] ** 2 + bounding_box_size[1] ** 2) / 12 # moment of inertia

self.repulsion_strength = repulsion_strength
self.velocity_decay = velocity_decay # factor by which the particle's velocity is multiplied each time step. This is used to simulate friction.
Expand Down Expand Up @@ -249,11 +249,13 @@ def interact(self, other: "Graph_Particle") -> Tuple[np.ndarray, float]:
attraction_force, attraction_anchor = self.get_attraction_forces(other)
attraction_force_radial, attraction_torque = split_force(attraction_force, attraction_anchor, self.position)
else:
attraction_force = np.zeros(2)
attraction_force_radial = np.zeros(2)
attraction_torque = 0
# split attraction force into translation and rotation components
# add forces to acceleration
self.acceleration += (self.repulsion_strength * repulsion_force_radial + attraction_force_radial) / self.mass
self.acceleration += (self.repulsion_strength * repulsion_force + attraction_force) / self.mass
# self.acceleration += (self.repulsion_strength * repulsion_force_radial + attraction_force_radial) / self.mass
self.angular_acceleration += (self.repulsion_strength * repulsion_torque + attraction_torque) / self.inertia


Expand All @@ -270,13 +272,15 @@ def get_repulsion_forces(self, other: "Graph_Particle") -> Tuple[np.ndarray, np.
(np.ndarray) translation repulsion force to apply to `self` particle
(np.ndarray) rotation repulsion force to apply to `self` particle
"""
if self.repulsion_strength == 0 or other.repulsion_strength == 0:
return np.zeros(2), np.zeros(2)
# get overlap
self_polygon = self.get_bounding_box_polygon()
other_polygon = other.get_bounding_box_polygon()
overlap_center, overlap_area = get_box_overlap(self_polygon, other_polygon)

if not overlap_area > 0: # no overlap => no repulsion force
return np.zeros(2), 0
return np.zeros(2), np.zeros(2)

overlap_vector = overlap_center - self.position # vector from self center to center of overlap area
translation_force = -overlap_vector * overlap_area
Expand Down Expand Up @@ -505,14 +509,14 @@ def to_dict(self) -> dict:
"particle_type": self.__class__.__name__,
"id": self.id,
"position": [float(coord) for coord in self.position],
"rotation": self.rotation,
"mass": self.mass,
"rotation": float(self.rotation),
"mass": float(self.mass),
"bounding_box_size": list(self.bounding_box_size),
"velocity_decay": self.velocity_decay,
"angular_velocity_decay": self.angular_velocity_decay,
"interaction_radius": self.interaction_radius,
"velocity_decay": float(self.velocity_decay),
"angular_velocity_decay": float(self.angular_velocity_decay),
"interaction_radius": float(self.interaction_radius),
"connected_particles": [p.get_id() for p in self.connected_particles],
"repulsion_strength": self.repulsion_strength,
"repulsion_strength": float(self.repulsion_strength),
}
if self.target_position is not None:
particle_info["target_position"] = [float(coord) for coord in self.target_position]
Expand Down
6 changes: 3 additions & 3 deletions coding/ttr_map_maker/particle_edge.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def __init__(self,
position: np.ndarray = np.array([0, 0], dtype=np.float16),
rotation: float = 0,
mass: float = 0.1,
bounding_box_size: tuple = (4, 1),
bounding_box_size: tuple = (3.2, 0.8),
border_color: str = "#555555",
node_attraction: float = 0.1,
edge_attraction: float = 0.1,
Expand Down Expand Up @@ -375,10 +375,10 @@ def get_image_rotation(self) -> float:
visited_particle_ids = {self.get_id()}
connected_nodes = [self, self]
for i in range(2):
while True:
while True: # find a connected node
connected_index = 0
new_node = connected_nodes[i].connected_particles[connected_index]
while True:
while True: # find a connected node that has not been visited yet
if not new_node.get_id() in visited_particle_ids:
break
connected_index += 1
Expand Down
8 changes: 5 additions & 3 deletions coding/ttr_map_maker/particle_label.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ def __init__(self,
interaction_radius: float = 5,
velocity_decay: float = 0.9999,
angular_velocity_decay: float = 0.9999,
repulsion_strength: float = 1,
repulsion_strength: float = 0,
fontsize: int = 150,
font_name: str = None,
font_path: str = "beleriand_ttr\\MiddleEarth.ttf",
# font_path: str = "beleriand_ttr\\MiddleEarth.ttf",
font_path: str = "assets\\fonts\\HARRYP__.ttf",
height_scale_factor: float = None):
"""
Initialize a particle label
Expand All @@ -54,7 +55,8 @@ def __init__(self,
self.height_scale_factor = Particle_Label.get_label_height_scale()
else:
self.height_scale_factor = height_scale_factor
self.inside_stroke_width = fontsize // 25
# self.inside_stroke_width = fontsize // 25
self.inside_stroke_width = fontsize // 75
self.outline_stroke_width = fontsize // 8
if font_name is None:
# font_name = font_path.split("\\")[-1].strip(".ttf")
Expand Down
Loading

0 comments on commit f5aef90

Please sign in to comment.