am_i_up/am_i_up/Api.py

83 lines
2.7 KiB
Python

#
# Copyright 2025 James Pace
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
#
# This Source Code Form is "Incompatible With Secondary Licenses", as
# defined by the Mozilla Public License, v. 2.0.
#
from aiohttp import web
import asyncio
from ament_index_python import get_package_share_directory
import rclpy
class Api:
    """HTTP front end for the facts object and the bundled web UI.

    Serves JSON endpoints under ``/api/`` that are backed by the ``facts``
    object supplied at construction, static files from the ``am_i_up_ui``
    package share directory under ``/assets``, and the UI's ``index.html``
    for every other path.
    """

    # Seconds between checks of ``rclpy.ok()`` while the server is running.
    # Kept short so the server stops promptly once rclpy shuts down.
    _SHUTDOWN_POLL_SECONDS = 1.0

    def __init__(self, facts):
        """Store the facts provider used by the request handlers.

        Args:
            facts: Object providing ``do_ping``, ``get_uptime``,
                ``get_buildinfo``, ``get_env`` and ``get_status``.
        """
        self._facts = facts

    @staticmethod
    def _ui_dist_directory():
        """Return the ``dist`` directory inside the ``am_i_up_ui`` share."""
        return get_package_share_directory("am_i_up_ui") + "/dist"

    @staticmethod
    def _build_info_payload(project_state):
        """Build the ``/api/build_info`` payload without mutating the input.

        Args:
            project_state: Mapping returned by ``get_buildinfo()``; may be
                empty or ``None``.

        Returns:
            A new dict: the build info plus ``"status": True``, or a
            ``"status": False`` error payload when nothing is available.
        """
        if not project_state:
            return {"status": False, "message": "project_state not found."}
        # Copy rather than assign into the caller's dict, so the facts
        # object's own data is never modified by serving a request.
        return {**project_state, "status": True}

    async def run(self, host="0.0.0.0", port=8888):
        """Serve the API and UI until ``rclpy.ok()`` turns false.

        Args:
            host: Interface to bind to; the default listens on all of them.
            port: TCP port to listen on.
        """
        ui_static_directory = self._ui_dist_directory() + "/assets"

        app = web.Application()
        app.add_routes([
            web.get("/api/ping", self.ping),
            web.get("/api/uptime", self.uptime),
            web.get("/api/build_info", self.build_info),
            web.get("/api/env", self.env),
            web.get("/api/status", self.status),
            web.static("/assets", ui_static_directory),
            # The captured key is never used; the catch-all exists so that
            # react router paths resolve to the UI entry page.
            web.get("/{key:.*}", self.index),
        ])

        print("Listening on {}:{}".format(host, port))
        runner = web.AppRunner(app)
        await runner.setup()
        try:
            site = web.TCPSite(runner, host, port)
            await site.start()
            while rclpy.ok():
                await asyncio.sleep(self._SHUTDOWN_POLL_SECONDS)
        finally:
            # Runs on normal exit, on a failed start (e.g. port in use) and
            # on task cancellation, so the runner is always released.
            await runner.cleanup()

    async def index(self, request):
        """Return the UI's ``index.html`` for any non-API, non-asset path."""
        return web.FileResponse(self._ui_dist_directory() + "/index.html")

    async def ping(self, request):
        """Ping the address named in the JSON request body.

        Expects a JSON object with an ``address`` key and responds with
        ``{"message": <result of do_ping>}``. A missing or malformed body
        yields HTTP 400 instead of an unhandled server error.
        """
        # NOTE(review): this route is registered as GET yet reads a body;
        # browser fetch() refuses to send one on GET - confirm the clients.
        try:
            payload = await request.json()
            address = payload["address"]
        except (ValueError, KeyError, TypeError):
            # ValueError covers undecodable JSON; KeyError/TypeError cover a
            # body that is not an object containing "address".
            return web.json_response(
                {"message": 'Expected a JSON object with an "address" field.'},
                status=400,
            )
        # NOTE(review): do_ping runs synchronously on the event loop; if it
        # blocks, other requests stall meanwhile - TODO confirm.
        result = self._facts.do_ping(address)
        return web.json_response({"message": result})

    async def uptime(self, request):
        """Respond with ``{"uptime": <facts.get_uptime()>}``."""
        return web.json_response({"uptime": self._facts.get_uptime()})

    async def build_info(self, request):
        """Respond with the build info plus a ``status`` success flag."""
        payload = self._build_info_payload(self._facts.get_buildinfo())
        return web.json_response(payload)

    async def env(self, request):
        """Respond with whatever ``facts.get_env()`` returns, as JSON."""
        return web.json_response(self._facts.get_env())

    async def status(self, request):
        """Respond with the latest status, or a placeholder if there is none."""
        message = self._facts.get_status() or "Nothing received!"
        return web.json_response({"message": message})