122 lines
4.0 KiB
Python
122 lines
4.0 KiB
Python
#
# Copyright 2025 James Pace
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
#
# This Source Code Form is "Incompatible With Secondary Licenses", as
# defined by the Mozilla Public License, v. 2.0.
#
|
|
import asyncio
import ipaddress
import os
import subprocess
import time

from aiohttp import web
from ament_index_python import get_package_share_directory
import yaml
|
|
|
|
def main():
    """Wire the fact provider to its HTTP handlers and start the server.

    Creates a Facts instance, wraps it in Routes, registers one GET
    route per handler on a new aiohttp application and hands the
    application to web.run_app().
    """
    handlers = Routes(Facts())

    # (path, handler) pairs, registered in this order as GET routes.
    endpoints = (
        ('/', handlers.root),
        ('/ping', handlers.ping),
        ('/uptime', handlers.uptime),
        ('/build_info', handlers.build_info),
        ('/env', handlers.env),
    )

    application = web.Application()
    application.add_routes(
        [web.get(path, handler) for path, handler in endpoints]
    )
    web.run_app(application)
|
|
|
|
class Facts:
    """Gathers facts about this process and its host for the web API.

    The start time is captured when the instance is constructed, so
    get_uptime() measures time since construction.
    """

    # Environment variables that get_env() reports. Only this explicit
    # allow-list is exposed; the full environment is never returned
    # because of security.
    _EXPOSED_ENV_VARS = (
        'ROS_AUTOMATIC_DISCOVERY_RANGE',
        'AMENT_PREFIX_PATH',
        'ROS_DISTRO',
        'RMW_IMPLEMENTATION',
        'ROS_NAMESPACE',
        'CYCLONEDDS_URI',
    )

    def __init__(self):
        # Monotonic clock so wall-clock adjustments cannot skew the uptime.
        self._start_time = time.monotonic()

    def get_uptime(self):
        """Return the seconds elapsed since this object was created."""
        return time.monotonic() - self._start_time

    def do_ping(self, address):
        """Ping an IP address once and describe the outcome.

        Args:
            address: The IP address to ping. It is checked with
                is_valid_ip() before being handed to the ping command.

        Returns:
            A human-readable message saying whether the address was
            invalid, responded, or did not respond.
        """
        # Make sure address is an IP address before it reaches the
        # command line.
        if not is_valid_ip(address):
            return "Provied address ({}) is not valid.".format(address)

        # Build command as an argument list (no shell involved).
        # NOTE(review): "-W 1 -c 1" is presumably a single echo request
        # with a one second wait (iputils semantics) - confirm on the
        # target platform.
        command = ["ping", "-W", "1", "-c", "1", address]
        process = subprocess.run(command)
        if process.returncode == 0:
            return "{} responded to ping.".format(address)
        return "{} did not respond to ping.".format(address)

    def get_env(self):
        """Return the allow-listed environment variables as a dict.

        Returns:
            A dict with one key per allow-listed variable; the value is
            the variable's value, or None when it is not set.
        """
        return {name: os.environ.get(name) for name in self._EXPOSED_ENV_VARS}

    def get_buildinfo(self):
        """Load the project_state.repos file shipped by build_info_getter.

        Returns:
            The parsed YAML content of the file, or None when the
            package share directory cannot be found or the file cannot
            be opened or parsed.
        """
        # Find the share directory for 'build_info_getter'.
        try:
            share_directory = get_package_share_directory('build_info_getter')
        except Exception as e:
            # Best effort: report the problem and signal "no build info".
            print("Can't find build info.\n{}".format(e))
            return None

        # Find and read the project_state.repos file in it.
        project_state_file = os.path.join(share_directory, "project_state.repos")
        try:
            with open(project_state_file, 'r') as file_obj:
                return yaml.safe_load(file_obj)
        except Exception as e:
            # We either couldn't open the file or couldn't parse it
            # as YAML.
            print("Can't find build info.\n{}".format(e))
            return None
|
|
|
|
class Routes:
    """aiohttp request handlers that expose a Facts object over HTTP.

    Each handler is a coroutine taking the aiohttp request and
    returning a response; all but root() answer with JSON.
    """

    def __init__(self, facts):
        """Store the fact provider the handlers delegate to."""
        self._facts = facts

    async def root(self, request):
        """Return a plain-text greeting."""
        return web.Response(text='hello!')

    async def ping(self, request):
        """Ping the address named in the JSON request body.

        The body must be a JSON object with a string 'address' field.
        Malformed requests are answered with HTTP 400 and a JSON
        {"message": ...} explanation instead of an unhandled error.
        """
        try:
            request_dict = await request.json()
        except ValueError:
            # Covers an empty body, invalid JSON and undecodable text.
            return web.json_response(
                {"message": "Request body must be valid JSON."}, status=400)

        if not isinstance(request_dict, dict) or 'address' not in request_dict:
            return web.json_response(
                {"message": "Request body must be a JSON object with an "
                            "'address' field."},
                status=400)

        address = request_dict['address']
        if not isinstance(address, str):
            # A non-string would otherwise end up in the ping argv.
            return web.json_response(
                {"message": "'address' must be a string."}, status=400)

        # do_ping() runs a blocking subprocess; run it in the default
        # executor so the event loop keeps serving other requests.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None, self._facts.do_ping, address)
        return web.json_response({"message": result})

    async def uptime(self, request):
        """Return the uptime reported by the fact provider."""
        return web.json_response({"uptime": self._facts.get_uptime()})

    async def build_info(self, request):
        """Return the project state with a 'status' flag added.

        Responds with {"status": False, ...} when no project state is
        available or when it is not a mapping that a status key can be
        added to.
        """
        project_state = self._facts.get_buildinfo()
        if not project_state or not isinstance(project_state, dict):
            resp = {"status": False, "message": "project_state not found."}
            return web.json_response(resp)
        # Build a new dict instead of mutating the loaded document.
        return web.json_response({**project_state, "status": True})

    async def env(self, request):
        """Return the allow-listed environment variables."""
        return web.json_response(self._facts.get_env())
|
|
|
|
|
|
def is_valid_ip(address):
    """Return True if address is a string holding an IPv4 or IPv6 address.

    Args:
        address: The value to check, typically taken from a request.

    Returns:
        True when address is a str that ipaddress.ip_address() accepts,
        False otherwise.
    """
    # ipaddress.ip_address() also accepts integers and packed bytes.
    # Those are not usable as a command-line argument, so only strings
    # are considered valid here.
    if not isinstance(address, str):
        return False
    try:
        ipaddress.ip_address(address)
    except ValueError:
        return False
    return True
|