Skip to content

Commit

Permalink
1.Modularize python package.
Browse files Browse the repository at this point in the history
2.Create soccer module.
  • Loading branch information
lab-pc1 committed Apr 3, 2019
1 parent b4fb689 commit 61c611e
Show file tree
Hide file tree
Showing 15 changed files with 518 additions and 416 deletions.
416 changes: 0 additions & 416 deletions op3.py

This file was deleted.

Empty file added python-OP3/__init__.py
Empty file.
48 changes: 48 additions & 0 deletions python-OP3/op3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
from py_module import *
import rospy

# For pydev debugger
# import sys
# reload(sys)
# sys.setdefaultencoding('utf-8')


class Op3(object):
"""
Client ROS class for manipulating Robotis-OP3 in real-world and Gazebo
"""
# imported method
go_init_pose = op3_action.go_init_pose
play_motion = op3_action.play_motion

set_module = robotis_controller.set_module
torque_off = robotis_controller.torque_off
torque_on = robotis_controller.torque_on

set_angles = op3_direct_control.set_angles
online_walking_command = op3_walking.online_walking_command
google_tts = op3_utility.google_tts
safety_suspend = useful_method.safety_suspend

def __init__(self, ns="/robotis"):
rospy.init_node("op3_tester")
# exported method
self.ns = ns
op3_action.constructor(self)
op3_walking.constructor(self)
op3_direct_control.constructor(self)
op3_utility.constructor(self)
open_cr.constructor(self)
robotis_controller.constructor(self)
useful_method.constructor(self)

rospy.sleep(0.5)
self.go_init_pose()


if "__main__" == __name__:
op3_tester = Op3()
# op3_tester.ready_for_walking()
# op3_tester.safety_suspend()
pass
8 changes: 8 additions & 0 deletions python-OP3/py_module/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
__all__ = ["op3_walking", "op3_action", "op3_direct_control",
"op3_utility", "open_cr", "robotis_controller", "useful_method"]
# from .op3_walking import *
# from .open_cr import *
# from .op3_utility import *
# from .op3_direct_control import *
# from .op3_action import *
# from .robotis_controller import *
77 changes: 77 additions & 0 deletions python-OP3/py_module/op3_action.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# -*- coding: utf-8 -*-
from op3_action_module_msgs.srv import IsRunning
import rospy
from std_msgs.msg import String, Int32


def constructor(self):
ns = self.ns
self.is_action_done = False

self._pub_ini_pose = rospy.Publisher(ns + "/base/ini_pose", String, queue_size=0)
self._pub_action = rospy.Publisher(ns + "/action/page_num", Int32, queue_size=0)

rospy.wait_for_service(ns + '/action/is_running')
self.action_is_running_srv = rospy.ServiceProxy(ns + '/action/is_running', IsRunning)


def play_motion(self, action, start_voice, end_voice=None, is_blocking=True):
"""
Available page_num:
-2 : BREAK
-1 : STOP
# 0 : .
4 : Thank you
41: Introduction
24: Wow
23: Yes go
15: Sit down
1: Stand up
54: Clap please
27: Oops
38: Bye bye
111 : Intro01
115 : Intro02
118 : Intro03
# soccer
80 : Init pose
122 : Get up(Front)
123 : Get up(Back)
121 : Right Kick
120 : Left Kick
60 : Keeper Ready
61 : Defence to Right
62 : Defence to Left
# action demo
200 : Foot play
202 : Roll back
204 : Look
126 : Push up
"""
if action == "ini_pose":
self._pub_ini_pose.publish(action)
self.present_module = action
else:
self.set_module("action_module")
self._pub_action.publish(action)

self.google_tts(start_voice)

if is_blocking:
self.is_action_done = False
while not self.is_action_done:
rospy.sleep(0.1)
if end_voice:
self.google_tts(end_voice)


def go_init_pose(self, start_voice="進入賢者模式。"):
if self.present_module != "ini_pose":
self.torque_on()
rospy.sleep(0.1)
self.play_motion("ini_pose", start_voice=start_voice)


def stand_up(self):
self.play_motion(1, start_voice="我站起來50公分。")
69 changes: 69 additions & 0 deletions python-OP3/py_module/op3_direct_control.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# -*- coding: utf-8 -*-
import rospy
import numpy as np
from sensor_msgs.msg import JointState
import time


def constructor(self):
ns = self.ns

self._pub_joints = rospy.Publisher(self.ns + "/direct_control/set_joint_states", JointState, queue_size=0)


def set_default_moving_time(self, param):
rospy.set_param(self.ns + "/direct_control/default_moving_time", param)


def set_default_moving_angle(self, param):
rospy.set_param(self.ns + "/direct_control/default_moving_angle", param)


def set_check_collision(self, param):
rospy.set_param(self.ns + "/direct_control/check_collision", param)


def head_control(self, pan, tilt):
self.set_angles({"head_pan": np.pi * (pan / 180.0),
"head_tilt": np.pi * (tilt / 180.0)})


def set_angles(self, angles):
# self.set_module("direct_control_module")

msg = JointState()
msg.name = angles.keys()
msg.position = angles.values()
self._pub_joints.publish(msg)


def wave_angle(self, test_joints, angles=None, duration=2, print_angle=False):
if angles is None:
angles = np.random.uniform(1.57, -1.57, 3)
angle_dict = dict(zip(test_joints, angles))
self.set_angles(angle_dict)
if print_angle:
rospy.loginfo(np.round(angle_dict.values(), 2))
rospy.sleep(duration=duration)


def set_angles_slow(self, stop_angles, delay=2):
start_angles = self.get_angles()
start = time.time()
stop = start + delay
r = rospy.Rate(100)
while not rospy.is_shutdown():
t = time.time()
if t > stop: break
ratio = (t - start) / delay
angles = interpolate(stop_angles, start_angles, ratio)
self.set_angles(angles)
r.sleep()


def interpolate(anglesa, anglesb, coefa):
z = {}
joints = anglesa.keys()
for j in joints:
z[j] = anglesa[j] * coefa + anglesb[j] * (1 - coefa)
return z
19 changes: 19 additions & 0 deletions python-OP3/py_module/op3_utility.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
import rospy
import requests
from std_msgs.msg import String
import os


def constructor(self):
# Utility
self._pub_sound = rospy.Publisher("/play_sound_file", String, queue_size=0)


def google_tts(self, query="我是TTSAPI", lang="zh"):
doc = requests.get(
"https://translate.google.com/translate_tts?ie=UTF-8&q=" + query + "&tl=" + lang + "&client=tw-ob")
with open('buff.mp3', 'wb') as f:
f.write(doc.content)
fpath = f.name
self._pub_sound.publish(os.path.realpath(fpath))
105 changes: 105 additions & 0 deletions python-OP3/py_module/op3_walking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# -*- coding: utf-8 -*-
import rospy
from op3_walking_module_msgs.srv import GetWalkingParam, SetWalkingParam
from op3_walking_module_msgs.msg import WalkingParam
from op3_online_walking_module_msgs.msg import FootStepCommand
from std_msgs.msg import Float64, String, Bool
from rosgraph_msgs.msg import Log
import re


def constructor(self):
ns = self.ns

self.is_balance_set = False
self.is_walking_done = False
self.walking_query = re.compile("(?<=Walking Control \()(\d+)/(\d+)")

# ros logging subscriber for walking control
self._sub_rosinfo = rospy.Subscriber("/rosout", Log, _cb_rosinfo, self, queue_size=10)

# Walking
self._pub_walking = rospy.Publisher(ns + "/walking/command", String, queue_size=0)
self._pub_walking_params = rospy.Publisher(ns + "/walking/set_params", WalkingParam, queue_size=0)
self._pub_head_scan = rospy.Publisher(ns + "/head_control/scan_command", String, queue_size=0)

# Online Walking
self._sub_movement_done = rospy.Subscriber(ns + "/movement_done", String, _cb_movement_done, self, queue_size=10)

self._pub_online_walking = rospy.Publisher(ns + "/online_walking/foot_step_command", FootStepCommand,
queue_size=0)
self._pub_foot_distance = rospy.Publisher(ns + "/online_walking/foot_distance", Float64, queue_size=0)
self._pub_wholebody_balance = rospy.Publisher(ns + "/online_walking/wholebody_balance_msg", String,
queue_size=0)
self._pub_reset_body = rospy.Publisher(ns + "/online_walking/reset_body", Bool, queue_size=0)


def _cb_movement_done(msg, self):
print "movement_done: " + msg.data
if msg.data == "xxx":
self.is_walking_done = True


def _cb_rosinfo(msg, self):
# print msg
if msg.name == "/op3_manager":
print "rosinfo: " + msg.msg
walking_state = self.walking_query.search(msg.msg)
if walking_state:
print "present_step: " + walking_state.group(1), "total_steps: " + walking_state.group(2)
if walking_state.group(1) == walking_state.group(2):
self.is_walking_done = True
elif msg.msg == "[END] Balance Gain":
self.is_balance_set = True
elif msg.msg == "[END] Joint Control":
self.present_module = "op3_online_walking_module"
elif msg.msg == "Walking Enable":
self.present_module = "op3_walking_module"


def set_balance(self, sw):
self.is_balance_set = False
self._pub_wholebody_balance.publish("balance_" + sw)
while not self.is_balance_set:
rospy.sleep(0.1)


def ready_for_walking(self):
self.set_module("online_walking_module")
self._pub_reset_body.publish(True)
# Getting ready pose spending 5 secs.
rospy.sleep(5)
self.set_balance("on")
self._pub_foot_distance.publish(0.09)
rospy.sleep(0.5)


def online_walking_command(self, direction="forward", step_num=4,
step_time=0.50, step_length=0.04, side_length=0.03,
step_angle=0.1, start_leg=None, is_blocking=True):
"""
Available direction: turn_left, turn_right, forward,
backward, stop, left, right
"""
if self.present_module != "online_walking_module":
self.ready_for_walking()
msg = FootStepCommand()
msg.command = direction
msg.step_time = step_time
msg.step_num = step_num
if direction == "forward" or direction == "backward":
start_leg = "right_leg"
msg.step_length = step_length
elif direction == "left" or direction == "right":
msg.side_length = side_length
elif direction == "turn_left" or direction == "turn_right":
msg.step_angle = step_angle
else:
print "停止步行"
msg.start_leg = start_leg
self._pub_online_walking.publish(msg)
if is_blocking:
self.is_walking_done = False
while not self.is_walking_done:
rospy.sleep(0.1)

20 changes: 20 additions & 0 deletions python-OP3/py_module/open_cr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
from sensor_msgs.msg import Imu
from std_msgs.msg import String
import rospy


def constructor(self):
ns = self.ns
self._sub_imu = rospy.Subscriber(ns + "/open_cr/imu", Imu, _cb_imu, self, queue_size=10)
self._sub_button = rospy.Subscriber(ns + "/open_cr/button", String, _cb_button, self, queue_size=10)


def _cb_imu(msg, self):
# print msg
pass


def _cb_button(msg, self):
# print msg
pass
Loading

0 comments on commit 61c611e

Please sign in to comment.