forked from facebookresearch/pyrobot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
core.py
1240 lines (1068 loc) · 41.8 KB
/
core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import print_function
import copy
import importlib
import os
import sys
import threading
import time
from abc import ABCMeta, abstractmethod
import numpy as np
import rospy
import tf
from geometry_msgs.msg import Twist, Pose, PoseStamped
from sensor_msgs.msg import JointState, CameraInfo, Image
import pyrobot.utils.util as prutil
from pyrobot.utils.move_group_interface import MoveGroupInterface as MoveGroup
from pyrobot_bridge.srv import *
from pyrobot.utils.util import try_cv2_import
cv2 = try_cv2_import()
from cv_bridge import CvBridge, CvBridgeError
import message_filters
import actionlib
from pyrobot_bridge.msg import (
MoveitAction,
MoveitGoal,
)
from actionlib_msgs.msg import GoalStatus
class Robot:
"""
This is the main interface class that is composed of
key robot modules (base, arm, gripper, and camera).
This class builds robot specific objects by reading a
configuration and instantiating the necessary robot module objects.
"""
def __init__(
self,
robot_name,
use_arm=True,
use_base=True,
use_camera=True,
use_gripper=True,
arm_config={},
base_config={},
camera_config={},
gripper_config={},
common_config={},
):
"""
Constructor for the Robot class
:param robot_name: robot name
:param use_arm: use arm or not
:param use_base: use base or not
:param use_camera: use camera or not
:param use_gripper: use gripper or not
:param arm_config: configurations for arm
:param base_config: configurations for base
:param camera_config: configurations for camera
:param gripper_config: configurations for gripper
:type robot_name: string
:type use_arm: bool
:type use_base: bool
:type use_camera: bool
:type use_gripper: bool
:type arm_config: dict
:type base_config: dict
:type camera_config: dict
:type gripper_config: dict
"""
root_path = os.path.dirname(os.path.realpath(__file__))
cfg_path = os.path.join(root_path, "cfg")
robot_pool = []
for f in os.listdir(cfg_path):
if f.endswith("_config.py"):
robot_pool.append(f[: -len("_config.py")])
root_node = "pyrobot."
self.configs = None
this_robot = None
for srobot in robot_pool:
if srobot in robot_name:
this_robot = srobot
mod = importlib.import_module(
root_node + "cfg." + "{:s}_config".format(srobot)
)
cfg_func = getattr(mod, "get_cfg")
if srobot == "locobot" and "lite" in robot_name:
self.configs = cfg_func("create")
else:
self.configs = cfg_func()
if self.configs is None:
raise ValueError(
"Invalid robot name provided, only the following"
" are currently available: {}".format(robot_pool)
)
self.configs.freeze()
try:
rospy.init_node("pyrobot", anonymous=True)
except rospy.exceptions.ROSException:
rospy.logwarn("ROS node [pyrobot] has already been initialized")
root_node += this_robot
root_node += "."
if self.configs.HAS_COMMON:
mod = importlib.import_module(root_node + self.configs.COMMON.NAME)
common_class = getattr(mod, self.configs.COMMON.CLASS)
setattr(
self,
self.configs.COMMON.NAME,
common_class(self.configs, **common_config),
)
if self.configs.HAS_ARM and use_arm:
mod = importlib.import_module(root_node + "arm")
arm_class = getattr(mod, self.configs.ARM.CLASS)
if self.configs.HAS_COMMON:
arm_config[self.configs.COMMON.NAME] = getattr(
self, self.configs.COMMON.NAME
)
self.arm = arm_class(self.configs, **arm_config)
if self.configs.HAS_BASE and use_base:
mod = importlib.import_module(root_node + "base")
base_class = getattr(mod, self.configs.BASE.CLASS)
if self.configs.HAS_COMMON:
base_config[self.configs.COMMON.NAME] = getattr(
self, self.configs.COMMON.NAME
)
self.base = base_class(self.configs, **base_config)
if self.configs.HAS_CAMERA and use_camera:
mod = importlib.import_module(root_node + "camera")
camera_class = getattr(mod, self.configs.CAMERA.CLASS)
if self.configs.HAS_COMMON:
camera_config[self.configs.COMMON.NAME] = getattr(
self, self.configs.COMMON.NAME
)
self.camera = camera_class(self.configs, **camera_config)
if self.configs.HAS_GRIPPER and use_gripper and use_arm:
mod = importlib.import_module(root_node + "gripper")
gripper_class = getattr(mod, self.configs.GRIPPER.CLASS)
if self.configs.HAS_COMMON:
gripper_config[self.configs.COMMON.NAME] = getattr(
self, self.configs.COMMON.NAME
)
self.gripper = gripper_class(self.configs, **gripper_config)
# sleep some time for tf listeners in subclasses
rospy.sleep(2)
class Base(object):
"""
This is a parent class on which the robot
specific Base classes would be built.
"""
def __init__(self, configs):
"""
The consturctor for Base class.
:param configs: configurations for base
:type configs: YACS CfgNode
"""
self.configs = configs
self.ctrl_pub = rospy.Publisher(
configs.BASE.ROSTOPIC_BASE_COMMAND, Twist, queue_size=1
)
def stop(self):
"""
Stop the base
"""
msg = Twist()
msg.linear.x = 0
msg.angular.z = 0
self.ctrl_pub.publish(msg)
def set_vel(self, fwd_speed, turn_speed, exe_time=1):
"""
Set the moving velocity of the base
:param fwd_speed: forward speed
:param turn_speed: turning speed
:param exe_time: execution time
"""
fwd_speed = min(fwd_speed, self.configs.BASE.MAX_ABS_FWD_SPEED)
fwd_speed = max(fwd_speed, -self.configs.BASE.MAX_ABS_FWD_SPEED)
turn_speed = min(turn_speed, self.configs.BASE.MAX_ABS_TURN_SPEED)
turn_speed = max(turn_speed, -self.configs.BASE.MAX_ABS_TURN_SPEED)
msg = Twist()
msg.linear.x = fwd_speed
msg.angular.z = turn_speed
start_time = rospy.get_time()
self.ctrl_pub.publish(msg)
while rospy.get_time() - start_time < exe_time:
self.ctrl_pub.publish(msg)
rospy.sleep(1.0 / self.configs.BASE.BASE_CONTROL_RATE)
def go_to_relative(self, xyt_position, use_map, close_loop, smooth):
"""
Moves the robot to the robot to given
goal state relative to its initial pose.
:param xyt_position: The relative goal state of the form (x,y,t)
:param use_map: When set to "True", ensures that controler is
using only free space on the map to move the robot.
:param close_loop: When set to "True", ensures that controler is
operating in open loop by
taking account of odometry.
:param smooth: When set to "True", ensures that the motion
leading to the goal is a smooth one.
:type xyt_position: list
:type use_map: bool
:type close_loop: bool
:type smooth: bool
:return: True if successful; False otherwise (timeout, etc.)
:rtype: bool
"""
raise NotImplementedError
def go_to_absolute(self, xyt_position, use_map, close_loop, smooth):
"""
Moves the robot to the robot to given goal state in the world frame.
:param xyt_position: The goal state of the form (x,y,t)
in the world (map) frame.
:param use_map: When set to "True", ensures that controler is using
only free space on the map to move the robot.
:param close_loop: When set to "True", ensures that controler is
operating in open loop by
taking account of odometry.
:param smooth: When set to "True", ensures that the motion
leading to the goal is a smooth one.
:type xyt_position: list
:type use_map: bool
:type close_loop: bool
:type smooth: bool
:return: True if successful; False otherwise (timeout, etc.)
:rtype: bool
"""
raise NotImplementedError
def track_trajectory(self, states, controls, close_loop):
"""
State trajectory that the robot should track.
:param states: sequence of (x,y,t) states that the robot should track.
:param controls: optionally specify control sequence as well.
:param close_loop: whether to close loop on the
computed control sequence or not.
:type states: list
:type controls: list
:type close_loop: bool
:return: True if successful; False otherwise (timeout, etc.)
:rtype: bool
"""
raise NotImplementedError
def get_state(self, state_type):
"""
Returns the requested base pose in the (x,y, yaw) format.
:param state_type: Requested state type. Ex: Odom, SLAM, etc
:type state_type: string
:return: pose: pose of the form [x, y, yaw]
:rtype: list
"""
raise NotImplementedError
class Gripper(object):
"""
This is a parent class on which the robot
specific Gripper classes would be built.
"""
__metaclass__ = ABCMeta
def __init__(self, configs):
"""
Constructor for Gripper parent class.
:param configs: configurations for gripper
:type configs: YACS CfgNode
"""
self.configs = configs
@abstractmethod
def open(self, **kwargs):
pass
@abstractmethod
def close(self, **kwargs):
pass
class Camera(object):
"""
This is a parent class on which the robot
specific Camera classes would be built.
"""
__metaclass__ = ABCMeta
def __init__(self, configs):
"""
Constructor for Camera parent class.
:param configs: configurations for camera
:type configs: YACS CfgNode
"""
self.configs = configs
self.cv_bridge = CvBridge()
self.camera_info_lock = threading.RLock()
self.camera_img_lock = threading.RLock()
self.rgb_img = None
self.depth_img = None
self.camera_info = None
self.camera_P = None
rospy.Subscriber(
self.configs.CAMERA.ROSTOPIC_CAMERA_INFO_STREAM,
CameraInfo,
self._camera_info_callback,
)
rgb_topic = self.configs.CAMERA.ROSTOPIC_CAMERA_RGB_STREAM
self.rgb_sub = message_filters.Subscriber(rgb_topic, Image)
depth_topic = self.configs.CAMERA.ROSTOPIC_CAMERA_DEPTH_STREAM
self.depth_sub = message_filters.Subscriber(depth_topic, Image)
img_subs = [self.rgb_sub, self.depth_sub]
self.sync = message_filters.ApproximateTimeSynchronizer(
img_subs, queue_size=10, slop=0.2
)
self.sync.registerCallback(self._sync_callback)
def _sync_callback(self, rgb, depth):
self.camera_img_lock.acquire()
try:
self.rgb_img = self.cv_bridge.imgmsg_to_cv2(rgb, "bgr8")
self.rgb_img = self.rgb_img[:, :, ::-1]
self.depth_img = self.cv_bridge.imgmsg_to_cv2(depth, "passthrough")
except CvBridgeError as e:
rospy.logerr(e)
self.camera_img_lock.release()
def _camera_info_callback(self, msg):
self.camera_info_lock.acquire()
self.camera_info = msg
self.camera_P = np.array(msg.P).reshape((3, 4))
self.camera_info_lock.release()
def get_rgb(self):
"""
This function returns the RGB image perceived by the camera.
:rtype: np.ndarray or None
"""
self.camera_img_lock.acquire()
rgb = copy.deepcopy(self.rgb_img)
self.camera_img_lock.release()
return rgb
def get_depth(self):
"""
This function returns the depth image perceived by the camera.
The depth image is in meters.
:rtype: np.ndarray or None
"""
self.camera_img_lock.acquire()
depth = copy.deepcopy(self.depth_img)
self.camera_img_lock.release()
depth = depth / self.configs.CAMERA.DEPTH_MAP_FACTOR
return depth
def get_rgb_depth(self):
"""
This function returns both the RGB and depth
images perceived by the camera.
The depth image is in meters.
:rtype: np.ndarray or None
"""
self.camera_img_lock.acquire()
rgb = copy.deepcopy(self.rgb_img)
depth = copy.deepcopy(self.depth_img)
depth = depth / self.configs.CAMERA.DEPTH_MAP_FACTOR
self.camera_img_lock.release()
return rgb, depth
def get_intrinsics(self):
"""
This function returns the camera intrinsics.
:rtype: np.ndarray
"""
if self.camera_P is None:
return self.camera_P
self.camera_info_lock.acquire()
P = copy.deepcopy(self.camera_P)
self.camera_info_lock.release()
return P[:3, :3]
class Arm(object):
"""
This is a parent class on which the robot
specific Arm classes would be built.
"""
__metaclass__ = ABCMeta
def __init__(
self,
configs,
moveit_planner="ESTkConfigDefault",
planning_time=30,
analytical_ik=None,
use_moveit=True,
):
"""
Constructor for Arm parent class.
:param configs: configurations for arm
:param moveit_planner: moveit planner type
:param analytical_ik: customized analytical ik class
if you have one, None otherwise
:param use_moveit: use moveit or not, default is True
:type configs: YACS CfgNode
:type moveit_planner: string
:type analytical_ik: None or an Analytical ik class
:type use_moveit: bool
"""
self.configs = configs
self.moveit_planner = moveit_planner
self.planning_time = planning_time
self.use_moveit = use_moveit
self.joint_state_lock = threading.RLock()
self.tf_listener = tf.TransformListener()
if self.use_moveit:
self._init_moveit()
if analytical_ik is not None:
self.ana_ik_solver = analytical_ik(
configs.ARM.ARM_BASE_FRAME, configs.ARM.EE_FRAME
)
self.arm_joint_names = self.configs.ARM.JOINT_NAMES
self.arm_dof = len(self.arm_joint_names)
# Subscribers
self._joint_angles = dict()
self._joint_velocities = dict()
self._joint_efforts = dict()
rospy.Subscriber(
configs.ARM.ROSTOPIC_JOINT_STATES, JointState, self._callback_joint_states
)
# Ros-Params
rospy.set_param("pyrobot/base_link", configs.ARM.ARM_BASE_FRAME)
rospy.set_param("pyrobot/gripper_link", configs.ARM.EE_FRAME)
rospy.set_param(
"pyrobot/robot_description", configs.ARM.ARM_ROBOT_DSP_PARAM_NAME
)
# Publishers
self.joint_pub = None
self._setup_joint_pub()
# Services
self._ik_service = rospy.ServiceProxy("pyrobot/ik", IkCommand)
try:
self._ik_service.wait_for_service(timeout=3)
except:
rospy.logerr("Timeout waiting for Inverse Kinematics Service!!")
self._fk_service = rospy.ServiceProxy("pyrobot/fk", FkCommand)
try:
self._fk_service.wait_for_service(timeout=3)
except:
rospy.logerr("Timeout waiting for Forward Kinematics Service!!")
@abstractmethod
def go_home(self):
"""
Reset robot to default home position
"""
pass
@property
def pose_ee(self):
"""
Return the end effector pose w.r.t 'ARM_BASE_FRAME'
:return:
trans: translational vector (shape: :math:`[3, 1]`)
rot_mat: rotational matrix (shape: :math:`[3, 3]`)
quat: rotational matrix in the form
of quaternion (shape: :math:`[4,]`)
:rtype: tuple (trans, rot_mat, quat)
"""
return self.get_ee_pose(base_frame=self.configs.ARM.ARM_BASE_FRAME)
def get_ee_pose(self, base_frame):
"""
Return the end effector pose with respect to the base_frame
:param base_frame: reference frame
:type base_frame: string
:return:
tuple (trans, rot_mat, quat)
trans: translational vector (shape: :math:`[3, 1]`)
rot_mat: rotational matrix (shape: :math:`[3, 3]`)
quat: rotational matrix in the form
of quaternion (shape: :math:`[4,]`)
:rtype: tuple or ROS PoseStamped
"""
return self.get_transform(base_frame, self.configs.ARM.EE_FRAME)
def get_transform(self, src_frame, dest_frame):
"""
Return the transform from the src_frame to dest_frame
:param src_frame: source frame
:param dest_frame: destination frame
:type src_frame: string
:type dest_frame: basestring
:return:
tuple (trans, rot_mat, quat )
trans: translational vector (shape: :math:`[3, 1]`)
rot_mat: rotational matrix (shape: :math:`[3, 3]`)
quat: rotational matrix in the form
of quaternion (shape: :math:`[4,]`)
:rtype: tuple or ROS PoseStamped
"""
trans, quat = prutil.get_tf_transform(self.tf_listener, src_frame, dest_frame)
rot_mat = prutil.quat_to_rot_mat(quat)
trans = np.array(trans).reshape(-1, 1)
rot_mat = np.array(rot_mat)
quat = np.array(quat)
return trans, rot_mat, quat
def get_joint_angles(self):
"""
Return the joint angles
:return: joint_angles
:rtype: np.ndarray
"""
self.joint_state_lock.acquire()
joint_angles = []
for joint in self.arm_joint_names:
joint_angles.append(self.get_joint_angle(joint))
joint_angles = np.array(joint_angles).flatten()
self.joint_state_lock.release()
return joint_angles
def get_joint_velocities(self):
"""
Return the joint velocities
:return: joint_vels
:rtype: np.ndarray
"""
self.joint_state_lock.acquire()
joint_vels = []
for joint in self.arm_joint_names:
joint_vels.append(self.get_joint_velocity(joint))
joint_vels = np.array(joint_vels).flatten()
self.joint_state_lock.release()
return joint_vels
def get_joint_torques(self):
"""
Return the joint torques
:return: joint_torques
:rtype: np.ndarray
"""
self.joint_state_lock.acquire()
joint_torques = []
for joint in self.arm_joint_names:
try:
joint_torques.append(self.get_joint_torque(joint))
except (ValueError, IndexError):
rospy.loginfo("Torque value for joint " "[%s] not available!" % joint)
joint_torques = np.array(joint_torques).flatten()
self.joint_state_lock.release()
return joint_torques
def get_joint_angle(self, joint):
"""
Return the joint angle of the 'joint'
:param joint: joint name
:type joint: string
:return: joint angle
:rtype: float
"""
if joint not in self.arm_joint_names:
raise ValueError("%s not in arm joint list!" % joint)
if joint not in self._joint_angles.keys():
raise ValueError("Joint angle for joint $s not available!" % joint)
return self._joint_angles[joint]
def get_joint_velocity(self, joint):
"""
Return the joint velocity of the 'joint'
:param joint: joint name
:type joint: string
:return: joint velocity
:rtype: float
"""
if joint not in self.arm_joint_names:
raise ValueError("%s not in arm joint list!" % joint)
if joint not in self._joint_velocities.keys():
raise ValueError("Joint velocity for joint" " $s not available!" % joint)
return self._joint_velocities[joint]
def get_joint_torque(self, joint):
"""
Return the joint torque of the 'joint'
:param joint: joint name
:type joint: string
:return: joint torque
:rtype: float
"""
if joint not in self.arm_joint_names:
raise ValueError("%s not in arm joint list!" % joint)
if joint not in self._joint_efforts.keys():
raise ValueError("Joint torque for joint $s" " not available!" % joint)
return self._joint_efforts[joint]
def set_joint_positions(self, positions, plan=True, wait=True, **kwargs):
"""
Sets the desired joint angles for all arm joints
:param positions: list of length #of joints, angles in radians
:param plan: whether to use moveit to plan a path. Without planning,
there is no collision checking and each joint will
move to its target joint position directly.
:param wait: if True, it will wait until the desired
joint positions are achieved
:type positions: list
:type plan: bool
:type wait: bool
:return: True if successful; False otherwise (timeout, etc.)
:rtype: bool
"""
result = False
if isinstance(positions, np.ndarray):
positions = positions.flatten().tolist()
if plan:
if not self.use_moveit:
raise ValueError(
"Moveit is not initialized, " "did you pass in use_moveit=True?"
)
self._cancel_moveit_goal()
if isinstance(positions, np.ndarray):
positions = positions.tolist()
rospy.loginfo("Moveit Motion Planning...")
moveit_goal = MoveitGoal()
moveit_goal.wait = wait
moveit_goal.action_type = "set_joint_positions"
moveit_goal.values = positions
self._moveit_client.send_goal(moveit_goal)
if wait:
self._moveit_client.wait_for_result()
status = self._moveit_client.get_state()
if status == GoalStatus.SUCCEEDED:
result = True
else:
self._pub_joint_positions(positions)
if wait:
result = self._loop_angle_pub_cmd(self._pub_joint_positions, positions)
if wait:
return result
def make_plan_joint_positions(self, positions, **kwargs):
result = None
if isinstance(positions, np.ndarray):
positions = positions.flatten().tolist()
if not self.use_moveit:
raise ValueError(
"Moveit is not initialized, " "did you pass in use_moveit=True?"
)
rospy.loginfo("Moveit Motion Planning...")
raise NotImplementedError
# result = self.moveit_group.motionPlanToJointPosition(
# self.arm_joint_names, positions
# )
# return [p.positions for p in result]
def set_joint_velocities(self, velocities, **kwargs):
"""
Sets the desired joint velocities for all arm joints
:param velocities: target joint velocities
:type velocities: list
"""
self._pub_joint_velocities(velocities)
def set_joint_torques(self, torques, **kwargs):
"""
Sets the desired joint torques for all arm joints
:param torques: target joint torques
:type torques: list
"""
self._pub_joint_torques(torques)
def set_ee_pose(
self, position, orientation, plan=True, wait=True, numerical=True, **kwargs
):
"""
Commands robot arm to desired end-effector pose
(w.r.t. 'ARM_BASE_FRAME').
Computes IK solution in joint space and calls set_joint_positions.
Will wait for command to complete if wait is set to True.
:param position: position of the end effector (shape: :math:`[3,]`)
:param orientation: orientation of the end effector
(can be rotation matrix, euler angles (yaw,
pitch, roll), or quaternion)
(shape: :math:`[3, 3]`, :math:`[3,]`
or :math:`[4,]`)
The convention of the Euler angles here
is z-y'-x' (intrinsic rotations),
which is one type of Tait-Bryan angles.
:param plan: use moveit the plan a path to move to the desired pose
:param wait: wait until the desired pose is achieved
:param numerical: use numerical inverse kinematics solver or
analytical inverse kinematics solver
:type position: np.ndarray
:type orientation: np.ndarray
:type plan: bool
:type wait: bool
:type numerical: bool
:return: Returns True if command succeeded, False otherwise
:rtype: bool
"""
joint_positions = self.compute_ik(position, orientation, numerical=numerical)
result = False
if joint_positions is None:
rospy.logerr("No IK solution found; check if target_pose is valid")
else:
result = self.set_joint_positions(joint_positions, plan=plan, wait=wait)
return result
def make_plan_pose(
self, position, orientation, wait=True, numerical=True, **kwargs
):
if not self.use_moveit:
raise ValueError(
"Using plan=True when moveit is not"
" initialized, did you pass "
"in use_moveit=True?"
)
pose = Pose()
position = position.flatten()
if orientation.size == 4:
orientation = orientation.flatten()
ori_x = orientation[0]
ori_y = orientation[1]
ori_z = orientation[2]
ori_w = orientation[3]
elif orientation.size == 3:
quat = prutil.euler_to_quat(orientation)
ori_x = quat[0]
ori_y = quat[1]
ori_z = quat[2]
ori_w = quat[3]
elif orientation.size == 9:
quat = prutil.rot_mat_to_quat(orientation)
ori_x = quat[0]
ori_y = quat[1]
ori_z = quat[2]
ori_w = quat[3]
else:
raise TypeError(
"Orientation must be in one "
"of the following forms:"
"rotation matrix, euler angles, or quaternion"
)
pose.position.x, pose.position.y, pose.position.z = (
position[0],
position[1],
position[2],
)
(
pose.orientation.x,
pose.orientation.y,
pose.orientation.z,
pose.orientation.w,
) = (ori_x, ori_y, ori_z, ori_w)
raise NotImplementedError
# result = self.moveit_group.motionPlanToPose(pose, wait=True)
# return [p.positions for p in result]
def move_ee_xyz(
self,
displacement,
eef_step=0.005,
numerical=True,
plan=True,
wait=True,
**kwargs
):
"""
Keep the current orientation fixed, move the end
effector in {xyz} directions
:param displacement: (delta_x, delta_y, delta_z)
:param eef_step: resolution (m) of the interpolation
on the cartesian path
:param numerical: use numerical inverse kinematics solver or
analytical inverse kinematics solver
:param plan: use moveit the plan a path to move to the
desired pose. If False,
it will do linear interpolation along the path,
and simply use IK solver to find the
sequence of desired joint positions and
then call `set_joint_positions`
:type displacement: np.ndarray
:type eef_step: float
:type numerical: bool
:type plan: bool
:return: whether the movement is successful or not
:rtype: bool
"""
displacement = displacement.reshape(-1, 1)
path_len = np.linalg.norm(displacement)
num_pts = int(np.ceil(path_len / float(eef_step)))
if num_pts <= 1:
num_pts = 2
if (hasattr(self, "ana_ik_solver") and not numerical) or not plan:
if not wait:
raise NotImplementedError(
"Not wait is supported only in plan=True case."
)
ee_pose = self.get_ee_pose(self.configs.ARM.ARM_BASE_FRAME)
cur_pos, cur_ori, cur_quat = ee_pose
waypoints_sp = np.linspace(0, path_len, num_pts)
waypoints = cur_pos + waypoints_sp / float(path_len) * displacement
way_joint_positions = []
qinit = self.get_joint_angles().tolist()
for i in range(waypoints.shape[1]):
joint_positions = self.compute_ik(
waypoints[:, i].flatten(),
cur_quat,
qinit=qinit,
numerical=numerical,
)
if joint_positions is None:
rospy.logerr(
"No IK solution found; " "check if target_pose is valid"
)
return False
way_joint_positions.append(copy.deepcopy(joint_positions))
qinit = copy.deepcopy(joint_positions)
success = False
for joint_positions in way_joint_positions:
success = self.set_joint_positions(
joint_positions, plan=plan, wait=True
)
return success
else:
if not self.use_moveit:
raise ValueError(
"Using plan=True when moveit is not"
" initialized, did you pass "
"in use_moveit=True?"
)
self._cancel_moveit_goal()
ee_pose = self.get_ee_pose(self.configs.ARM.ARM_BASE_FRAME)
cur_pos, cur_ori, cur_quat = ee_pose
cur_pos = np.array(cur_pos).reshape(-1, 1)
cur_quat = np.array(cur_quat)
waypoints_sp = np.linspace(0, path_len, num_pts)
waypoints = cur_pos + waypoints_sp / path_len * displacement
moveit_waypoints = []
wpose = Pose()
for i in range(waypoints.shape[1]):
if i < 1:
continue
wpose.position.x = waypoints[0, i]
wpose.position.y = waypoints[1, i]
wpose.position.z = waypoints[2, i]
wpose.orientation.x = cur_quat[0]
wpose.orientation.y = cur_quat[1]
wpose.orientation.z = cur_quat[2]
wpose.orientation.w = cur_quat[3]
moveit_waypoints.append(copy.deepcopy(wpose))
moveit_goal = MoveitGoal()
moveit_goal.wait = wait
moveit_goal.action_type = "move_ee_xyz"
moveit_goal.waypoints = moveit_waypoints
moveit_goal.eef_step = eef_step
self._moveit_client.send_goal(moveit_goal)
if wait:
self._moveit_client.wait_for_result()
status = self._moveit_client.get_state()
if status != GoalStatus.SUCCEEDED:
return False
ee_pose = self.get_ee_pose(self.configs.ARM.ARM_BASE_FRAME)
cur_pos, cur_ori, cur_quat = ee_pose
cur_pos = np.array(cur_pos).reshape(-1, 1)
success = True
diff = cur_pos.flatten() - waypoints[:, -1].flatten()
error = np.linalg.norm(diff)
if error > self.configs.ARM.MAX_EE_ERROR:
rospy.logerr("Move end effector along xyz failed!")
success = False
return success
def _cancel_moveit_goal(self):
if not self._moveit_client.gh:
return
# 2 is for "Done" state of action
if self._moveit_client.simple_state != 2:
self._moveit_client.cancel_all_goals()
def compute_fk_position(self, joint_positions, des_frame):
"""
Given joint angles, compute the pose of desired_frame with respect
to the base frame (self.configs.ARM.ARM_BASE_FRAME). The desired frame
must be in self.arm_link_names
:param joint_positions: joint angles
:param des_frame: desired frame
:type joint_positions: np.ndarray
:type des_frame: string
:return: translational vector and rotational matrix
:rtype: np.ndarray, np.ndarray
"""
joint_positions = joint_positions.flatten()