NotesWhat is notes.io?

Notes brand slogan

Notes - notes.io

#!/usr/bin/env python3

from __future__ import annotations

import math
from enum import IntEnum
from typing import List, Tuple

import rospy
from geometry_msgs.msg import PoseStamped
from sensor_msgs.msg import CompressedImage, JointState
from std_msgs.msg import Empty

from datetime import datetime

Pose = Tuple[float, float]


class TableState(IntEnum):
Comb_scan = 1
Bee_scan = 1
Idle = 2


class TestScanner:
last_poses: List[Pose] = []
desired_poses: List[List[Pose]] = [[], [], [], []]
directions: List[bool] = []
table_states: List[TableState] = []

finished_tables: List[bool] = []
pose_subs: List[rospy.Subscriber] = []
image_subs: List[rospy.Subscriber] = []
ready_subs: List[rospy.Subscriber] = []

command_pubs: List[rospy.Publisher] = []
homing_pubs: List[rospy.Publisher] = []
image_pubs: List[List[rospy.Publisher]] = []

arena_limits = [[0.01, 0.332], [0.01, 0.222]] #[[min_x,max_x],[min_y,max_y]]

active_ids = [(0, 1),(0, 0), (1, 1), (1, 0)]
strides: List[Tuple[float, float]] = [(0.02, 0.02), (0.025, 0.025)]
scan_speeds = [0.015, 0.015, 0.015]

scan_img_counts = [10, 1]

img_counts: List[int] = []

full_scan_counts: List[int] = []

last_comb_scan_time: rospy.Time = rospy.Time.from_sec(0.0)
last_bee_scan_time: rospy.Time = rospy.Time.from_sec(0.0)

def __init__(self) -> None:
# self.arena_limits: List[List[float]] = rospy.get_param(
# "~arena_limits", self.arena_limits
# )

for i, id in enumerate(self.active_ids):
self.last_poses.append((self.arena_limits[0][0], self.arena_limits[1][0]))

self.desired_poses[i].append(
(self.arena_limits[0][0], self.arena_limits[1][0])
)

self.desired_poses[i].append(
(self.arena_limits[0][0], self.arena_limits[1][0])
)

self.desired_poses[i].append(
(self.arena_limits[0][0], self.arena_limits[1][0])
)

self.directions.append(True)
self.img_counts.append(0)
self.full_scan_counts.append(0)

self.finished_tables.append(True)
self.table_states.append(TableState.Idle)

prefix = f"/hive_{id[0]}/xy_{id[1]}"
self.pose_subs.append(
rospy.Subscriber(
f"{prefix}/xy_position", PoseStamped, self.pose_callback, i
)
)

self.image_subs.append(
rospy.Subscriber(
f"{prefix}/camera/image_mono/compressed",
CompressedImage,
self.image_callback,
i,
)
)

self.image_pubs.append(
[
rospy.Publisher(
f"{prefix}/scan_camera/compressed",
CompressedImage,
queue_size=5,
),
rospy.Publisher(
f"{prefix}/bee_camera/compressed", CompressedImage, queue_size=5
),
]
)

self.ready_subs.append(
rospy.Subscriber(f"{prefix}/xy_execution", Empty, self.exec_callback, i)
)

self.command_pubs.append(
rospy.Publisher(f"{prefix}/xy_cmd", JointState, queue_size=10)
)

self.homing_pubs.append(
rospy.Publisher(f"{prefix}/xy_homing", Empty, queue_size=10)
)

rospy.sleep(1.0)

def home_tables(self):
for i, pub in enumerate(self.homing_pubs):
print(i)
msg = Empty()
pub.publish(msg)

def pose_callback(self, msg: PoseStamped, id: int):
self.last_poses[id] = (msg.pose.position.x, msg.pose.position.y)

def exec_callback(self, _: Empty, id: int):
self.finished_tables[id] = True

def table_reached_pos(self, id: int, eps: float = 3e-3) -> bool:
curr_pose = self.last_poses[id]
state_id = self.table_states[id]

# if state_id == TableState.Idle:
# return True

des_pose = self.desired_poses[id][state_id]

diff = (des_pose[0] - curr_pose[0], des_pose[1] - curr_pose[1])

distance = math.sqrt((diff[0] ** 2) + (diff[1] ** 2))

return distance < eps and self.finished_tables[id]

def generate_next_pose(self, id: int) -> Pose:
last_desired = self.desired_poses[id][self.table_states[id]]
new_desired = last_desired
x_stride, y_stride = self.strides[self.table_states[id]]

# Go in positive direction
if self.directions[id]:
# There is still room in positive dir
if last_desired[0] + x_stride < self.arena_limits[0][1]:
new_desired = (last_desired[0] + x_stride, last_desired[1])

else:
# Go Upstairs
new_desired = (last_desired[0], last_desired[1] + y_stride)
self.directions[id] = not self.directions[id]
else:
if last_desired[0] - x_stride > self.arena_limits[0][0]:
new_desired = (last_desired[0] - x_stride, last_desired[1])

else:
# Go Upstairs
new_desired = (last_desired[0], last_desired[1] + y_stride)
self.directions[id] = not self.directions[id]

if new_desired[1] > self.arena_limits[1][1]:
new_desired = (self.arena_limits[0][0], self.arena_limits[1][0])
self.directions[id] = True

if self.table_states[id] == TableState.Comb_scan:
self.last_comb_scan_time = rospy.get_rostime()
elif self.table_states[id] == TableState.Bee_scan:
self.last_bee_scan_time = rospy.get_rostime()
rospy.loginfo(
f"{self.table_states[id].name} Finished for Table {id}. Going Idle."
)
self.full_scan_counts[id] += 1
self.table_states[id] = TableState.Idle

return new_desired

def image_callback(self, msg: CompressedImage, id: int):
if not self.table_reached_pos(id):
command_msg = JointState()
diff = self.desired_poses[id][self.table_states[id]]
command_msg.position = diff
command_msg.velocity = [self.scan_speeds[self.table_states[id]]]
self.command_pubs[id].publish(command_msg)
return

if self.table_states[id] == TableState.Idle:
if rospy.get_rostime() - self.last_comb_scan_time > rospy.Duration.from_sec(
3000.0
):
rospy.loginfo(
f"Table {id} has waited enough. Starting comb scanning again"
)
new_desired = (self.arena_limits[0][0], self.arena_limits[1][0])

command_msg = JointState()
command_msg.velocity = [self.scan_speeds[self.table_states[id]]]

command_msg.position = new_desired
self.command_pubs[id].publish(command_msg)

self.table_states[id] = TableState.Comb_scan
self.desired_poses[id][self.table_states[id]] = new_desired

elif (
rospy.get_rostime() - self.last_bee_scan_time
> rospy.Duration.from_sec(1000.0)
):
rospy.loginfo(
f"Table {id} has waited enough. Starting Bee scanning again"
)

new_desired = (self.arena_limits[0][0], self.arena_limits[1][0])

command_msg = JointState()
command_msg.velocity = [self.scan_speeds[self.table_states[id]]]

command_msg.position = new_desired
self.command_pubs[id].publish(command_msg)

self.table_states[id] = TableState.Bee_scan
self.desired_poses[id][self.table_states[id]] = new_desired

else:
return

# rospy.loginfo(f"Table {id} reached position")
self.image_pubs[id][self.table_states[id]].publish(msg)
self.img_counts[id] += 1

if self.img_counts[id] == self.scan_img_counts[self.table_states[id]]:
new_desired = self.generate_next_pose(id)
curr_desired = self.last_poses[id]
rospy.loginfo(f'[{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}] Table {id} New Desired: {new_desired}, old: {curr_desired}, scan count: {self.full_scan_counts[id]}')


diff = [new_desired[0], new_desired[1]]
command_msg = JointState()

command_msg.position = diff
command_msg.velocity = [self.scan_speeds[self.table_states[id]]]
self.command_pubs[id].publish(command_msg)

self.finished_tables[id] = False

self.desired_poses[id][self.table_states[id]] = new_desired
self.img_counts[id] = 0

def commandLoop(self):
pass


if __name__ == "__main__":
rospy.init_node("test_scanner")
rospy.loginfo("Initiating XY table control")
driver = TestScanner()
# driver.home_tables()
# driver = TestScanner()

rate = rospy.Rate(30.0)

while not rospy.is_shutdown():
driver.commandLoop()
rate.sleep()
     
 
what is notes.io
 

Notes.io is a web-based application for taking notes. You can take your notes and share with others people. If you like taking long notes, notes.io is designed for you. To date, over 8,000,000,000 notes created and continuing...

With notes.io;

  • * You can take a note from anywhere and any device with internet connection.
  • * You can share the notes in social platforms (YouTube, Facebook, Twitter, instagram etc.).
  • * You can quickly share your contents without website, blog and e-mail.
  • * You don't need to create any Account to share a note. As you wish you can use quick, easy and best shortened notes with sms, websites, e-mail, or messaging services (WhatsApp, iMessage, Telegram, Signal).
  • * Notes.io has fabulous infrastructure design for a short link and allows you to share the note as an easy and understandable link.

Fast: Notes.io is built for speed and performance. You can take a notes quickly and browse your archive.

Easy: Notes.io doesn’t require installation. Just write and share note!

Short: Notes.io’s url just 8 character. You’ll get shorten link of your note when you want to share. (Ex: notes.io/q )

Free: Notes.io works for 12 years and has been free since the day it was started.


You immediately create your first note and start sharing with the ones you wish. If you want to contact us, you can use the following communication channels;


Email: [email protected]

Twitter: http://twitter.com/notesio

Instagram: http://instagram.com/notes.io

Facebook: http://facebook.com/notesio



Regards;
Notes.io Team

     
 
Shortened Note Link
 
 
Looding Image
 
     
 
Long File
 
 

For written notes was greater than 18KB Unable to shorten.

To be smaller than 18KB, please organize your notes, or sign in.