Skip to content

Commit

Permalink
support graphnav navigate-to
Browse files Browse the repository at this point in the history
  • Loading branch information
k-okada committed Dec 21, 2020
1 parent f02879e commit fb553b9
Show file tree
Hide file tree
Showing 7 changed files with 554 additions and 2 deletions.
111 changes: 111 additions & 0 deletions spot_driver/scripts/graph_nav_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Copyright (c) 2020 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

"""Graph nav utility functions"""


def id_to_short_code(id):
"""Convert a unique id to a 2 letter short code."""
tokens = id.split('-')
if len(tokens) > 2:
return '%c%c' % (tokens[0][0], tokens[1][0])
return None


def pretty_print_waypoints(waypoint_id, waypoint_name, short_code_to_count, localization_id):
short_code = id_to_short_code(waypoint_id)
if short_code is None or short_code_to_count[short_code] != 1:
short_code = ' ' # If the short code is not valid/unique, don't show it.

print("%s Waypoint name: %s id: %s short code: %s" %
('->' if localization_id == waypoint_id else ' ',
waypoint_name, waypoint_id, short_code))


def find_unique_waypoint_id(short_code, graph, name_to_id):
"""Convert either a 2 letter short code or an annotation name into the associated unique id."""
if len(short_code) != 2:
# Not a short code, check if it is an annotation name (instead of the waypoint id).
if short_code in name_to_id:
# Short code is an waypoint's annotation name. Check if it is paired with a unique waypoint id.
if name_to_id[short_code] is not None:
# Has an associated waypoint id!
return name_to_id[short_code]
else:
print("The waypoint name %s is used for multiple different unique waypoints. Please use" + \
"the waypoint id." % (short_code))
return None
# Also not an waypoint annotation name, so we will operate under the assumption that it is a
# unique waypoint id.
return short_code

ret = short_code
for waypoint in graph.waypoints:
if short_code == id_to_short_code(waypoint.id):
if ret != short_code:
return short_code # Multiple waypoints with same short code.
ret = waypoint.id
return ret


def update_waypoints_and_edges(graph, localization_id):
"""Update and print waypoint ids and edge ids."""
name_to_id = dict()
edges = dict()

short_code_to_count = {}
waypoint_to_timestamp = []
for waypoint in graph.waypoints:
# Determine the timestamp that this waypoint was created at.
timestamp = -1.0
try:
timestamp = waypoint.annotations.creation_time.seconds + waypoint.annotations.creation_time.nanos / 1e9
except:
# Must be operating on an older graph nav map, since the creation_time is not
# available within the waypoint annotations message.
pass
waypoint_to_timestamp.append((waypoint.id,
timestamp,
waypoint.annotations.name))

# Determine how many waypoints have the same short code.
short_code = id_to_short_code(waypoint.id)
if short_code not in short_code_to_count:
short_code_to_count[short_code] = 0
short_code_to_count[short_code] += 1

# Add the annotation name/id into the current dictionary.
waypoint_name = waypoint.annotations.name
if waypoint_name:
if waypoint_name in name_to_id:
# Waypoint name is used for multiple different waypoints, so set the waypoint id
# in this dictionary to None to avoid confusion between two different waypoints.
name_to_id[waypoint_name] = None
else:
# First time we have seen this waypoint annotation name. Add it into the dictionary
# with the respective waypoint unique id.
name_to_id[waypoint_name] = waypoint.id

# Sort the set of waypoints by their creation timestamp. If the creation timestamp is unavailable,
# fallback to sorting by annotation name.
waypoint_to_timestamp = sorted(waypoint_to_timestamp, key= lambda x:(x[1], x[2]))

# Print out the waypoints name, id, and short code in a ordered sorted by the timestamp from
# when the waypoint was created.
print('%d waypoints:' % len(graph.waypoints))
for waypoint in waypoint_to_timestamp:
pretty_print_waypoints(waypoint[0], waypoint[2], short_code_to_count, localization_id)

for edge in graph.edges:
if edge.id.to_waypoint in edges:
if edge.id.from_waypoint not in edges[edge.id.to_waypoint]:
edges[edge.id.to_waypoint].append(edge.id.from_waypoint)
else:
edges[edge.id.to_waypoint] = [edge.id.from_waypoint]
print("(Edge) from waypoint id: ", edge.id.from_waypoint, " and to waypoint id: ",
edge.id.to_waypoint)

return name_to_id, edges
45 changes: 45 additions & 0 deletions spot_driver/scripts/spot_ros.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@
from spot_msgs.msg import SystemFault, SystemFaultState
from spot_msgs.msg import BatteryState, BatteryStateArray
from spot_msgs.msg import Feedback
from spot_msgs.msg import NavigateToAction, NavigateToResult, NavigateToFeedback
from spot_msgs.srv import ListGraph, ListGraphResponse

from ros_helpers import *
from spot_wrapper import SpotWrapper

import actionlib
import logging
import threading

class SpotROS():
"""Parent class for using the wrapper. Defines all callbacks and keeps the wrapper alive"""
Expand Down Expand Up @@ -282,6 +286,39 @@ def bodyPoseCallback(self, data):
euler_zxy = q.to_euler_zxy()
self.spot_wrapper.set_mobility_params(data.position.z, euler_zxy)

def handle_list_graph(self, upload_path):
"""ROS service handler for listing graph_nav waypoint_ids"""
resp = self.spot_wrapper.list_graph(upload_path)
return ListGraphResponse(resp)

def handle_navigate_to_feedback(self):
"""Thread function to send navigate_to feedback"""
while not rospy.is_shutdown() and self.run_navigate_to:
localization_state = self.spot_wrapper._graph_nav_client.get_localization_state()
if localization_state.localization.waypoint_id:
self.navigate_as.publish_feedback(NavigateToFeedback(localization_state.localization.waypoint_id))
rospy.Rate(10).sleep()

def handle_navigate_to(self, msg):
"""ROS service handler to run mission of the robot. The robot will replay a mission"""
# create thread to periodically publish feedback
feedback_thraed = threading.Thread(target = self.handle_navigate_to_feedback, args = ())
self.run_navigate_to = True
feedback_thraed.start()
# run navigate_to
resp = self.spot_wrapper.navigate_to(upload_path = msg.upload_path,
navigate_to = msg.navigate_to,
initial_localization_fiducial = msg.initial_localization_fiducial,
initial_localization_waypoint = msg.initial_localization_waypoint)
self.run_navigate_to = False
feedback_thraed.join()

# check status
if resp[0]:
self.navigate_as.set_succeeded(NavigateToResult(resp[0], resp[1]))
else:
self.navigate_as.set_aborted(NavigateToResult(resp[0], resp[1]))

def shutdown(self):
rospy.loginfo("Shutting down ROS driver for Spot")
self.spot_wrapper.sit()
Expand Down Expand Up @@ -376,6 +413,14 @@ def main(self):
rospy.Service("estop/hard", Trigger, self.handle_estop_hard)
rospy.Service("estop/gentle", Trigger, self.handle_estop_soft)

rospy.Service("list_graph", ListGraph, self.handle_list_graph)

self.navigate_as = actionlib.SimpleActionServer('navigate_to', NavigateToAction,
execute_cb = self.handle_navigate_to,
auto_start = False)
self.navigate_as.start()


rospy.on_shutdown(self.shutdown)

self.auto_claim = rospy.get_param('~auto_claim', False)
Expand Down
Loading

0 comments on commit fb553b9

Please sign in to comment.