am_i_up/am_i_up/Ros.py

56 lines
1.7 KiB
Python

#
# Copyright 2025 James Pace
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
#
# This Source Code Form is "Incompatible With Secondary Licenses", as
# defined by the Mozilla Public License, v. 2.0.
#
import rclpy
from std_msgs.msg import String
from sensor_msgs.msg import CompressedImage
from sensor_msgs.msg import NavSatFix
import asyncio
class Ros:
def __init__(self):
rclpy.init()
self._node = rclpy.create_node('am_i_up')
self._status_sub = self._node.create_subscription(String, "status", self._status_callback, 1)
self._image_sub = self._node.create_subscription(CompressedImage, "image/compressed", self._image_callback, 1)
self._navsat_sub = self._node.create_subscription(NavSatFix, "navsatfix", self._navsat_callback, 1)
self._status_string = None
self._lat_long = None
self._costmap_image = None
def get_status_string(self):
return self._status_string
def get_lat_long(self):
return self._lat_long
def get_costmap_image(self):
return self._costmap_image
async def run(self):
while rclpy.ok():
rclpy.spin_once(self._node, timeout_sec=0)
await asyncio.sleep(1e-4)
def _status_callback(self, msg):
self._status_string = msg.data
def _navsat_callback(self, msg):
self._lat_long = (msg.latitude, msg.longitude)
def _image_callback(self, msg):
if msg.format != "jpg" and msg.format != "png":
# I don't know what to do with this image....
return
self._costmap_image = (msg.format, bytes(msg.data))