-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathCAstarMover.py
276 lines (243 loc) · 13.5 KB
/
CAstarMover.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
import networkx as nx
from numpy import Inf
class CAstarMover:
"""
The fallback method for handling agent collisions
"""
def __init__(self, mapCanvas, mapGraph, sharedInfoManager, agentManager, taskManager, simCanvasRef):
self.mapCanvas = mapCanvas
self.mapGraph = mapGraph
self.sharedInfoManager = sharedInfoManager
self.agentManager = agentManager
self.taskManager = taskManager
self.simCanvasRef = simCanvasRef
# Collision manager uses a list of submitted moves, arriving in order of priority
self.agentPriorityList = []
self.agentMotionDict = {}
def getCurrentPriorityOrder(self):
return self.agentPriorityList
def setCurrentPriorityOrder(self, newPriorityList):
self.agentPriorityList = newPriorityList
def resetCurrentPriorityOrder(self):
self.agentPriorityList = []
def submitAgentAction(self, agent, desiredMove):
if agent.numID not in self.agentPriorityList:
self.agentPriorityList.append(agent.numID)
self.agentMotionDict[agent.numID] = desiredMove
def checkAgentCollisions(self):
# print("CHECKING AGENT COLLISIONS . . .")
# print(self.agentMotionDict)
# Check for conflicts
vertexDict, edgeDict = self.comprehendAgentMotions()
hasConflict = self.checkForConflicts(vertexDict, edgeDict)
if hasConflict:
conflictCount = 1
return {"agents": hasConflict}
else:
conflictCount = 0
# while hasConflict:
# # If there is a conflict, cycle the resolver until there isn't
# vertexDict, edgeDict = self.comprehendAgentMotions()
# hasConflict = self.checkForConflicts(vertexDict, edgeDict)
# print(self.agentMotionDict)
# print(">>>Conflict resolved, executing moves.")
# After resolving conflicts, execute all the queued up movements
for agent in self.agentPriorityList:
currentAgent = self.agentManager.agentList[agent]
self.simCanvasRef.requestRender("agent", "move", {"agentNumID": currentAgent.numID,
"sourceNodeID": currentAgent.currentNode, "targetNodeID": self.agentMotionDict[agent][1]})
currentAgent.executeMove(self.agentMotionDict[agent][1])
currentAgent.pathfinder.agentTookStep()
# Reset the agent motion queue
self.agentMotionDict = {}
return conflictCount
def comprehendAgentMotions(self):
vertexDict = {}
edgeDict = {}
# print(self.agentPriorityList)
# Decompose the list of agent desired actions into vertex and edge plan lists
for agentID, agentMove in self.agentMotionDict.items():
# self.sharedInfoManager.showReservationsByAgent(agentID)
# print(f"{agentID}: {agentMove}")
# Crashed agents need priority and a replan
if agentMove == "crash":
# Agent is completely blocked in, unable to avoid collision via reservation table
# Examine neighbors to find the ideal movement for the agent
currentAgent = self.agentManager.agentList[agentID]
currentNode = currentAgent.currentNode
neighbors = list(self.mapGraph.neighbors(currentNode)) + [currentNode]
targetNode = currentAgent.returnTargetNode()
# Default distance is infinite, such that any other distance is superior
winnerDist = Inf
for neighborNode in neighbors:
abstractDist = currentAgent.pathfinder.heuristicFunc(currentNode, targetNode)
if abstractDist < winnerDist:
winner = neighborNode
winnerDist = abstractDist
agentMove = (currentNode, winner)
# print(f"Agent crashed—prefers: {agentMove}")
# Crashed units get top priority
self.agentPriorityList.pop(self.agentPriorityList.index(agentID))
self.agentPriorityList = [agentID] + self.agentPriorityList
# Submit the new plan to the relevant trackers
self.sharedInfoManager.handlePathPlanRequest([currentNode, winner], agentID)
self.agentMotionDict[agentID] = agentMove
# Vertex reservations are the destination node, in the next timestep
if agentMove[1] in vertexDict:
vertexDict[agentMove[1]].append(agentID)
else:
vertexDict[agentMove[1]] = [agentID]
# Edges can be safely sorted such that A->B = A<-B
edgeTuple = tuple(sorted(agentMove))
if edgeTuple in edgeDict:
edgeDict[edgeTuple].append(agentID)
else:
edgeDict[edgeTuple] = [agentID]
return (vertexDict, edgeDict)
def checkForConflicts(self, vertexDict, edgeDict):
# print(f"V:{vertexDict}")
# print(f"E:{edgeDict}")
# Resolution preference is given to edge conflicts
for edge, agents in edgeDict.items():
if len(agents) > 1:
self.conflictFound = True
# print("EDGE CONFLICT")
self.resolveEdgeConflict(agents[0], agents[1])
# print(f"New motions: {self.agentMotionDict}")
return agents
for node, agents in vertexDict.items():
if len(agents) > 1:
self.conflictFound = True
# print("VERTEX CONFLICT")
self.resolveNodeConflict(agents)
# print(f"New motions: {self.agentMotionDict}")
return agents
return False
def resolveEdgeConflict(self, agentOne, agentTwo):
# Priority driven; agent with higher priority (lower index) takes precedence
agentList = [agentOne, agentTwo]
for agentID in self.agentPriorityList:
if agentID in agentList:
# Found highest priority agent
agentList.pop(agentList.index(agentID))
break
# print(f"\tCollision losers are: {agentList}")
# Lower priority agents must replan around the locked-in higher priority agents
for deprioAgent in agentList:
# Find the object reference
deprioAgentData = self.agentManager.agentList[deprioAgent]
# Reset intended plan, then seek an open space to move away from the conflicts
# Highest priority indicates the center of the collision knot, from which agents should disperse
self.agentManager.agentList[deprioAgent].pathfinder.__reset__()
deprioAgentNeighbors = self.findValidNeighbors(deprioAgentData)
# print(f"Free neighbors: {deprioAgentNeighbors}")
if len(deprioAgentNeighbors) == 0:
# There's no free places to move, so check if there are valid overwrites
self.agentManager.agentList[deprioAgentData.numID].pathfinder.__reset__()
deprioAgentNeighbors = self.findLowerPriorityNeighbors(deprioAgentData)
# print(f"Overwritable neighbors: {deprioAgentNeighbors}")
# If there are no valid overwrites, the agent has literally zero options with this priority
if len(deprioAgentNeighbors) == 0:
# print(f"Crashing...")
# If there are no valid free moves, and no valid overwrites, this agent needs highest priority
self.agentMotionDict[deprioAgent] = "crash"
else:
# Otherwise, choose an overwrite and implement it
choice = deprioAgentNeighbors[0]
plannedPath = [deprioAgentData.currentNode, choice]
self.agentMotionDict[deprioAgentData.numID] = plannedPath
# Submit the new plan to the relevant trackers
deprioAgentData.pathfinder.plannedPath = plannedPath
self.sharedInfoManager.handlePathPlanRequest(plannedPath, deprioAgentData.numID)
else:
# Otherwise choose an overwrite and implement it
choice = deprioAgentNeighbors[0]
plannedPath = [deprioAgentData.currentNode, choice]
self.agentMotionDict[deprioAgentData.numID] = plannedPath
# Submit the new plan to the relevant trackers
deprioAgentData.pathfinder.plannedPath = plannedPath
self.sharedInfoManager.handlePathPlanRequest(plannedPath, deprioAgentData.numID)
# def updateConflictingEdgePlans(self, prioAgent, deprioAgent):
# # Priority agent sets its desired path
# plannedPath = list(self.agentMotionDict[prioAgent.numID])
# prioAgent.pathfinder.plannedPath = plannedPath
# self.sharedInfoManager.handlePathPlanRequest(plannedPath, prioAgent.numID)
# # Deprio agent needs a new plan, selecting an immediate (free) neighbor
# deprioAgentNeighbors = self.findValidNeighbors(deprioAgent)
# choice = deprioAgentNeighbors[0]
# plannedPath = [deprioAgent.currentNode, choice]
# self.agentMotionDict[deprioAgent.numID] = plannedPath
# # Set the deprioritized agent path
# deprioAgent.pathfinder.plannedPath = plannedPath
# self.sharedInfoManager.handlePathPlanRequest(plannedPath, deprioAgent.numID)
# # Swap priorities in the priority list if necessary
# priorityAgentOldPrio = self.agentPriorityList.index(prioAgent.numID)
# depriorityAgentOldPrio = self.agentPriorityList.index(deprioAgent.numID)
# if priorityAgentOldPrio > depriorityAgentOldPrio:
# self.agentPriorityList[depriorityAgentOldPrio] = prioAgent.numID
# self.agentPriorityList[priorityAgentOldPrio] = deprioAgent.numID
def resolveNodeConflict(self, agentList):
# print(f"Conflicting agents: {agentList}")
# Node conflicts resolve via priority order
for agentID in self.agentPriorityList:
if agentID in agentList:
# Found highest priority agent
plannedPath = list(self.agentMotionDict[agentID])
deprioAgent = agentList.pop(agentList.index(agentID))
deprioAgent = self.agentManager.agentList[deprioAgent]
break
# Remaining agents just try to wait in place
agentList.reverse()
for agent in agentList:
# print(f"{agent} experiencing forced wait due to priority")
# self.agentMustWait(agent)
prioAgent = self.agentManager.agentList[agent]
# Priority order needs shuffling in order to tie break and avoid deadlock
self.swapAgentPriority(prioAgent, deprioAgent)
def agentMustWait(self, agentID):
# Agent will not move
agent = self.agentManager.agentList[agentID]
plannedPath = [agent.currentNode, agent.currentNode]
# Release any prior path reservations
agent.pathfinder.__reset__()
# Submit the new path to the relevant trackers
agent.pathfinder.plannedPath = plannedPath
self.agentMotionDict[agent.numID] = tuple(plannedPath)
self.sharedInfoManager.handlePathPlanRequest(plannedPath, agent.numID)
def swapAgentPriority(self, prioAgent, deprioAgent):
# Exchange the position in the priority list of the two passed agentIDs
priorityAgentOldPrio = self.agentPriorityList.index(prioAgent.numID)
depriorityAgentOldPrio = self.agentPriorityList.index(deprioAgent.numID)
if priorityAgentOldPrio > depriorityAgentOldPrio:
self.agentPriorityList[depriorityAgentOldPrio] = prioAgent.numID
self.agentPriorityList[priorityAgentOldPrio] = deprioAgent.numID
def findValidNeighbors(self, agentData):
# print("Seeking free neighbors . . .")
# Determine which neighboring nodes are available as-is
validNeighbors = []
neighbors = list(self.mapGraph.neighbors(agentData.currentNode))
# Checking neighbors
for i, neighbor in enumerate(neighbors):
valid = self.sharedInfoManager.evaluateNodeEligibility(0, neighbor, agentData.currentNode, agentData.numID)
if valid:
validNeighbors.append(neighbor)
# Checking own tile
if self.sharedInfoManager.evaluateNodeEligibility(0, agentData.currentNode, agentData.currentNode, agentData.numID):
validNeighbors.append(agentData.currentNode)
return validNeighbors
def findLowerPriorityNeighbors(self, agentData):
# print(f"Seeking overwritable neighbors from {agentData.currentNode}")
# print(self.agentPriorityList)
# Determine which neighboring nodes are possible to overwrite
validNeighbors = []
neighbors = list(self.mapGraph.neighbors(agentData.currentNode))
# If no valid free neighbors were found, then the agent needs to be willing to overwrite an agent's plan
for i, neighbor in enumerate(neighbors):
valid = self.sharedInfoManager.evaluateNodeOverwritability(0, neighbor, agentData.currentNode, agentData.numID, self.agentPriorityList)
if valid:
validNeighbors.append(neighbor)
# Checking own tile
if self.sharedInfoManager.evaluateNodeOverwritability(0, agentData.currentNode, agentData.currentNode, agentData.numID, self.agentPriorityList):
validNeighbors.append(agentData.currentNode)
# print(f"Agent needs to overwrite lower priority plans: {validNeighbors}")
return validNeighbors