Notes
Notes - notes.io |
from __future__ import annotations
import math
from enum import IntEnum
from typing import List, Tuple
import rospy
from geometry_msgs.msg import PoseStamped
from sensor_msgs.msg import CompressedImage, JointState
from std_msgs.msg import Empty
from datetime import datetime
Pose = Tuple[float, float]
class TableState(IntEnum):
Comb_scan = 1
Bee_scan = 1
Idle = 2
class TestScanner:
last_poses: List[Pose] = []
desired_poses: List[List[Pose]] = [[], [], [], []]
directions: List[bool] = []
table_states: List[TableState] = []
finished_tables: List[bool] = []
pose_subs: List[rospy.Subscriber] = []
image_subs: List[rospy.Subscriber] = []
ready_subs: List[rospy.Subscriber] = []
command_pubs: List[rospy.Publisher] = []
homing_pubs: List[rospy.Publisher] = []
image_pubs: List[List[rospy.Publisher]] = []
arena_limits = [[0.01, 0.332], [0.01, 0.222]] #[[min_x,max_x],[min_y,max_y]]
active_ids = [(0, 1),(0, 0), (1, 1), (1, 0)]
strides: List[Tuple[float, float]] = [(0.02, 0.02), (0.025, 0.025)]
scan_speeds = [0.015, 0.015, 0.015]
scan_img_counts = [10, 1]
img_counts: List[int] = []
full_scan_counts: List[int] = []
last_comb_scan_time: rospy.Time = rospy.Time.from_sec(0.0)
last_bee_scan_time: rospy.Time = rospy.Time.from_sec(0.0)
def __init__(self) -> None:
# self.arena_limits: List[List[float]] = rospy.get_param(
# "~arena_limits", self.arena_limits
# )
for i, id in enumerate(self.active_ids):
self.last_poses.append((self.arena_limits[0][0], self.arena_limits[1][0]))
self.desired_poses[i].append(
(self.arena_limits[0][0], self.arena_limits[1][0])
)
self.desired_poses[i].append(
(self.arena_limits[0][0], self.arena_limits[1][0])
)
self.desired_poses[i].append(
(self.arena_limits[0][0], self.arena_limits[1][0])
)
self.directions.append(True)
self.img_counts.append(0)
self.full_scan_counts.append(0)
self.finished_tables.append(True)
self.table_states.append(TableState.Idle)
prefix = f"/hive_{id[0]}/xy_{id[1]}"
self.pose_subs.append(
rospy.Subscriber(
f"{prefix}/xy_position", PoseStamped, self.pose_callback, i
)
)
self.image_subs.append(
rospy.Subscriber(
f"{prefix}/camera/image_mono/compressed",
CompressedImage,
self.image_callback,
i,
)
)
self.image_pubs.append(
[
rospy.Publisher(
f"{prefix}/scan_camera/compressed",
CompressedImage,
queue_size=5,
),
rospy.Publisher(
f"{prefix}/bee_camera/compressed", CompressedImage, queue_size=5
),
]
)
self.ready_subs.append(
rospy.Subscriber(f"{prefix}/xy_execution", Empty, self.exec_callback, i)
)
self.command_pubs.append(
rospy.Publisher(f"{prefix}/xy_cmd", JointState, queue_size=10)
)
self.homing_pubs.append(
rospy.Publisher(f"{prefix}/xy_homing", Empty, queue_size=10)
)
rospy.sleep(1.0)
def home_tables(self):
for i, pub in enumerate(self.homing_pubs):
print(i)
msg = Empty()
pub.publish(msg)
def pose_callback(self, msg: PoseStamped, id: int):
self.last_poses[id] = (msg.pose.position.x, msg.pose.position.y)
def exec_callback(self, _: Empty, id: int):
self.finished_tables[id] = True
def table_reached_pos(self, id: int, eps: float = 3e-3) -> bool:
curr_pose = self.last_poses[id]
state_id = self.table_states[id]
# if state_id == TableState.Idle:
# return True
des_pose = self.desired_poses[id][state_id]
diff = (des_pose[0] - curr_pose[0], des_pose[1] - curr_pose[1])
distance = math.sqrt((diff[0] ** 2) + (diff[1] ** 2))
return distance < eps and self.finished_tables[id]
def generate_next_pose(self, id: int) -> Pose:
last_desired = self.desired_poses[id][self.table_states[id]]
new_desired = last_desired
x_stride, y_stride = self.strides[self.table_states[id]]
# Go in positive direction
if self.directions[id]:
# There is still room in positive dir
if last_desired[0] + x_stride < self.arena_limits[0][1]:
new_desired = (last_desired[0] + x_stride, last_desired[1])
else:
# Go Upstairs
new_desired = (last_desired[0], last_desired[1] + y_stride)
self.directions[id] = not self.directions[id]
else:
if last_desired[0] - x_stride > self.arena_limits[0][0]:
new_desired = (last_desired[0] - x_stride, last_desired[1])
else:
# Go Upstairs
new_desired = (last_desired[0], last_desired[1] + y_stride)
self.directions[id] = not self.directions[id]
if new_desired[1] > self.arena_limits[1][1]:
new_desired = (self.arena_limits[0][0], self.arena_limits[1][0])
self.directions[id] = True
if self.table_states[id] == TableState.Comb_scan:
self.last_comb_scan_time = rospy.get_rostime()
elif self.table_states[id] == TableState.Bee_scan:
self.last_bee_scan_time = rospy.get_rostime()
rospy.loginfo(
f"{self.table_states[id].name} Finished for Table {id}. Going Idle."
)
self.full_scan_counts[id] += 1
self.table_states[id] = TableState.Idle
return new_desired
def image_callback(self, msg: CompressedImage, id: int):
if not self.table_reached_pos(id):
rospy.sleep(1)
command_msg = JointState()
diff = self.desired_poses[id][self.table_states[id]]
command_msg.position = diff
command_msg.velocity = [self.scan_speeds[self.table_states[id]]]
self.command_pubs[id].publish(command_msg)
return
if self.table_states[id] == TableState.Idle:
if rospy.get_rostime() - self.last_comb_scan_time > rospy.Duration.from_sec(
3000.0
):
rospy.loginfo(
f"Table {id} has waited enough. Starting comb scanning again"
)
new_desired = (self.arena_limits[0][0], self.arena_limits[1][0])
command_msg = JointState()
command_msg.velocity = [self.scan_speeds[self.table_states[id]]]
command_msg.position = new_desired
self.command_pubs[id].publish(command_msg)
self.table_states[id] = TableState.Comb_scan
self.desired_poses[id][self.table_states[id]] = new_desired
elif (
rospy.get_rostime() - self.last_bee_scan_time
> rospy.Duration.from_sec(1000.0)
):
rospy.loginfo(
f"Table {id} has waited enough. Starting Bee scanning again"
)
new_desired = (self.arena_limits[0][0], self.arena_limits[1][0])
command_msg = JointState()
command_msg.velocity = [self.scan_speeds[self.table_states[id]]]
command_msg.position = new_desired
self.command_pubs[id].publish(command_msg)
self.table_states[id] = TableState.Bee_scan
self.desired_poses[id][self.table_states[id]] = new_desired
else:
return
# rospy.loginfo(f"Table {id} reached position")
self.image_pubs[id][self.table_states[id]].publish(msg)
self.img_counts[id] += 1
if self.img_counts[id] == self.scan_img_counts[self.table_states[id]]:
new_desired = self.generate_next_pose(id)
curr_desired = self.last_poses[id]
rospy.loginfo(f'[{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}] Table {id} New Desired: {new_desired}, old: {curr_desired}, scan count: {self.full_scan_counts[id]}')
diff = [new_desired[0], new_desired[1]]
command_msg = JointState()
command_msg.position = diff
command_msg.velocity = [self.scan_speeds[self.table_states[id]]]
self.command_pubs[id].publish(command_msg)
self.finished_tables[id] = False
self.desired_poses[id][self.table_states[id]] = new_desired
self.img_counts[id] = 0
def commandLoop(self):
pass
if __name__ == "__main__":
rospy.init_node("test_scanner")
rospy.loginfo("Initiating XY table control")
driver = TestScanner()
# driver.home_tables()
# driver = TestScanner()
rate = rospy.Rate(5.0)
while not rospy.is_shutdown():
driver.commandLoop()
rate.sleep()
|
Notes.io is a web-based application for taking notes. You can take your notes and share with others people. If you like taking long notes, notes.io is designed for you. To date, over 8,000,000,000 notes created and continuing...
With notes.io;
- * You can take a note from anywhere and any device with internet connection.
- * You can share the notes in social platforms (YouTube, Facebook, Twitter, instagram etc.).
- * You can quickly share your contents without website, blog and e-mail.
- * You don't need to create any Account to share a note. As you wish you can use quick, easy and best shortened notes with sms, websites, e-mail, or messaging services (WhatsApp, iMessage, Telegram, Signal).
- * Notes.io has fabulous infrastructure design for a short link and allows you to share the note as an easy and understandable link.
Fast: Notes.io is built for speed and performance. You can take a notes quickly and browse your archive.
Easy: Notes.io doesn’t require installation. Just write and share note!
Short: Notes.io’s url just 8 character. You’ll get shorten link of your note when you want to share. (Ex: notes.io/q )
Free: Notes.io works for 12 years and has been free since the day it was started.
You immediately create your first note and start sharing with the ones you wish. If you want to contact us, you can use the following communication channels;
Email: [email protected]
Twitter: http://twitter.com/notesio
Instagram: http://instagram.com/notes.io
Facebook: http://facebook.com/notesio
Regards;
Notes.io Team