add aikit_320_pi for gripper scripts

This commit is contained in:
weijian 2023-06-16 18:10:12 +08:00
parent 823c77808c
commit 7ee9859e87
3 changed files with 1396 additions and 0 deletions

View file

@ -0,0 +1,491 @@
#!/usr/bin/env python3
import cv2
import numpy as np
import time
import os,sys
import rospy
from visualization_msgs.msg import Marker
from moving_utils import Movement
from pymycobot.mycobot import MyCobot
IS_CV_4 = cv2.__version__[0] == '4'
__version__ = "1.0"
# Adaptive seeed
class Object_detect(Movement):
def __init__(self, camera_x = 265, camera_y = 5):
# inherit the parent class
super(Object_detect, self).__init__()
# declare mycobot320
self.mc = None
# 移动角度
self.move_angles = [
[0.61, 45.87, -92.37, -41.3, 89.56, 9.58], # init the point
[18.8, -7.91, -54.49, -23.02, -0.79, -14.76], # point to grab
[16.96, -6.85, -54.93, -19.68, 89.47, 12.83],
]
# 移动坐标
self.move_coords = [
[30.3, -214.9, 302.3, -169.77, -8.64, -91.55], # D Sorting area
[240.3, -202.2, 317.1, -152.12, -10.15, -95.73], # C Sorting area
[244.5, 193.2, 330.3, -160.54, 17.35, -74.59], # A Sorting area
[33.2, 205.3, 322.5, -170.22, -13.93, 92.28], # B Sorting area
]
# which robot: USB* is m5; ACM* is wio; AMA* is raspi
self.robot_m5 = os.popen("ls /dev/ttyUSB*").readline()[:-1]
self.robot_wio = os.popen("ls /dev/ttyACM*").readline()[:-1]
self.robot_raspi = os.popen("ls /dev/ttyAMA*").readline()[:-1]
self.robot_jes = os.popen("ls /dev/ttyTHS1").readline()[:-1]
self.raspi = False
# choose place to set cube 选择放置立方体的地方
self.color = 0
# parameters to calculate camera clipping parameters 计算相机裁剪参数的参数
self.x1 = self.x2 = self.y1 = self.y2 = 0
# set cache of real coord 设置真实坐标的缓存
self.cache_x = self.cache_y = 0
# set color HSV
self.HSV = {
"yellow": [np.array([11, 85, 70]), np.array([59, 255, 245])],
# "yellow": [np.array([22, 93, 0]), np.array([45, 255, 245])],
"red": [np.array([0, 43, 46]), np.array([8, 255, 255])],
"green": [np.array([35, 43, 35]), np.array([90, 255, 255])],
"blue": [np.array([78, 43, 46]), np.array([110, 255, 255])],
"cyan": [np.array([78, 43, 46]), np.array([99, 255, 255])],
}
# use to calculate coord between cube and mycobot320
# 用于计算立方体和 mycobot 之间的坐标
self.sum_x1 = self.sum_x2 = self.sum_y2 = self.sum_y1 = 0
# The coordinates of the grab center point relative to the mycobot320
# 抓取中心点相对于 mycobot 的坐标
self.camera_x, self.camera_y = camera_x, camera_y
# The coordinates of the cube relative to the mycobot320
# 立方体相对于 mycobot 的坐标
self.c_x, self.c_y = 0, 0
# The ratio of pixels to actual values
# 像素与实际值的比值
self.ratio = 0
# Get ArUco marker dict that can be detected.
# 获取可以检测到的 ArUco 标记字典。
self.aruco_dict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_6X6_250)
# Get ArUco marker params. 获取 ArUco 标记参数
self.aruco_params = cv2.aruco.DetectorParameters_create()
# init a node and a publisher
rospy.init_node("marker", anonymous=True)
self.pub = rospy.Publisher('/cube', Marker, queue_size=1)
# init a Marker
self.marker = Marker()
self.marker.header.frame_id = "base"
self.marker.ns = "cube"
self.marker.type = self.marker.CUBE
self.marker.action = self.marker.ADD
self.marker.scale.x = 0.04
self.marker.scale.y = 0.04
self.marker.scale.z = 0.04
self.marker.color.a = 1.0
self.marker.color.g = 1.0
self.marker.color.r = 1.0
# marker position initial
self.marker.pose.position.x = 0
self.marker.pose.position.y = 0
self.marker.pose.position.z = 0.03
self.marker.pose.orientation.x = 0
self.marker.pose.orientation.y = 0
self.marker.pose.orientation.z = 0
self.marker.pose.orientation.w = 1.0
# publish marker
def pub_marker(self, x, y, z=0.03):
self.marker.header.stamp = rospy.Time.now()
self.marker.pose.position.x = x
self.marker.pose.position.y = y
self.marker.pose.position.z = z
self.marker.color.g = self.color
self.pub.publish(self.marker)
# pump_control pi
def gpio_status(self, flag):
if flag:
"""start the suction pump"""
self.mc.set_basic_output(1, 0)
self.mc.set_basic_output(2, 1)
else:
"""stop suction pump"""
self.mc.set_basic_output(1, 1)
self.mc.set_basic_output(2, 0)
time.sleep(1)
self.mc.set_basic_output(2, 1)
def gripper_on(self):
"""start gripper"""
self.mc.set_gripper_state(0, 100)
time.sleep(1.5)
def gripper_off(self):
"""stop gripper"""
self.mc.set_gripper_state(1, 100)
time.sleep(1.5)
# Grasping motion
def move(self, x, y, color):
# send Angle to move mycobot320
print(color)
print('x,y:', round(x, 2), round(y, 2))
self.mc.send_angles(self.move_angles[2], 50)
time.sleep(3)
# open gripper
self.gripper_on()
# send coordinates to move mycobot
self.mc.send_coords([x, y, 250, -174.51, 0.86, -85.93], 100, 1)
time.sleep(2.5)
self.mc.send_coords([x, y, 203, -174.51, 0.86, -85.93], 100, 1)
time.sleep(3)
# close gripper
self.gripper_off()
time.sleep(1.5)
tmp = []
while True:
if not tmp:
tmp = self.mc.get_angles()
else:
break
time.sleep(0.5)
# print(tmp)
self.mc.send_angles([tmp[0], -0.71, -54.49, -23.02, 89.56, tmp[5]],25) # [18.8, -7.91, -54.49, -23.02, -0.79, -14.76]
time.sleep(3)
self.pub_marker(self.move_coords[2][0]/1000.0, self.move_coords[2][1]/1000.0, self.move_coords[2][2]/1000.0)
self.mc.send_coords(self.move_coords[color], 100, 1)
self.pub_marker(self.move_coords[color][0]/1000.0, self.move_coords[color][1]/1000.0, self.move_coords[color][2]/1000.0)
time.sleep(6.5)
# open gripper
self.gripper_on()
time.sleep(6.5)
if color == 1:
self.pub_marker(
self.move_coords[color][0]/1000.0+0.03, self.move_coords[color][1]/1000.0-0.02
)
elif color == 0:
self.pub_marker(
self.move_coords[color][0]/1000.0+0.03, self.move_coords[color][1]/1000.0
)
# close gripper
self.gripper_off()
self.mc.send_angles(self.move_angles[0], 25)
time.sleep(4.5)
# decide whether grab cube 决定是否抓取立方体
def decide_move(self, x, y, color):
print(x, y, self.cache_x, self.cache_y)
# detect the cube status move or run 检测立方体状态移动或运行
if (abs(x - self.cache_x) + abs(y - self.cache_y)) / 2 > 5: # mm
self.cache_x, self.cache_y = x, y
return
else:
self.cache_x = self.cache_y = 0
# 调整吸泵吸取位置y增大,向左移动;y减小,向右移动;x增大,前方移动;x减小,向后方移动
self.move(x, y, color)
# init mycobot320
def run(self):
if "dev" in self.robot_raspi:
self.mc = MyCobot(self.robot_raspi, 115200)
self.mc.send_angles([0.61, 45.87, -92.37, -41.3, 89.56, 9.58], 20)
time.sleep(2.5)
self.gripper_off()
# draw aruco
def draw_marker(self, img, x, y):
# draw rectangle on img 在 img 上绘制矩形
cv2.rectangle(
img,
(x - 20, y - 20),
(x + 20, y + 20),
(0, 255, 0),
thickness=2,
lineType=cv2.FONT_HERSHEY_COMPLEX,
)
# add text on rectangle
cv2.putText(img, "({},{})".format(x, y), (x, y),
cv2.FONT_HERSHEY_COMPLEX_SMALL, 1, (243, 0, 0), 2,)
# get points of two aruco 获得两个 aruco 的点位
def get_calculate_params(self, img):
# Convert the image to a gray image 将图像转换为灰度图像
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
# Detect ArUco marker.
corners, ids, rejectImaPoint = cv2.aruco.detectMarkers(
gray, self.aruco_dict, parameters=self.aruco_params
)
"""
Two Arucos must be present in the picture and in the same order.
There are two Arucos in the Corners, and each aruco contains the pixels of its four corners.
Determine the center of the aruco by the four corners of the aruco.
"""
if len(corners) > 0:
if ids is not None:
if len(corners) <= 1 or ids[0] == 1:
return None
x1 = x2 = y1 = y2 = 0
point_11, point_21, point_31, point_41 = corners[0][0]
x1, y1 = int((point_11[0] + point_21[0] + point_31[0] + point_41[0]) / 4.0), int(
(point_11[1] + point_21[1] + point_31[1] + point_41[1]) / 4.0)
point_1, point_2, point_3, point_4 = corners[1][0]
x2, y2 = int((point_1[0] + point_2[0] + point_3[0] + point_4[0]) / 4.0), int(
(point_1[1] + point_2[1] + point_3[1] + point_4[1]) / 4.0)
return x1, x2, y1, y2
return None
# set camera clipping parameters 设置相机裁剪参数
def set_cut_params(self, x1, y1, x2, y2):
self.x1 = int(x1)
self.y1 = int(y1)
self.x2 = int(x2)
self.y2 = int(y2)
# set parameters to calculate the coords between cube and mycobot320
# 设置参数以计算立方体和 mycobot 之间的坐标
def set_params(self, c_x, c_y, ratio):
self.c_x = c_x
self.c_y = c_y
self.ratio = 220.0/ratio
# calculate the coords between cube and mycobot320
# 计算立方体和 mycobot 之间的坐标
def get_position(self, x, y):
return ((y - self.c_y)*self.ratio + self.camera_x), ((x - self.c_x)*self.ratio + self.camera_y)
"""
Calibrate the camera according to the calibration parameters.
Enlarge the video pixel by 1.5 times, which means enlarge the video size by 1.5 times.
If two ARuco values have been calculated, clip the video.
"""
def transform_frame(self, frame):
# enlarge the image by 1.5 times
fx = 1.5
fy = 1.5
frame = cv2.resize(frame, (0, 0), fx=fx, fy=fy,
interpolation=cv2.INTER_CUBIC)
if self.x1 != self.x2:
# the cutting ratio here is adjusted according to the actual situation
frame = frame[int(self.y2*0.78):int(self.y1*1.1),
int(self.x1*0.86):int(self.x2*1.08)]
return frame
# detect cube color
def color_detect(self, img):
# set the arrangement of color'HSV
x = y = 0
for mycolor, item in self.HSV.items():
# print("mycolor:",mycolor)
redLower = np.array(item[0])
redUpper = np.array(item[1])
# transfrom the img to model of gray 将图像转换为灰度模型
hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
# print("hsv",hsv)
# wipe off all color expect color in range 擦掉所有颜色期望范围内的颜色
mask = cv2.inRange(hsv, item[0], item[1])
# a etching operation on a picture to remove edge roughness
# 对图片进行蚀刻操作以去除边缘粗糙度
erosion = cv2.erode(mask, np.ones((1, 1), np.uint8), iterations=2)
# the image for expansion operation, its role is to deepen the color depth in the picture
# 用于扩展操作的图像,其作用是加深图片中的颜色深度
dilation = cv2.dilate(erosion, np.ones(
(1, 1), np.uint8), iterations=2)
# adds pixels to the image 向图像添加像素
target = cv2.bitwise_and(img, img, mask=dilation)
# the filtered image is transformed into a binary image and placed in binary
# 将过滤后的图像转换为二值图像并放入二值
ret, binary = cv2.threshold(dilation, 127, 255, cv2.THRESH_BINARY)
# get the contour coordinates of the image, where contours is the coordinate value, here only the contour is detected
# 获取图像的轮廓坐标其中contours为坐标值这里只检测轮廓
contours, hierarchy = cv2.findContours(
dilation, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if len(contours) > 0:
# do something about misidentification
boxes = [
box
for box in [cv2.boundingRect(c) for c in contours]
if min(img.shape[0], img.shape[1]) / 10
< min(box[2], box[3])
< min(img.shape[0], img.shape[1]) / 1
]
if boxes:
for box in boxes:
x, y, w, h = box
# find the largest object that fits the requirements 找到符合要求的最大对象
c = max(contours, key=cv2.contourArea)
# get the lower left and upper right points of the positioning object
# 获取定位对象的左下和右上点
x, y, w, h = cv2.boundingRect(c)
# locate the target by drawing rectangle 通过绘制矩形来定位目标
cv2.rectangle(img, (x, y), (x+w, y+h), (153, 153, 0), 2)
# calculate the rectangle center 计算矩形中心
x, y = (x*2+w)/2, (y*2+h)/2
# calculate the real coordinates of mycobot320 relative to the target
# 计算 mycobot 相对于目标的真实坐标
if mycolor == "yellow":
self.color = 3
break
elif mycolor == "red":
self.color = 0
break
elif mycolor == "cyan":
self.color = 2
break
elif mycolor == "blue":
self.color =2
break
elif mycolor == "green":
self.color = 1
break
# 判断是否正常识别
if abs(x) + abs(y) > 0:
return x, y
else:
return None
if __name__ == "__main__":
# open the camera
cap_num = 0
cap = cv2.VideoCapture(cap_num, cv2.CAP_V4L)
if not cap.isOpened():
cap.open()
# init a class of Object_detect
detect = Object_detect()
# init mycobot320
detect.run()
_init_ = 20
init_num = 0
nparams = 0
num = 0
real_sx = real_sy = 0
while cv2.waitKey(1) < 0:
# read camera
_, frame = cap.read()
# deal img
frame = detect.transform_frame(frame)
if _init_ > 0:
_init_ -= 1
continue
# calculate the parameters of camera clipping 计算相机裁剪的参数
if init_num < 20:
if detect.get_calculate_params(frame) is None:
cv2.imshow("figure", frame)
continue
else:
x1, x2, y1, y2 = detect.get_calculate_params(frame)
detect.draw_marker(frame, x1, y1)
detect.draw_marker(frame, x2, y2)
detect.sum_x1 += x1
detect.sum_x2 += x2
detect.sum_y1 += y1
detect.sum_y2 += y2
init_num += 1
continue
elif init_num == 20:
detect.set_cut_params(
(detect.sum_x1)/20.0,
(detect.sum_y1)/20.0,
(detect.sum_x2)/20.0,
(detect.sum_y2)/20.0,
)
detect.sum_x1 = detect.sum_x2 = detect.sum_y1 = detect.sum_y2 = 0
init_num += 1
continue
# calculate params of the coords between cube and mycobot320 计算立方体和 mycobot 之间坐标的参数
if nparams < 10:
if detect.get_calculate_params(frame) is None:
cv2.imshow("figure", frame)
continue
else:
x1, x2, y1, y2 = detect.get_calculate_params(frame)
detect.draw_marker(frame, x1, y1)
detect.draw_marker(frame, x2, y2)
detect.sum_x1 += x1
detect.sum_x2 += x2
detect.sum_y1 += y1
detect.sum_y2 += y2
nparams += 1
continue
elif nparams == 10:
nparams += 1
# calculate and set params of calculating real coord between cube and mycobot320
# 计算和设置计算立方体和mycobot之间真实坐标的参数
detect.set_params(
(detect.sum_x1+detect.sum_x2)/20.0,
(detect.sum_y1+detect.sum_y2)/20.0,
abs(detect.sum_x1-detect.sum_x2)/10.0 +
abs(detect.sum_y1-detect.sum_y2)/10.0
)
print("ok")
continue
# get detect result 获取检测结果
detect_result = detect.color_detect(frame)
if detect_result is None:
cv2.imshow("figure", frame)
continue
else:
x, y = detect_result
# calculate real coord between cube and mycobot320 计算立方体和 mycobot 之间的真实坐标
real_x, real_y = detect.get_position(x, y)
# print('real_x',round(real_x, 3),round(real_y, 3))
if num == 20:
detect.pub_marker(real_sx/20.0/1000.0, real_sy/20.0/1000.0)
detect.decide_move(real_sx/20.0, real_sy/20.0, detect.color)
num = real_sx = real_sy = 0
else:
num += 1
real_sy += real_y
real_sx += real_x
cv2.imshow("figure", frame)
# close the window
if cv2.waitKey(1) & 0xFF == ord('q'):
cap.release()
cv2.destroyAllWindows()
sys.exit()

View file

@ -0,0 +1,302 @@
#encoding: UTF-8
import platform
import cv2
import numpy as np
from pymycobot.mycobot import MyCobot
import time
import os
import rospy
from visualization_msgs.msg import Marker
from moving_utils import Movement
# y轴偏移量
pump_y = -55
# x轴偏移量
pump_x = 15
class Detect_marker(Movement):
def __init__(self):
# set cache of real coord
self.cache_x = self.cache_y = 0
# which robot: USB* is m5; ACM* is wio; AMA* is raspi
self.robot_m5 = os.popen("ls /dev/ttyUSB*").readline()[:-1]
self.robot_wio = os.popen("ls /dev/ttyACM*").readline()[:-1]
self.robot_raspi = os.popen("ls /dev/ttyAMA*").readline()[:-1]
self.robot_jes = os.popen("ls /dev/ttyTHS1").readline()[:-1]
# Creating a Camera Object
if platform.system() == "Windows":
cap_num = 1
self.cap = cv2.VideoCapture(cap_num, cv2.CAP_DSHOW)
self.cap.set(3, 640)
self.cap.set(4, 480)
elif platform.system() == "Linux":
cap_num = 0
self.cap = cv2.VideoCapture(cap_num, cv2.CAP_V4L)
self.cap.set(3, 640)
self.cap.set(4, 480)
# Determine the placement point of the QR code
self.color = 0
# Get ArUco marker dict that can be detected.
self.aruco_dict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_6X6_250)
# Get ArUco marker params.
self.aruco_params = cv2.aruco.DetectorParameters_create()
# 摄像头的内参矩阵
self.camera_matrix = np.array([
[781.33379113, 0., 347.53500524],
[0., 783.79074192, 246.67627253],
[0., 0., 1.]])
# 摄像头的畸变系数
self.dist_coeffs = np.array(([[3.41360787e-01, -2.52114260e+00, -1.28012469e-03, 6.70503562e-03,
2.57018000e+00]]))
# init a node and a publisher
rospy.init_node("encode_marker", anonymous=True)
self.pub = rospy.Publisher('/cube', Marker, queue_size=1)
self.marker = Marker()
self.marker.header.frame_id = "base"
self.marker.ns = "cube"
self.marker.type = self.marker.CUBE
self.marker.action = self.marker.ADD
self.marker.scale.x = 0.04
self.marker.scale.y = 0.04
self.marker.scale.z = 0.04
self.marker.color.a = 1
self.marker.color.r = 0.3
self.marker.color.g = 0.3
self.marker.color.b = 0.3
# marker position initial
self.marker.pose.position.x = 0
self.marker.pose.position.y = 0
self.marker.pose.position.z = 0.03
self.marker.pose.orientation.x = 0
self.marker.pose.orientation.y = 0
self.marker.pose.orientation.z = 0
self.marker.pose.orientation.w = 1.0
# 控制吸泵
def pub_pump(self, flag):
if flag:
"""start the suction pump"""
self.mc.set_basic_output(1, 0)
self.mc.set_basic_output(2, 1)
else:
"""stop suction pump"""
self.mc.set_basic_output(1, 1)
self.mc.set_basic_output(2, 0)
time.sleep(1)
self.mc.set_basic_output(2, 1)
def gripper_on(self):
"""start gripper"""
self.mc.set_gripper_state(0, 100)
time.sleep(1.5)
def gripper_off(self):
"""stop gripper"""
self.mc.set_gripper_state(1, 100)
time.sleep(1.5)
# Grasping motion
def move(self, x, y, color, yaw_degrees):
print(color, yaw_degrees)
angles = [
[0.61, 45.87, -92.37, -41.3, 89.56, 9.58], # init to point
[18.8, -7.91, -54.49, -23.02, 89.56, -14.76],
[16.96, -5.27, -52.38, -17.66, 89.82, 0.0],
]
coords = [
[145.0, -65.5, 280.1, 178.99, 7.67, -179.9], # 初始化点 init point
[244.5, 193.2, 330.3, -160.54, 17.35, -74.59], # A分拣区 A sorting area
[33.2, 205.3, 322.5, -170.22, -13.93, 92.28], # B分拣区 B sorting area
[240.3, -202.2, 317.1, -152.12, -10.15, -95.73], # C分拣区 C sorting area
[30.3, -214.9, 302.3, -169.77, -8.64, -91.55], # D分拣区 D sorting area
]
if yaw_degrees > 169:
yaw_degrees = 169
elif yaw_degrees < -169:
yaw_degrees = -169
else:
yaw_degrees = yaw_degrees
yaw_degrees_opt = yaw_degrees + 5
if yaw_degrees_opt > 170:
yaw_degrees_opt = 170
elif yaw_degrees_opt < -173:
yaw_degrees_opt = -173
else:
yaw_degrees_opt = yaw_degrees_opt
print('yaw_degrees_opt:', yaw_degrees_opt)
print('real_x, real_y:', round(coords[0][0] + x, 2), round(coords[0][1] + y, 2))
# publish marker
self.marker.header.stamp = rospy.Time.now()
self.marker.pose.position.x = (coords[0][0]-x)/1000.0
self.marker.pose.position.y = (coords[0][1]-y)/1000.0
self.pub.publish(self.marker)
# send coordinates to move mycobot
self.mc.send_angles(angles[2], 50)
time.sleep(3)
self.mc.send_angle(6, yaw_degrees_opt, 80)
self.gripper_on()
time.sleep(2.5)
tmp_coords = []
while True:
if not tmp_coords:
tmp_coords = self.mc.get_coords()
else:
break
time.sleep(0.5)
self.mc.send_coords([coords[0][0] + x, coords[0][1] + y, 250, tmp_coords[3], tmp_coords[4], tmp_coords[5]], 100,
1)
time.sleep(2)
self.mc.send_coords([coords[0][0] + x, coords[0][1] + y, 203, tmp_coords[3], tmp_coords[4], tmp_coords[5]], 100,
1)
time.sleep(3)
# close gripper
if "dev" in self.robot_raspi:
self.gripper_off()
time.sleep(1.5)
tmp = []
while True:
if not tmp:
tmp = self.mc.get_angles()
else:
break
time.sleep(0.5)
# print(tmp)
self.mc.send_angles([tmp[0], -0.71, -54.49, -23.02, 89.56, tmp[5]],25) # [18.8, -7.91, -54.49, -23.02, -0.79, -14.76]
time.sleep(3)
self.mc.send_coords(coords[color], 100, 1) # coords[1] 为A分拣区coords[2] 为B分拣区, coords[3] 为C分拣区coords[4] 为D分拣区
time.sleep(6.5)
# open gripper
if "dev" in self.robot_raspi:
self.gripper_on()
time.sleep(6.5)
# publish marker
time.sleep(1)
self.marker.header.stamp = rospy.Time.now()
self.marker.pose.position.x = coords[1][0]/1000.0
self.marker.pose.position.y = coords[1][1]/1000.0
self.pub.publish(self.marker)
self.mc.send_angles(angles[0], 25)
time.sleep(4.5)
self.gripper_off()
# decide whether grab cube
def decide_move(self, x, y, color, yaw_degrees):
print(x,y)
# detect the cube status move or run
if (abs(x - self.cache_x) + abs(y - self.cache_y)) / 2 > 5: # mm
self.cache_x, self.cache_y = x, y
return
else:
self.cache_x = self.cache_y = 0
# 调整吸泵吸取位置y增大,向左移动;y减小,向右移动;x增大,前方移动;x减小,向后方移动
self.move(x + 100, y + 140, color, yaw_degrees)
# init mycobot
def init_mycobot(self):
if "dev" in self.robot_raspi:
self.mc = MyCobot(self.robot_raspi, 115200)
self.gripper_off()
self.mc.send_angles([0.61, 45.87, -92.37, -41.3, 89.56, 9.58], 20)
time.sleep(2.5)
def run(self):
global pump_y, pump_x
self.init_mycobot()
print('ok')
num = sum_x = sum_y = 0
while cv2.waitKey(1) < 0:
success, img = self.cap.read()
if not success:
print("It seems that the image cannot be acquired correctly.")
break
# transfrom the img to model of gray
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
# Detect ArUco marker.
corners, ids, rejectImaPoint = cv2.aruco.detectMarkers(
gray, self.aruco_dict, parameters=self.aruco_params
)
# Determine the placement point of the QR code
if ids == np.array([[1]]):
self.color = 1
elif ids == np.array([[2]]):
self.color = 2
elif ids == np.array([[3]]):
self.color = 3
elif ids == np.array([[4]]):
self.color = 4
if len(corners) > 0:
if ids is not None:
# get informations of aruco
ret = cv2.aruco.estimatePoseSingleMarkers(
corners, 0.03, self.camera_matrix, self.dist_coeffs
)
# rvec:rotation offset,tvec:translation deviator
(rvec, tvec) = (ret[0], ret[1])
(rvec - tvec).any()
xyz = tvec[0, 0, :]
# calculate the coordinates of the aruco relative to the pump
xyz = [round(xyz[0]*1000+pump_y, 2), round(xyz[1]*1000+pump_x, 2), round(xyz[2]*1000, 2)]
# 从旋转向量rvec计算旋转矩阵
rotation_matrix, _ = cv2.Rodrigues(rvec)
# 从旋转矩阵提取欧拉角yaw、pitch、roll
euler_angles = cv2.decomposeProjectionMatrix(np.hstack((rotation_matrix, tvec.reshape(3, 1))))[6]
# 提取yaw角度绕Z轴旋转角度
yaw_degrees = euler_angles[2]
# 输出ArUco码的旋转角
# print("Rotation (Yaw):", yaw_degrees)
self.yaw_degrees = round(yaw_degrees[0], 2)
# cv2.putText(img, str(xyz[:2]), (0, 64), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
for i in range(rvec.shape[0]):
# draw the aruco on img
cv2.aruco.drawDetectedMarkers(img, corners)
if num < 40 :
sum_x += xyz[1]
sum_y += xyz[0]
num += 1
elif num ==40 :
self.decide_move(sum_x/40.0, sum_y/40.0, self.color, self.yaw_degrees)
num = sum_x = sum_y = 0
cv2.imshow("encode_image", img)
if __name__ == "__main__":
detect = Detect_marker()
detect.run()

View file

@ -0,0 +1,603 @@
from multiprocessing import Process, Pipe
import cv2
import numpy as np
import time
import os,sys
import rospy
from visualization_msgs.msg import Marker
from moving_utils import Movement
from pymycobot.mycobot import MyCobot
IS_CV_4 = cv2.__version__[0] == '4'
__version__ = "1.0" # Adaptive seeed
class Object_detect(Movement):
def __init__(self, camera_x = 265, camera_y = 5):
# inherit the parent class
super(Object_detect, self).__init__()
# declare mycobot 280pi
self.mc = None
# 移动角度
self.move_angles = [
[0.61, 45.87, -92.37, -32.16, 89.56, 1.66], # init the point
[18.8, -7.91, -54.49, -23.02, 89.56, -14.76], # point to grab
[16.96, -6.85, -54.93, -19.68, 89.47, 12.83],
]
# 移动坐标
self.move_coords = [
[30.3, -214.9, 302.3, -169.77, -8.64, -91.55], # D Sorting area
[240.3, -202.2, 317.1, -152.12, -10.15, -95.73], # C Sorting area
[244.5, 193.2, 330.3, -160.54, 17.35, -74.59], # A Sorting area
[33.2, 205.3, 322.5, -170.22, -13.93, 92.28], # B Sorting area
]
# which robot: USB* is m5; ACM* is wio; AMA* is raspi
self.robot_m5 = os.popen("ls /dev/ttyUSB*").readline()[:-1]
self.robot_wio = os.popen("ls /dev/ttyACM*").readline()[:-1]
self.robot_raspi = os.popen("ls /dev/ttyAMA*").readline()[:-1]
self.robot_jes = os.popen("ls /dev/ttyTHS1").readline()[:-1]
# choose place to set cube
self.color = 0
# parameters to calculate camera clipping parameters
self.x1 = self.x2 = self.y1 = self.y2 = 0
# set cache of real coord
self.cache_x = self.cache_y = 0
# use to calculate coord between cube and mycobot
self.sum_x1 = self.sum_x2 = self.sum_y2 = self.sum_y1 = 0
# The coordinates of the grab center point relative to the mycobot
self.camera_x, self.camera_y = camera_x, camera_y
# The coordinates of the cube relative to the mycobot
self.c_x, self.c_y = 0, 0
# The ratio of pixels to actual values
self.ratio = 0
# Get ArUco marker dict that can be detected.
self.aruco_dict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_6X6_250)
# Get ArUco marker params.
self.aruco_params = cv2.aruco.DetectorParameters_create()
# init a Marker
rospy.init_node("marker", anonymous=True)
self.pub = rospy.Publisher('/cube', Marker, queue_size=1)
# init a Marker
self.marker = Marker()
self.marker.header.frame_id = "base"
self.marker.ns = "cube"
self.marker.type = self.marker.CUBE
self.marker.action = self.marker.ADD
self.marker.scale.x = 0.04
self.marker.scale.y = 0.04
self.marker.scale.z = 0.04
self.marker.color.a = 1.0
self.marker.color.g = 1.0
self.marker.color.r = 1.0
# marker position initial
self.marker.pose.position.x = 0
self.marker.pose.position.y = 0
self.marker.pose.position.z = 0.03
self.marker.pose.orientation.x = 0
self.marker.pose.orientation.y = 0
self.marker.pose.orientation.z = 0
self.marker.pose.orientation.w = 1.0
self.cache_x = self.cache_y = 0
# publish marker
def pub_marker(self, x, y, z=0.03):
self.marker.header.stamp = rospy.Time.now()
self.marker.pose.position.x = x
self.marker.pose.position.y = y
self.marker.pose.position.z = z
self.marker.color.g = self.color
self.pub.publish(self.marker)
# pump_control pi
def gpio_status(self, flag):
if flag:
"""start the suction pump"""
self.mc.set_basic_output(1, 0)
self.mc.set_basic_output(2, 1)
else:
"""stop suction pump"""
self.mc.set_basic_output(1, 1)
self.mc.set_basic_output(2, 0)
time.sleep(1)
self.mc.set_basic_output(2, 1)
def gripper_on(self):
"""start gripper"""
self.mc.set_gripper_state(0, 100)
time.sleep(1.5)
def gripper_off(self):
"""stop gripper"""
self.mc.set_gripper_state(1, 100)
time.sleep(1.5)
# Grasping motion
def move(self, x, y, color):
print(color)
print('x,y:', round(x, 2), round(y, 2))
# send Angle to move mycobot320
self.mc.send_angles(self.move_angles[2], 50)
time.sleep(3)
# open gripper
self.gripper_on()
# send coordinates to move mycobot
self.mc.send_coords([x, y, 250, -174.51, 0.86, -85.93], 100, 1)
time.sleep(2.5)
self.mc.send_coords([x, y, 203, -174.51, 0.86, -85.93], 100, 1)
time.sleep(3)
# close gripper
self.gripper_off()
time.sleep(2)
tmp = []
while True:
if not tmp:
tmp = self.mc.get_angles()
else:
break
time.sleep(0.5)
# print(tmp)
self.mc.send_angles([tmp[0], -0.71, -54.49, -23.02, 89.56, tmp[5]],
25) # [18.8, -7.91, -54.49, -23.02, -0.79, -14.76]
time.sleep(3)
self.mc.send_coords(self.move_coords[color], 100, 1)
self.pub_marker(self.move_coords[color][0]/1000.0, self.move_coords[color][1]/1000.0,
self.move_coords[color][2]/1000.0)
time.sleep(6.5)
# open gripper
self.gripper_on()
time.sleep(6.5)
self.mc.send_angles(self.move_angles[0], 50)
self.gripper_off()
time.sleep(4.5)
# decide whether grab cube
def decide_move(self, x, y, color):
print(x, y, self.cache_x, self.cache_y)
# detect the cube status move or run
if (abs(x - self.cache_x) + abs(y - self.cache_y)) / 2 > 5: # mm
self.cache_x, self.cache_y = x, y
return
else:
self.cache_x = self.cache_y = 0
# 调整吸泵吸取位置y增大,向左移动;y减小,向右移动;x增大,前方移动;x减小,向后方移动
self.move(x, y, color)
# init mycobot
def run(self):
if "dev" in self.robot_raspi:
self.mc = MyCobot(self.robot_raspi, 115200)
self.mc.send_angles([0.61, 45.87, -92.37, -41.3, 89.56, 9.58], 20)
time.sleep(2.5)
self.gripper_off()
# draw aruco
def draw_marker(self, img, x, y):
# draw rectangle on img
cv2.rectangle(
img,
(x - 20, y - 20),
(x + 20, y + 20),
(0, 255, 0),
thickness=2,
lineType=cv2.FONT_HERSHEY_COMPLEX,
)
# add text on rectangle
cv2.putText(
img,
"({},{})".format(x, y),
(x, y),
cv2.FONT_HERSHEY_COMPLEX_SMALL,
1,
(243, 0, 0),
2,
)
# get points of two aruco
def get_calculate_params(self, img):
# Convert the image to a gray image
gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
# Detect ArUco marker.
corners, ids, rejectImaPoint = cv2.aruco.detectMarkers(
gray, self.aruco_dict, parameters=self.aruco_params)
"""
Two Arucos must be present in the picture and in the same order.
There are two Arucos in the Corners, and each aruco contains the pixels of its four corners.
Determine the center of the aruco by the four corners of the aruco.
"""
if len(corners) > 0:
if ids is not None:
if len(corners) <= 1 or ids[0] == 1:
return None
x1 = x2 = y1 = y2 = 0
point_11, point_21, point_31, point_41 = corners[0][0]
x1, y1 = int(
(point_11[0] + point_21[0] + point_31[0] + point_41[0]) /
4.0), int(
(point_11[1] + point_21[1] + point_31[1] + point_41[1])
/ 4.0)
point_1, point_2, point_3, point_4 = corners[1][0]
x2, y2 = int(
(point_1[0] + point_2[0] + point_3[0] + point_4[0]) /
4.0), int(
(point_1[1] + point_2[1] + point_3[1] + point_4[1]) /
4.0)
return x1, x2, y1, y2
return None
# set camera clipping parameters
def set_cut_params(self, x1, y1, x2, y2):
self.x1 = int(x1)
self.y1 = int(y1)
self.x2 = int(x2)
self.y2 = int(y2)
print(self.x1, self.y1, self.x2, self.y2)
# set parameters to calculate the coords between cube and mycobot
def set_params(self, c_x, c_y, ratio):
self.c_x = c_x
self.c_y = c_y
self.ratio = 220.0 / ratio
# calculate the coords between cube and mycobot
def get_position(self, x, y):
return ((y - self.c_y) * self.ratio +
self.camera_x), ((x - self.c_x) * self.ratio + self.camera_y)
"""
Calibrate the camera according to the calibration parameters.
Enlarge the video pixel by 1.5 times, which means enlarge the video size by 1.5 times.
If two ARuco values have been calculated, clip the video.
"""
def transform_frame(self, frame):
# enlarge the image by 1.5 times
fx = 1.5
fy = 1.5
frame = cv2.resize(frame, (0, 0),
fx=fx,
fy=fy,
interpolation=cv2.INTER_CUBIC)
if self.x1 != self.x2:
# the cutting ratio here is adjusted according to the actual situation
frame = frame[int(self.y2 * 0.2):int(self.y1 * 1.15),
int(self.x1 * 0.7):int(self.x2 * 1.15)]
return frame
# according the class_id to get object name
def id_class_name(self, class_id):
for key, value in self.labels.items():
if class_id == int(key):
return value
# detect object
def obj_detect(self, img, goal, kp_img, desc_img, kp_list, desc_list, connection):
i = 0
MIN_MATCH_COUNT = 5
# sift = cv2.xfeatures2d.SIFT_create()
# find the keypoints and descriptors with SIFT
# kp = []
# des = []
kp = kp_list
des = desc_list
kp2, des2 = kp_img, desc_img
# FLANN parameters
FLANN_INDEX_KDTREE = 0
index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5)
search_params = dict(checks=50) # or pass empty dictionary
flann = cv2.FlannBasedMatcher(index_params, search_params)
x, y = 0, 0
try:
for i in range(len(des)):
matches = flann.knnMatch(des[i], des2, k=2)
# store all the good matches as per Lowe's ratio test. 根据Lowe比率测试存储所有良好匹配项。
good = []
for m, n in matches:
if m.distance < 0.7 * n.distance:
good.append(m)
# When there are enough robust matching point pairs 当有足够的健壮匹配点对至少个MIN_MATCH_COUNT
if len(good) > MIN_MATCH_COUNT:
# extract corresponding point pairs from matching 从匹配中提取出对应点对
# query index of small objects, training index of scenarios 小对象的查询索引,场景的训练索引
src_pts = np.float32([kp[i][m.queryIdx].pt
for m in good]).reshape(-1, 1, 2)
dst_pts = np.float32([kp2[m.trainIdx].pt
for m in good]).reshape(-1, 1, 2)
# Using matching points to find homography matrix in cv2.ransac 利用匹配点找到CV2.RANSAC中的单应矩阵
M, mask = cv2.findHomography(src_pts, dst_pts, cv2.RANSAC,
5.0)
matchesMask = mask.ravel().tolist()
# Calculate the distortion of image, that is the corresponding position in frame 计算图1的畸变也就是在图2中的对应的位置
h, w, d = goal[i].shape
pts = np.float32([[0, 0], [0, h - 1], [w - 1, h - 1],
[w - 1, 0]]).reshape(-1, 1, 2)
dst = cv2.perspectiveTransform(pts, M)
coord = (dst[0][0] + dst[1][0] + dst[2][0] +
dst[3][0]) / 4.0
connection.send((DRAW_COORDS, coord))
# cv2.putText(img, "{}".format(coord), (50, 60),
# fontFace=None, fontScale=1,
# color=(0, 255, 0), lineType=1)
print(format(dst[0][0][0]))
x = (dst[0][0][0] + dst[1][0][0] + dst[2][0][0] +
dst[3][0][0]) / 4.0
y = (dst[0][0][1] + dst[1][0][1] + dst[2][0][1] +
dst[3][0][1]) / 4.0
# bound box 绘制边框
# img = cv2.polylines(img, [np.int32(dst)], True, 244, 3, cv2.LINE_AA)
connection.send((DRAW_RECT, dst))
# cv2.polylines(mixture, [np.int32(dst)], True, (0, 255, 0), 2, cv2.LINE_AA)
except Exception as e:
pass
if x + y > 0:
return x, y
else:
return None
# The path to save the image folder
def parse_folder(folder):
restore = []
path = '/home/er/catkin_ws/src/mycobot_ros/mycobot_ai/aikit_320_pi/' + folder # pi
for i, j, k in os.walk(path):
for l in k:
restore.append(cv2.imread(folder + '/{}'.format(l)))
# print(restore)
return restore
def compute_keypoints_and_descriptors(sift, images_lists):
kp_list = []
desc_list = []
for images in images_lists:
kp_tmp = []
desc_tmp = []
for img in images:
kp, desc = sift.detectAndCompute(img, None)
kp_tmp.append(kp)
desc_tmp.append(desc)
kp_list.append(kp_tmp)
desc_list.append(desc_tmp)
return kp_list, desc_list
GET_FRAME = 1
STOP_PROCESSING = 2
DRAW_COORDS = 3
DRAW_RECT = 4
CLEAR_DRAW = 5
CROP_FRAME = 6
def get_frame(connection):
connection.send(GET_FRAME)
frame = connection.recv()
return frame
def process_transform_frame(frame, x1, y1, x2, y2):
# enlarge the image by 1.5 times
fx = 1.5
fy = 1.5
frame = cv2.resize(frame, (0, 0),
fx=fx,
fy=fy,
interpolation=cv2.INTER_CUBIC)
if x1 != x2:
# the cutting ratio here is adjusted according to the actual situation
frame = frame[int(y2 * 0.7):int(y1 * 1.15),
int(x1 * 0.7):int(x2 * 1.15)]
return frame
def process_display_frame(connection):
cap_num = 0
coord = None
dst = None
x1 = 0
y1 = 0
x2 = 0
y2 = 0
cap = cv2.VideoCapture(cap_num, cv2.CAP_V4L)
# cap = cv2.VideoCapture(cap_num, cv2.CAP_DSHOW)
if not cap.isOpened():
cap.open()
while cv2.waitKey(1) < 0:
_, frame = cap.read()
frame = process_transform_frame(frame, x1, y1, x2, y2)
if connection.poll():
request = connection.recv()
if request == GET_FRAME:
connection.send(frame)
elif request == CLEAR_DRAW:
coord = None
dst = None
elif type(request) is tuple:
if request[0] == DRAW_COORDS:
coord = request[1]
elif request[0] == DRAW_RECT:
dst = request[1]
elif request[0] == CROP_FRAME:
x1 = request[1]
y1 = request[2]
x2 = request[3]
y2 = request[4]
if not coord is None:
cv2.putText(frame, "{}".format(coord), (50, 60), fontFace=None,
fontScale=1, color=(0, 255, 0), lineType=1)
if not dst is None:
frame = cv2.polylines(frame, [np.int32(dst)], True, 244, 3, cv2.LINE_AA)
cv2.imshow("figure", frame)
time.sleep(0.04)
connection.send(STOP_PROCESSING)
def run():
parent_conn, child_conn = Pipe()
child = Process(target = process_display_frame, args=(child_conn,))
child.start()
res_queue = [[], [], [], []]
res_queue[0] = parse_folder('res/D')
res_queue[1] = parse_folder('res/C')
res_queue[2] = parse_folder('res/A')
res_queue[3] = parse_folder('res/B')
sift = cv2.xfeatures2d.SIFT_create()
# sift = cv2.SIFT_create()
kp_list, desc_list = compute_keypoints_and_descriptors(sift, res_queue)
# init a class of Object_detect
detect = Object_detect()
# init mycobot
detect.run()
# _init_ = 20 #
init_num = 0
nparams = 0
# num = 0
# real_sx = real_sy = 0
while True:
start_time = time.time()
if parent_conn.poll():
data = parent_conn.recv()
if data == STOP_PROCESSING:
break
# read camera
frame = get_frame(parent_conn)
# deal img
#frame = detect.transform_frame(frame)
# if _init_ > 0:
# _init_ -= 1
# continue
# calculate the parameters of camera clipping
if init_num < 20:
if detect.get_calculate_params(frame) is None:
# cv2.imshow("figure", frame)
continue
else:
x1, x2, y1, y2 = detect.get_calculate_params(frame)
detect.draw_marker(frame, x1, y1)
detect.draw_marker(frame, x2, y2)
detect.sum_x1 += x1
detect.sum_x2 += x2
detect.sum_y1 += y1
detect.sum_y2 += y2
init_num += 1
continue
elif init_num == 20:
detect.set_cut_params(
(detect.sum_x1) / 20.0,
(detect.sum_y1) / 20.0,
(detect.sum_x2) / 20.0,
(detect.sum_y2) / 20.0,
)
parent_conn.send((CROP_FRAME,
(detect.sum_x1) / 20.0,
(detect.sum_y1) / 20.0,
(detect.sum_x2) / 20.0,
(detect.sum_y2) / 20.0))
detect.sum_x1 = detect.sum_x2 = detect.sum_y1 = detect.sum_y2 = 0
init_num += 1
continue
# calculate params of the coords between cube and mycobot
if nparams < 10:
if detect.get_calculate_params(frame) is None:
# cv2.imshow("figure", frame)
continue
else:
x1, x2, y1, y2 = detect.get_calculate_params(frame)
detect.draw_marker(frame, x1, y1)
detect.draw_marker(frame, x2, y2)
detect.sum_x1 += x1
detect.sum_x2 += x2
detect.sum_y1 += y1
detect.sum_y2 += y2
nparams += 1
print ("ok")
continue
elif nparams == 10:
nparams += 1
# calculate and set params of calculating real coord between cube and mycobot
detect.set_params((detect.sum_x1 + detect.sum_x2) / 20.0,
(detect.sum_y1 + detect.sum_y2) / 20.0,
abs(detect.sum_x1 - detect.sum_x2) / 10.0 +
abs(detect.sum_y1 - detect.sum_y2) / 10.0)
print("ok")
continue
# get detect result
kp_img, desc_img = sift.detectAndCompute(frame, None)
frame = get_frame(parent_conn)
for i, v in enumerate(res_queue):
# HACK: to update frame every time
detect_result = detect.obj_detect(frame, v, kp_img, desc_img, kp_list[i], desc_list[i], parent_conn)
if detect_result:
x, y = detect_result
# calculate real coord between cube and mycobot
real_x, real_y = detect.get_position(x, y)
detect.color = i
detect.pub_marker(real_x / 1000.0, real_y / 1000.0)
detect.decide_move(real_x, real_y, detect.color)
# if num == 5:
# detect.color = i
# detect.pub_marker(real_sx / 5.0 / 1000.0,
# real_sy / 5.0 / 1000.0)
# detect.decide_move(real_sx / 5.0, real_sy / 5.0,
# detect.color)
# num = real_sx = real_sy = 0
# else:
# num += 1
# real_sy += real_y
# real_sx += real_x
parent_conn.send(CLEAR_DRAW)
# cv2.imshow("figure", frame)
time.sleep(0.05)
end_time = time.time()
# print("loop_time = ", end_time - start_time)
# close the window
if cv2.waitKey(1) & 0xFF == ord('q'):
# cap.release()
cv2.destroyAllWindows()
sys.exit()
child.join()
if __name__ == "__main__":
run()
# Object_detect().take_photo()
# Object_detect().cut_photo()