"""Single-session VNC-over-WebSocket gateway.

Serves the static web client found under ``web_root`` and, on
``/websockify``, starts one Xvnc display + the application + a websockify
bridge, then proxies the browser WebSocket to that bridge. Only one session
may exist at a time; it is torn down when the WebSocket handler finishes.
"""
import asyncio
import atexit
import os
import socket
import subprocess
import time

import aiohttp
from aiohttp import web

web_root = './noVNC/'
app_path = './out/miniRT'
HOST = '0.0.0.0'
PORT = 7080
SESSION_TTL = 60  # NOTE(review): not referenced anywhere in this file
service_name = 'minirt'  # NOTE(review): not referenced anywhere in this file


class SessionManager:
    """Own the (single) set of child processes backing a VNC session."""

    def __init__(self):
        self.session = None  # single session; dict of ports/processes or None
        self.display_counter = 100

    @staticmethod
    def _terminate(proc):
        """Terminate *proc* if it is still running, escalating to kill."""
        if proc is None or proc.poll() is not None:
            return
        proc.terminate()
        try:
            proc.wait(timeout=2)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()  # reap the killed child so it does not linger as a zombie

    @staticmethod
    def _wait_for_rfb(xvnc_proc, rfb_port, attempts=20, delay=0.25):
        """Block until Xvnc accepts TCP connections on *rfb_port*.

        Raises:
            RuntimeError: if Xvnc exits early or never starts listening.
        """
        for _ in range(attempts):
            if xvnc_proc.poll() is not None:
                raise RuntimeError(
                    f"Xvnc exited with code {xvnc_proc.returncode}")
            try:
                with socket.create_connection(('127.0.0.1', rfb_port),
                                              timeout=0.5):
                    return
            except OSError:
                time.sleep(delay)
        raise RuntimeError(f"Xvnc not listening on port {rfb_port} after 5s")

    def create_session(self):
        """Start Xvnc, the application and websockify for a new session.

        NOTE: this call is synchronous and can block for several seconds
        while waiting for Xvnc to come up.

        Raises:
            RuntimeError: if a session already exists or Xvnc fails to start.
            OSError: if one of the executables cannot be launched.
        """
        if self.session is not None:
            raise RuntimeError("Session already active")

        display_num = self.display_counter
        rfb_port = 5900 + display_num
        ws_port = 6080 + display_num

        # Track every child we spawn so a failure half-way through does not
        # leave orphaned processes holding the display / ports.
        started = []
        try:
            xvnc_proc = subprocess.Popen(
                ['Xvnc', f':{display_num}',
                 '-geometry', '1200x900',
                 '-SecurityTypes', 'None',
                 '-rfbport', str(rfb_port)],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
            started.append(xvnc_proc)

            # Wait until Xvnc is actually listening on rfb_port
            self._wait_for_rfb(xvnc_proc, rfb_port)

            env = os.environ.copy()
            env['DISPLAY'] = f':{display_num}'
            # The application's stdout/stderr are inherited from this process.
            app_proc = subprocess.Popen([app_path], env=env)
            started.append(app_proc)

            websockify_proc = subprocess.Popen(
                ['websockify', str(ws_port), f'127.0.0.1:{rfb_port}'],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
            started.append(websockify_proc)

            self.session = {
                'display': display_num,
                'rfb_port': rfb_port,
                'ws_port': ws_port,
                'xvnc_proc': xvnc_proc,
                'websockify_proc': websockify_proc,
                'app_proc': app_proc,
                'created_at': time.time(),
            }
            print(f"Created session on display :{display_num}")
        except Exception as e:
            print(f"Error creating session: {e}")
            for proc in reversed(started):
                self._terminate(proc)
            raise

    def get_session(self):
        """Return the active session dict, or ``None``."""
        return self.session

    def get_display(self):
        """Return the X display number used for sessions."""
        return self.display_counter

    def stop_session(self):
        """Terminate all processes of the active session, if any."""
        sess = self.session
        if sess is None:
            return
        # Clear first so a re-entrant call (e.g. from atexit) is a no-op.
        self.session = None
        for name in ('app_proc', 'xvnc_proc', 'websockify_proc'):
            self._terminate(sess.get(name))
        print("Stopped session")


manager = SessionManager()


# ---------------------------------------------------------------------------
# WebSocket proxy
# ---------------------------------------------------------------------------

async def _connect_backend(client, ws_port, attempts=10, delay=0.5):
    """Connect to the local websockify bridge, retrying while it starts up.

    Returns the connected client WebSocket; re-raises the last connection
    error once *attempts* tries have failed.
    """
    url = f'ws://127.0.0.1:{ws_port}'
    for attempt in range(1, attempts + 1):
        try:
            return await client.ws_connect(url)
        except (aiohttp.ClientConnectorError, OSError):
            if attempt == attempts:
                raise
            await asyncio.sleep(delay)


async def _forward(src, dst):
    """Copy binary/text frames from *src* to *dst* until another frame type."""
    async for msg in src:
        if msg.type == aiohttp.WSMsgType.BINARY:
            await dst.send_bytes(msg.data)
        elif msg.type == aiohttp.WSMsgType.TEXT:
            await dst.send_str(msg.data)
        else:
            break


async def _watch_app(sess):
    """Poll the application once a second and restart it when it exits."""
    while True:
        proc = sess.get('app_proc')
        if proc is None:
            break
        ret = proc.poll()
        if ret is not None:
            print(f"App exited with code {ret}, restarting...")
            env = os.environ.copy()
            env['DISPLAY'] = f':{sess["display"]}'
            new_proc = subprocess.Popen([app_path], env=env)
            print(f"New PID: {new_proc.pid}")
            # Stored in the session dict so stop_session() kills the new one.
            sess['app_proc'] = new_proc
        await asyncio.sleep(1)


async def websockify_handler(request: web.Request):
    """Create the session and proxy the browser WebSocket to websockify.

    Responds 400 to non-WebSocket requests, 503 when a session is already
    active or cannot be started. The session is always stopped when the
    proxying ends, for whatever reason.
    """
    ws_server = web.WebSocketResponse(protocols=['binary'])

    # Reject plain HTTP requests *before* spawning anything; otherwise the
    # session would be created, prepare() would fail, and nothing would
    # ever stop the session again.
    if not ws_server.can_prepare(request).ok:
        return web.Response(status=400, text='WebSocket upgrade required')

    if manager.get_session() is not None:
        return web.Response(status=503, text='Session already active')

    try:
        manager.create_session()
    except (RuntimeError, OSError):
        # create_session() already logged the cause and cleaned up.
        return web.Response(status=503, text='Unable to start session')

    sess = manager.get_session()
    if sess is None:
        return web.Response(status=403, text='Invalid Session')

    try:
        await ws_server.prepare(request)
    except BaseException:
        # Handshake failed or the request was cancelled: release the session.
        manager.stop_session()
        raise

    try:
        async with aiohttp.ClientSession() as client:
            ws_client = await _connect_backend(client, sess['ws_port'])
            try:
                tasks = [
                    asyncio.create_task(_forward(ws_server, ws_client)),
                    asyncio.create_task(_forward(ws_client, ws_server)),
                    asyncio.create_task(_watch_app(sess)),
                ]
                try:
                    # Any side closing (or the watcher ending) ends the session.
                    await asyncio.wait(
                        tasks, return_when=asyncio.FIRST_COMPLETED)
                finally:
                    for t in tasks:
                        t.cancel()
                    await asyncio.gather(*tasks, return_exceptions=True)
            finally:
                await ws_client.close()
    except Exception as e:
        print(f'Caught exception: {e}')
    finally:
        manager.stop_session()
        if not ws_server.closed:
            await ws_server.close()
    return ws_server


# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------

async def static_handler(request: web.Request):
    """Serve noVNC static files."""
    path_info = request.match_info.get('path_info', '')
    if path_info == '':
        path_info = 'index.html'

    root_abs = os.path.abspath(web_root)
    serve_abs = os.path.abspath(
        os.path.normpath(os.path.join(web_root, path_info)))

    # Compare with a trailing separator: a bare startswith(root_abs) would
    # also accept sibling paths such as "<root>-private/secret".
    if serve_abs != root_abs and not serve_abs.startswith(root_abs + os.sep):
        return web.Response(status=403, text='Forbidden')

    if not os.path.exists(serve_abs):
        return web.Response(status=404, text='Not Found')

    if os.path.isdir(serve_abs):
        serve_abs = os.path.join(serve_abs, 'index.html')
        if not os.path.exists(serve_abs):
            return web.Response(status=404, text='Not Found')

    return web.FileResponse(serve_abs)


# ---------------------------------------------------------------------------
# App setup
# ---------------------------------------------------------------------------

def cleanup():
    """Stop any running session when the interpreter exits."""
    manager.stop_session()


atexit.register(cleanup)


@web.middleware
async def print_req(req, handler):
    """Log every incoming request to stdout, then delegate to the handler."""
    print(req)
    response = await handler(req)
    return response


async def start_app():
    """Build the aiohttp application with its routes and middleware."""
    app = web.Application(middlewares=[print_req])
    app.router.add_get('/websockify', websockify_handler)
    # ".*" (not ".+") so that "/" reaches static_handler and gets index.html.
    app.router.add_get('/{path_info:.*}', static_handler)
    return app


if __name__ == '__main__':
    web.run_app(start_app(), host=HOST, port=PORT)