diff --git a/am_i_up/Api.py b/am_i_up/Api.py new file mode 100644 index 0000000..f7dd2c7 --- /dev/null +++ b/am_i_up/Api.py @@ -0,0 +1,82 @@ +# +# Copyright 2025 James Pace +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. +# +# This Source Code Form is "Incompatible With Secondary Licenses", as +# defined by the Mozilla Public License, v. 2.0. +# +from aiohttp import web +import asyncio +from ament_index_python import get_package_share_directory +import rclpy + +class Api: + def __init__(self, facts): + self._facts = facts + + async def run(self): + ui_share_directory = get_package_share_directory("am_i_up_ui") + ui_static_directory = ui_share_directory + "/dist/assets" + + app = web.Application() + app.add_routes([ + web.get('/api/ping', self.ping), + web.get('/api/uptime', self.uptime), + web.get('/api/build_info', self.build_info), + web.get('/api/env', self.env), + web.get('/api/status', self.status), + web.static("/assets", ui_static_directory), + # we're not actually using key anywhere, but doing this allows react router + # to work correctly. + web.get("/{key:.*}", self.index) + ]) + + url = "0.0.0.0" + port = 8888 + print("Listening on {}:{}".format(url, port)) + + runner = web.AppRunner(app) + await runner.setup() + site = web.TCPSite(runner, url, port) + await site.start() + + while rclpy.ok(): + await asyncio.sleep(3600) + await runner.cleanup() + + async def index(self, request): + ui_share_directory = get_package_share_directory("am_i_up_ui") + ui_index_path = ui_share_directory + "/dist/index.html" + return web.FileResponse(ui_index_path) + + async def ping(self, request): + request_dict = await request.json() + result = self._facts.do_ping(request_dict['address']) + resp = {"message": result} + return web.json_response(resp) + + async def uptime(self, request): + resp = {"uptime": self._facts.get_uptime()} + return web.json_response(resp) + + async def build_info(self, request): + project_state = self._facts.get_buildinfo() + if not project_state: + resp = {"status": False, "message": "project_state not found."} + return web.json_response(resp) + project_state["status"] = True + return web.json_response(project_state) + + async def env(self, request): + env = self._facts.get_env() + return web.json_response(env) + + async def status(self, request): + status = self._facts.get_status() + if not status: + status = "Nothing received!" + resp = {"message": status} + return web.json_response(resp) diff --git a/am_i_up/Facts.py b/am_i_up/Facts.py new file mode 100644 index 0000000..b6b4e02 --- /dev/null +++ b/am_i_up/Facts.py @@ -0,0 +1,84 @@ +# +# Copyright 2025 James Pace +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. +# +# This Source Code Form is "Incompatible With Secondary Licenses", as +# defined by the Mozilla Public License, v. 2.0. +# +import time +import ipaddress +import subprocess +import os +import yaml +from ament_index_python import get_package_share_directory + +class Facts: + def __init__(self): + self._start_time = time.monotonic() + self._status_string = None + + def set_status_string(self, status): + self._status_string = status + + def get_status(self): + return self._status_string + + def get_uptime(self): + return time.monotonic() - self._start_time + + def do_ping(self, address): + # Make sure address is a ip address. + if not is_valid_ip(address): + return "Provied address ({}) is not valid.".format(address) + + # Build command. + command = ["ping", "-W", "1", "-c", "1", address] + process = subprocess.run(command) + if process.returncode == 0: + return "{} responded to ping.".format(address) + return "{} did not respond to ping.".format(address) + + def get_env(self): + env = {} + # We're not going to return the whole environment because + # of security. + # Let's pick the ones we want. + env['ROS_AUTOMATIC_DISCOVERY_RANGE'] = os.environ.get('ROS_AUTOMATIC_DISCOVERY_RANGE', None) + env['AMENT_PREFIX_PATH'] = os.environ.get('AMENT_PREFIX_PATH', None) + env['ROS_DISTRO'] = os.environ.get('ROS_DISTRO', None) + env['RMW_IMPLEMENTATION'] = os.environ.get('RMW_IMPLEMENTATION', None) + env['ROS_NAMESPACE'] = os.environ.get('ROS_NAMESPACE', None) + env['CYCLONEDDS_URI'] = os.environ.get('CYCLONEDDS_URI', None) + + return env + + def get_buildinfo(self): + # Find the share directory for 'build_info_getter'. + build_info_getter_directory = None + try: + build_info_getter_directory = get_package_share_directory('build_info_getter') + except Exception as e: + print("Can't find build info.\n{}".format(e)) + return None + # Find and read the project_state.repos file in it. + project_state_file = build_info_getter_directory + "/project_state.repos" + project_state_content = None + try: + with open(project_state_file, 'r') as file_obj: + project_state_content = yaml.safe_load(file_obj) + except Exception as e: + # We either didn't load the file or couldn't read it + # as json. + print("Can't find build info.\n{}".format(e)) + return project_state_content + + +def is_valid_ip(address): + try: + ipaddress.ip_address(address) + return True + except ValueError: + return False diff --git a/am_i_up/Ros.py b/am_i_up/Ros.py new file mode 100644 index 0000000..3e1c9ed --- /dev/null +++ b/am_i_up/Ros.py @@ -0,0 +1,30 @@ +# +# Copyright 2025 James Pace +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. +# +# This Source Code Form is "Incompatible With Secondary Licenses", as +# defined by the Mozilla Public License, v. 2.0. +# +import rclpy +from std_msgs.msg import String +import asyncio + +class Ros: + def __init__(self, facts): + rclpy.init() + self._facts = facts + self._node = rclpy.create_node('am_i_up') + + self._node.create_subscription(String, "status", self.status_sub, 1) + + def status_sub(self, msg): + self._facts.set_status_string(msg.data) + + async def run(self): + while rclpy.ok(): + rclpy.spin_once(self._node, timeout_sec=0) + await asyncio.sleep(1e-4) + diff --git a/am_i_up/server.py b/am_i_up/server.py index 573872d..9d7372e 100644 --- a/am_i_up/server.py +++ b/am_i_up/server.py @@ -8,18 +8,11 @@ # This Source Code Form is "Incompatible With Secondary Licenses", as # defined by the Mozilla Public License, v. 2.0. # +from am_i_up.Api import Api +from am_i_up.Facts import Facts +from am_i_up.Ros import Ros + import asyncio -from aiohttp import web -from ament_index_python import get_package_share_directory -import time -import yaml -import ipaddress -import subprocess -import os - -import rclpy -from std_msgs.msg import String - def main(): facts = Facts() @@ -28,157 +21,3 @@ def main(): future = asyncio.gather(api.run(), ros.run()) asyncio.get_event_loop().run_until_complete(future) - - - -class Ros: - def __init__(self, facts): - rclpy.init() - self._facts = facts - self._node = rclpy.create_node('am_i_up') - - self._node.create_subscription(String, "status", self.status_sub, 1) - - def status_sub(self, msg): - self._facts.set_status_string(msg.data) - - async def run(self): - while rclpy.ok(): - rclpy.spin_once(self._node, timeout_sec=0) - await asyncio.sleep(1e-4) - - -class Facts: - def __init__(self): - self._start_time = time.monotonic() - self._status_string = None - - def set_status_string(self, status): - self._status_string = status - - def get_status(self): - return self._status_string - - def get_uptime(self): - return time.monotonic() - self._start_time - - def do_ping(self, address): - # Make sure address is a ip address. - if not is_valid_ip(address): - return "Provied address ({}) is not valid.".format(address) - - # Build command. - command = ["ping", "-W", "1", "-c", "1", address] - process = subprocess.run(command) - if process.returncode == 0: - return "{} responded to ping.".format(address) - return "{} did not respond to ping.".format(address) - - def get_env(self): - env = {} - # We're not going to return the whole environment because - # of security. - # Let's pick the ones we want. - env['ROS_AUTOMATIC_DISCOVERY_RANGE'] = os.environ.get('ROS_AUTOMATIC_DISCOVERY_RANGE', None) - env['AMENT_PREFIX_PATH'] = os.environ.get('AMENT_PREFIX_PATH', None) - env['ROS_DISTRO'] = os.environ.get('ROS_DISTRO', None) - env['RMW_IMPLEMENTATION'] = os.environ.get('RMW_IMPLEMENTATION', None) - env['ROS_NAMESPACE'] = os.environ.get('ROS_NAMESPACE', None) - env['CYCLONEDDS_URI'] = os.environ.get('CYCLONEDDS_URI', None) - - return env - - def get_buildinfo(self): - # Find the share directory for 'build_info_getter'. - build_info_getter_directory = None - try: - build_info_getter_directory = get_package_share_directory('build_info_getter') - except Exception as e: - print("Can't find build info.\n{}".format(e)) - return None - # Find and read the project_state.repos file in it. - project_state_file = build_info_getter_directory + "/project_state.repos" - project_state_content = None - try: - with open(project_state_file, 'r') as file_obj: - project_state_content = yaml.safe_load(file_obj) - except Exception as e: - # We either didn't load the file or couldn't read it - # as json. - print("Can't find build info.\n{}".format(e)) - return project_state_content - -class Api: - def __init__(self, facts): - self._facts = facts - - async def run(self): - ui_share_directory = get_package_share_directory("am_i_up_ui") - ui_static_directory = ui_share_directory + "/dist/assets" - - app = web.Application() - app.add_routes([ - web.get('/api/ping', self.ping), - web.get('/api/uptime', self.uptime), - web.get('/api/build_info', self.build_info), - web.get('/api/env', self.env), - web.get('/api/status', self.status), - web.static("/assets", ui_static_directory), - # we're not actually using key anywhere, but doing this allows react router - # to work correctly. - web.get("/{key:.*}", self.index) - ]) - - url = "0.0.0.0" - port = 8888 - print("Listening on {}:{}".format(url, port)) - - runner = web.AppRunner(app) - await runner.setup() - site = web.TCPSite(runner, url, port) - await site.start() - - while rclpy.ok(): - await asyncio.sleep(3600) - await runner.cleanup() - - async def index(self, request): - ui_share_directory = get_package_share_directory("am_i_up_ui") - ui_index_path = ui_share_directory + "/dist/index.html" - return web.FileResponse(ui_index_path) - - async def ping(self, request): - request_dict = await request.json() - result = self._facts.do_ping(request_dict['address']) - resp = {"message": result} - return web.json_response(resp) - - async def uptime(self, request): - resp = {"uptime": self._facts.get_uptime()} - return web.json_response(resp) - - async def build_info(self, request): - project_state = self._facts.get_buildinfo() - if not project_state: - resp = {"status": False, "message": "project_state not found."} - return web.json_response(resp) - project_state["status"] = True - return web.json_response(project_state) - - async def env(self, request): - env = self._facts.get_env() - return web.json_response(env) - - async def status(self, request): - status = self._facts.get_status() - if not status: - status = "Nothing received!" - resp = {"message": status} - return web.json_response(resp) - -def is_valid_ip(address): - try: - ipaddress.ip_address(address) - return True - except ValueError: - return False