add 600 aikit(tmp)

This commit is contained in:
2929ss 2022-07-28 18:49:11 +08:00
parent f8d2b07193
commit 6074c2221a
3 changed files with 745 additions and 1 deletions

View file

@ -0,0 +1,18 @@
<launch>
<!-- Load URDF, rviz, etc. on the parameter server,加载参数服务器上的URDF、rviz等 -->
<arg name="model" default="$(find mycobot_description)/urdf/600_urdf/mycobot_600_urdf.urdf"/>
<arg name="rvizconfig" default="$(find mycobot_600)/config/mycobot_600.rviz" />
<!-- <arg name="gui" default="true" /> -->
<param name="robot_description" command="$(find xacro)/xacro --inorder $(arg model)" />
<!-- Combinejoin values to TF -->
<!-- <node name="robot_state_publisher" pkg="robot_state_publisher" type="robot_state_publisher" /> -->
<!-- <node name="joint_state_publisher_gui" pkg="joint_state_publisher_gui" type="joint_state_publisher_gui" /> -->
<node name="robot_state_publisher" pkg="robot_state_publisher" type="state_publisher" >
<rosparam param="source_list" subst_value="true">["joint_states"]</rosparam>
</node>
<!-- Show in Rviz -->
<node name="rviz" pkg="rviz" type="rviz" args="-d $(arg rvizconfig)" required="true" />
</launch>

View file

@ -275,7 +275,7 @@ def listener():
global mc
rospy.init_node("control_slider", anonymous=True)
ip = rospy.get_param("~ip", "192.168.10.159")
ip = rospy.get_param("~ip", "192.168.10.158")
print (ip)
mc = ElephantRobot(ip, 5001)
# START CLIENT,启动客户端

View file

@ -0,0 +1,726 @@
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
from socket import *
import math
from multiprocessing import Lock
from operator import imod
from tokenize import Pointfloat
import cv2
import numpy as np
import time
import json
import os,sys
import rospy
from visualization_msgs.msg import Marker
import rospy
from sensor_msgs.msg import JointState
# from moving_utils import Movement
IS_CV_4 = cv2.__version__[0] == '4'
__version__ = "1.0"
global mc
mutex = Lock()
class ElephantRobot(object):
def __init__(self, host, port):
# setup connection
# 建立连接
self.BUFFSIZE = 2048
self.ADDR = (host, port)
self.tcp_client = socket(AF_INET, SOCK_STREAM)
def start_client(self):
try:
self.tcp_client.connect(self.ADDR)
return ""
except error,e:
return e
def stop_client(self):
self.tcp_client.close()
def send_command(self, command):
with mutex:
self.tcp_client.send(command.encode())
recv_data = self.tcp_client.recv(self.BUFFSIZE).decode()
res_str = str(recv_data)
print("recv = " )+ res_str
res_arr = res_str.split(":")
if len(res_arr) == 2:
return res_arr[1]
else:
return ""
def string_to_coords(self, data):
data = data.replace("[", "")
data = data.replace("]", "")
data_arr = data.split(",")
if len(data_arr) == 6:
try:
coords_1 = float(data_arr[0])
coords_2 = float(data_arr[1])
coords_3 = float(data_arr[2])
coords_4 = float(data_arr[3])
coords_5 = float(data_arr[4])
coords_6 = float(data_arr[5])
coords = [coords_1, coords_2, coords_3, coords_4, coords_5, coords_6]
return coords
except:
return invalid_coords()
return invalid_coords()
def string_to_double(self, data):
try:
val = float(data)
return val
except:
return -9999.99
def string_to_int(self, data):
try:
val = int(data)
return val
except:
return -9999
def invalid_coords(self):
coords = [-1, -2, -3, -4, -1, -1]
return coords
def get_angles(self):
command = "get_angles()\n"
res = self.send_command(command)
return self.string_to_coords(res)
def get_coords(self):
command = "get_coords()\n"
res = self.send_command(command)
return self.string_to_coords(res)
def get_speed(self):
command = "get_speed()\n"
res = self.send_command(command)
return self.string_to_double(res)
def power_on(self):
command = "power_on()\n"
res = self.send_command(command)
return True
def power_off(self):
command = "power_off()\n"
res = self.send_command(command)
return True
def check_running(self):
command = "check_running()\n"
res = self.send_command(command)
return res == "1"
def state_check(self):
command = "state_check()\n"
res = self.send_command(command)
return res == "1"
def program_open(self, file_path):
command = "program_open(" + file_path + ")\n"
res = self.send_command(command)
return self.string_to_int(res)
def program_run(self, start_line):
"""run program运行程序"""
command = "program_run(" + str(start_line) + ")\n"
res = self.send_command(command)
return self.string_to_int(res)
def read_next_error(self):
command = "read_next_error()\n"
res = self.send_command(command)
return res
def write_coords(self, coords, speed):
"""set coords,设置坐标"""
command = "set_coords("
for item in coords:
command += str(item) + ","
command += str(speed) + ")\n"
self.send_command(command)
def write_coord(self, axis, value, speed):
coords = self.get_coords()
if coords != self.invalid_coords():
coords[axis] = value
self.write_coords(coords, speed)
def write_angles(self, angles, speed):
"""set angles,设置角度"""
command = "set_angles("
for item in angles:
command += str(item) + ","
command += str(speed) + ")\n"
self.send_command(command)
def write_angle(self, joint, value, speed):
angles = self.get_angles()
if angles != self.invalid_coords():
angles[joint] = value
self.write_angles(angles, speed)
def set_speed(self, percentage):
command = "set_speed(" + str(percentage) + ")\n"
self.send_command(command)
def set_carte_torque_limit(self, axis_str, value):
command = "set_torque_limit(" + axis_str + "," + str(value) + ")\n"
self.send_command(command)
def set_upside_down(self, up_down):
up = "1"
if up_down:
up = "0"
command = "set_upside_down(" + up + ")\n"
self.send_command(command)
def set_payload(self, payload):
command = "set_speed(" + str(payload) + ")\n"
self.send_command(command)
def state_on(self):
command = "state_on()\n"
self.send_command(command)
def state_off(self):
command = "state_off()\n"
self.send_command(command)
def task_stop(self):
command = "task_stop()\n"
self.send_command(command)
def jog_angle(self, joint_str, direction, speed):
command = (
"jog_angle(" + joint_str + "," + str(direction) + "," + str(speed) + ")\n"
)
self.send_command(command)
def jog_coord(self, axis_str, direction, speed):
command = (
"jog_coord(" + axis_str + "," + str(direction) + "," + str(speed) + ")\n"
)
self.send_command(command)
def get_digital_in(self, pin_number):
command = "get_digital_in(" + str(pin_number) + ")\n"
self.send_command(command)
def get_digital_out(self, pin_number):
command = "get_digital_out(" + str(pin_number) + ")\n"
self.send_command(command)
def set_digital_out(self, pin_number, pin_signal):
command = "set_digital_out(" + str(pin_number) + "," + str(pin_signal) + ")\n"
self.send_command(command)
def set_analog_out(self, pin_number, pin_signal):
command = "set_analog_out(" + str(pin_number) + "," + str(pin_signal) + ")\n"
self.send_command(command)
def get_acceleration(self):
command = "get_acceleration()\n"
res = self.send_command(command)
return self.string_to_int(res)
def set_acceleration(self, acceleration):
command = "set_acceleration(" + str(acceleration) + ")\n"
self.send_command(command)
def command_wait_done(self):
command = "wait_command_done()\n"
self.send_command(command)
def wait(self, seconds):
command = "wait(" + str(seconds) + ")\n"
self.send_command(command)
def assign_variable(self, var_name, var_value):
command = 'assign_variable("' + str(var_name) + '",' + str(var_value) + ")\n"
self.send_command(command)
def get_variable(self, var_name):
command = 'get_variable("' + str(var_name) + '")\n'
return self.send_command(command)
class Object_detect(object):
def __init__(self, camera_x = 140, camera_y = 5): # m5
# def __init__(self, camera_x = 140, camera_y = -5): # pi
# inherit the parent class
super(Object_detect, self).__init__()
# get path of file
dir_path = os.path.dirname(__file__)
# declare 600
self.mc = None
# 移动角度
self.move_angles = [
[0, 0, 0, 0, 90, 0], # point to grab
[-33.31, 2.02, -10.72, -0.08, 95, -54.84], # init the point
]
# 移动坐标
self.move_coords = [
[92.3, -104.9, 211.4, -179.6, 28.91, 131.29], # above the red bucket
[165.0, -93.6, 201.4, -173.43, 46.23, 160.65], # above the green bucket
[88.1, 126.3, 193.4, 162.15, 2.23, 156.02], # above the blue bucket
[-5.4, 120.6, 204.6, 162.66, -6.96, 159.93], # above the gray bucket
]
# which robot: USB* is m5; ACM* is wio; AMA* is raspi
self.robot_m5 = os.popen("ls /dev/ttyUSB*").readline()[:-1]
self.robot_wio = os.popen("ls /dev/ttyACM*").readline()[:-1]
self.robot_raspi = os.popen("ls /dev/ttyAMA*").readline()[:-1]
self.robot_jes = os.popen("ls /dev/ttyTHS1").readline()[:-1]
self.raspi = False
if "dev" in self.robot_m5:
self.Pin = [2, 5]
# self.Pin = [5]
elif "dev" in self.robot_wio:
self.Pin = [20, 21]
for i in self.move_coords:
i[2] -= 20
elif "dev" in self.robot_raspi or "dev" in self.robot_jes:
import RPi.GPIO as GPIO
GPIO.setwarnings(False)
self.GPIO = GPIO
GPIO.setmode(GPIO.BCM)
GPIO.setup(20, GPIO.OUT)
GPIO.setup(21, GPIO.OUT)
GPIO.output(20, 1)
GPIO.output(21, 1)
self.raspi = True
if self.raspi:
self.gpio_status(False)
# choose place to set cube
self.color = 0
# parameters to calculate camera clipping parameters
self.x1 = self.x2 = self.y1 = self.y2 = 0
# set cache of real coord
self.cache_x = self.cache_y = 0
# set color HSV
self.HSV = {
"yellow": [np.array([11, 115, 70]), np.array([40, 255, 245])],
"red": [np.array([0, 43, 46]), np.array([8, 255, 255])],
"green": [np.array([35, 43, 46]), np.array([77, 255, 255])],
"blue": [np.array([100, 43, 46]), np.array([124, 255, 255])],
"cyan": [np.array([78, 43, 46]), np.array([99, 255, 255])],
}
# use to calculate coord between cube and 600
self.sum_x1 = self.sum_x2 = self.sum_y2 = self.sum_y1 = 0
# The coordinates of the grab center point relative to the 600
self.camera_x, self.camera_y = camera_x, camera_y
# The coordinates of the cube relative to the 600
self.c_x, self.c_y = 0, 0
# The ratio of pixels to actual values
self.ratio = 0
# Get ArUco marker dict that can be detected.
self.aruco_dict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_6X6_250)
# Get ArUco marker params.
self.aruco_params = cv2.aruco.DetectorParameters_create()
# init a node and a publisher
rospy.init_node("marker", anonymous=True)
self.pub = rospy.Publisher('/cube', Marker, queue_size=1)
# init a Marker
self.marker = Marker()
self.marker.header.frame_id = "/joint1"
self.marker.ns = "cube"
self.marker.type = self.marker.CUBE
self.marker.action = self.marker.ADD
self.marker.scale.x = 0.04
self.marker.scale.y = 0.04
self.marker.scale.z = 0.04
self.marker.color.a = 1.0
self.marker.color.g = 1.0
self.marker.color.r = 1.0
# marker position initial
self.marker.pose.position.x = 0
self.marker.pose.position.y = 0
self.marker.pose.position.z = 0.03
self.marker.pose.orientation.x = 0
self.marker.pose.orientation.y = 0
self.marker.pose.orientation.z = 0
self.marker.pose.orientation.w = 1.0
# publish marker
def pub_marker(self, x, y, z=0.03):
self.marker.header.stamp = rospy.Time.now()
self.marker.pose.position.x = x
self.marker.pose.position.y = y
self.marker.pose.position.z = z
self.marker.color.g = self.color
self.pub.publish(self.marker)
# pump_control pi
def gpio_status(self, flag):
if flag:
self.GPIO.output(20, 0)
self.GPIO.output(21, 0)
else:
self.GPIO.output(20, 1)
self.GPIO.output(21, 1)
# 开启吸泵 m5
def pump_on(self):
# 让2号位工作
self.mc.set_basic_output(2, 0)
# 让5号位工作
self.mc.set_basic_output(5, 0)
# 停止吸泵 m5
def pump_off(self):
# 让2号位停止工作
self.mc.set_basic_output(2, 1)
# 让5号位停止工作
self.mc.set_basic_output(5, 1)
# Grasping motion
def move(self, x, y, color):
# send Angle to move 600
print(color)
self.mc.send_angles(self.move_angles[0], 30)
time.sleep(4)
# send coordinates to move 600
self.mc.send_coords([x, y, 140, 179.12, -0.18, 179.46], 30, 0)
time.sleep(3)
self.pub_marker(x/1000.0, y/1000.0, 140/1000.0)
self.mc.send_coords([x, y, 95, 179.12, -0.18, 179.46], 30, 0) # -178.77, -2.69, 40.15
# self.mc.send_coords([x, y, 90, 179.12, -0.18, 179.46], 30, 0) # -178.77, -2.69, 40.15
time.sleep(3)
self.pub_marker(x/1000.0, y/1000.0, 90/1000.0)
# open pump
if "dev" in self.robot_m5:
self.pump_on()
elif "dev" in self.robot_raspi or "dev" in self.robot_jes:
self.gpio_status(True)
time.sleep(1.5)
tmp = []
while True:
if not tmp:
tmp = self.mc.get_angles()
else:
break
time.sleep(0.5)
# print(tmp)
self.mc.send_angles([tmp[0], 17.22, -32.51, tmp[3], 97, tmp[5]],30)
time.sleep(3)
self.mc.send_coords(self.move_coords[color], 30, 1)
self.pub_marker(self.move_coords[color][0]/1000.0,
self.move_coords[color][1]/1000.0,
self.move_coords[color][2]/1000.0)
time.sleep(3)
# close pump
if "dev" in self.robot_m5:
self.pump_off()
elif "dev" in self.robot_raspi or "dev" in self.robot_jes:
self.gpio_status(False)
time.sleep(6)
if color == 1:
self.pub_marker(
self.move_coords[color][0]/1000.0+0.04, self.move_coords[color][1]/1000.0-0.02)
elif color == 0:
self.pub_marker(
self.move_coords[color][0]/1000.0+0.03, self.move_coords[color][1]/1000.0)
self.mc.send_angles(self.move_angles[1], 30)
time.sleep(1.5)
# decide whether grab cube
def decide_move(self, x, y, color):
print(x, y, self.cache_x, self.cache_y)
# detect the cube status move or run
if (abs(x - self.cache_x) + abs(y - self.cache_y)) / 2 > 5: # mm
self.cache_x, self.cache_y = x, y
return
else:
self.cache_x = self.cache_y = 0
# 调整吸泵吸取位置y增大,向左移动;y减小,向右移动;x增大,前方移动;x减小,向后方移动
self.move(x, y, color)
# init 600
def run(self):
if "dev" in self.robot_m5:
self.mc = MyCobot(self.robot_m5, 115200)
elif "dev" in self.robot_raspi:
self.mc = MyCobot(self.robot_raspi, 1000000)
if not self.raspi:
self.pub_pump(False, self.Pin)
self.mc.send_angles([-33.31, 2.02, -10.72, -0.08, 95, -54.84], 30)
time.sleep(3)
# draw aruco
def draw_marker(self, img, x, y):
# draw rectangle on img
cv2.rectangle(
img,
(x - 20, y - 20),
(x + 20, y + 20),
(0, 255, 0),
thickness=2,
lineType=cv2.FONT_HERSHEY_COMPLEX,
)
# add text on rectangle
cv2.putText(img, "({},{})".format(x, y), (x, y),
cv2.FONT_HERSHEY_COMPLEX_SMALL, 1, (243, 0, 0), 2,)
# get points of two aruco
def get_calculate_params(self, img):
# Convert the image to a gray image
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
# Detect ArUco marker.
corners, ids, rejectImaPoint = cv2.aruco.detectMarkers(
gray, self.aruco_dict, parameters=self.aruco_params
)
"""
Two Arucos must be present in the picture and in the same order.
There are two Arucos in the Corners, and each aruco contains the pixels of its four corners.
Determine the center of the aruco by the four corners of the aruco.
"""
if len(corners) > 0:
if ids is not None:
if len(corners) <= 1 or ids[0] == 1:
return None
x1 = x2 = y1 = y2 = 0
point_11, point_21, point_31, point_41 = corners[0][0]
x1, y1 = int((point_11[0] + point_21[0] + point_31[0] + point_41[0]) / 4.0), int(
(point_11[1] + point_21[1] + point_31[1] + point_41[1]) / 4.0)
point_1, point_2, point_3, point_4 = corners[1][0]
x2, y2 = int((point_1[0] + point_2[0] + point_3[0] + point_4[0]) / 4.0), int(
(point_1[1] + point_2[1] + point_3[1] + point_4[1]) / 4.0)
return x1, x2, y1, y2
return None
# set camera clipping parameters
def set_cut_params(self, x1, y1, x2, y2):
self.x1 = int(x1)
self.y1 = int(y1)
self.x2 = int(x2)
self.y2 = int(y2)
print(self.x1, self.y1, self.x2, self.y2)
# set parameters to calculate the coords between cube and 600
def set_params(self, c_x, c_y, ratio):
self.c_x = c_x
self.c_y = c_y
self.ratio = 220.0/ratio
# calculate the coords between cube and 600
def get_position(self, x, y):
return ((y - self.c_y)*self.ratio + self.camera_x), ((x - self.c_x)*self.ratio + self.camera_y)
"""
Calibrate the camera according to the calibration parameters.
Enlarge the video pixel by 1.5 times, which means enlarge the video size by 1.5 times.
If two ARuco values have been calculated, clip the video.
"""
def transform_frame(self, frame):
# enlarge the image by 1.5 times
fx = 1.5
fy = 1.5
frame = cv2.resize(frame, (0, 0), fx=fx, fy=fy,
interpolation=cv2.INTER_CUBIC)
if self.x1 != self.x2:
# the cutting ratio here is adjusted according to the actual situation
frame = frame[int(self.y2*0.2):int(self.y1*1.15),
int(self.x1*0.7):int(self.x2*1.15)]
return frame
# detect cube color
def color_detect(self, img):
# set the arrangement of color'HSV
x = y = 0
for mycolor, item in self.HSV.items():
# print("mycolor:",mycolor)
redLower = np.array(item[0])
redUpper = np.array(item[1])
# transfrom the img to model of gray
hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
# print("hsv",hsv)
# wipe off all color expect color in range
mask = cv2.inRange(hsv, item[0], item[1])
# a etching operation on a picture to remove edge roughness
erosion = cv2.erode(mask, np.ones((1, 1), np.uint8), iterations=2)
# the image for expansion operation, its role is to deepen the color depth in the picture
dilation = cv2.dilate(erosion, np.ones(
(1, 1), np.uint8), iterations=2)
# adds pixels to the image
target = cv2.bitwise_and(img, img, mask=dilation)
# the filtered image is transformed into a binary image and placed in binary
ret, binary = cv2.threshold(dilation, 127, 255, cv2.THRESH_BINARY)
# get the contour coordinates of the image, where contours is the coordinate value, here only the contour is detected
contours, hierarchy = cv2.findContours(
dilation, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if len(contours) > 0:
# do something about misidentification
boxes = [
box
for box in [cv2.boundingRect(c) for c in contours]
if min(img.shape[0], img.shape[1]) / 10
< min(box[2], box[3])
< min(img.shape[0], img.shape[1]) / 1
]
if boxes:
for box in boxes:
x, y, w, h = box
# find the largest object that fits the requirements
c = max(contours, key=cv2.contourArea)
# get the lower left and upper right points of the positioning object
x, y, w, h = cv2.boundingRect(c)
# locate the target by drawing rectangle
cv2.rectangle(img, (x, y), (x+w, y+h), (153, 153, 0), 2)
# calculate the rectangle center
x, y = (x*2+w)/2, (y*2+h)/2
# calculate the real coordinates of 600 relative to the target
if mycolor == "red":
self.color = 0
elif mycolor == "green":
self.color = 1
elif mycolor == "cyan":
self.color = 2
else:
self.color = 3
if abs(x) + abs(y) > 0:
return x, y
else:
return None
if __name__ == "__main__":
# open the camera
cap_num = 0
cap = cv2.VideoCapture(cap_num, cv2.CAP_V4L)
if not cap.isOpened():
cap.open()
# init a class of Object_detect
detect = Object_detect()
# init 600
detect.run()
_init_ = 20
init_num = 0
nparams = 0
num = 0
real_sx = real_sy = 0
while cv2.waitKey(1) < 0:
# read camera
_, frame = cap.read()
# deal img
frame = detect.transform_frame(frame)
if _init_ > 0:
_init_ -= 1
continue
# calculate the parameters of camera clipping
if init_num < 20:
if detect.get_calculate_params(frame) is None:
cv2.imshow("figure", frame)
continue
else:
x1, x2, y1, y2 = detect.get_calculate_params(frame)
detect.draw_marker(frame, x1, y1)
detect.draw_marker(frame, x2, y2)
detect.sum_x1 += x1
detect.sum_x2 += x2
detect.sum_y1 += y1
detect.sum_y2 += y2
init_num += 1
continue
elif init_num == 20:
detect.set_cut_params(
(detect.sum_x1)/20.0,
(detect.sum_y1)/20.0,
(detect.sum_x2)/20.0,
(detect.sum_y2)/20.0,
)
detect.sum_x1 = detect.sum_x2 = detect.sum_y1 = detect.sum_y2 = 0
init_num += 1
continue
# calculate params of the coords between cube and 600
if nparams < 10:
if detect.get_calculate_params(frame) is None:
cv2.imshow("figure", frame)
continue
else:
x1, x2, y1, y2 = detect.get_calculate_params(frame)
detect.draw_marker(frame, x1, y1)
detect.draw_marker(frame, x2, y2)
detect.sum_x1 += x1
detect.sum_x2 += x2
detect.sum_y1 += y1
detect.sum_y2 += y2
nparams += 1
continue
elif nparams == 10:
nparams += 1
# calculate and set params of calculating real coord between cube and 600
detect.set_params(
(detect.sum_x1+detect.sum_x2)/20.0,
(detect.sum_y1+detect.sum_y2)/20.0,
abs(detect.sum_x1-detect.sum_x2)/10.0 +
abs(detect.sum_y1-detect.sum_y2)/10.0
)
print("ok")
continue
# get detect result
detect_result = detect.color_detect(frame)
if detect_result is None:
cv2.imshow("figure", frame)
continue
else:
x, y = detect_result
# calculate real coord between cube and 600
real_x, real_y = detect.get_position(x, y)
if num == 20:
detect.pub_marker(real_sx/20.0/1000.0, real_sy/20.0/1000.0)
detect.decide_move(real_sx/20.0, real_sy/20.0, detect.color)
num = real_sx = real_sy = 0
else:
num += 1
real_sy += real_y
real_sx += real_x
cv2.imshow("figure", frame)
# close the window
if cv2.waitKey(1) & 0xFF == ord('q'):
cap.release()
cv2.destroyAllWindows()
sys.exit()