# Source code for httpstan.app
"""Helper function to launch httpstan server.
Configure the server and schedule startup and shutdown tasks.
"""
import asyncio
import logging
import aiohttp.web
import httpstan.routes
# Use uvloop's event loop policy when uvloop can be imported; otherwise fall
# back to asyncio's default event loop policy.
try:
    from uvloop import EventLoopPolicy
except ImportError:
    EventLoopPolicy = asyncio.DefaultEventLoopPolicy

# Logger registered under the name "httpstan".
logger = logging.getLogger("httpstan")
async def _warn_unfinished_operations(app: aiohttp.web.Application) -> None:
    """Warn if tasks (e.g., operations) are unfinished.

    Logs one critical message per operation whose ``"done"`` flag is falsy.
    Called immediately before tasks are cancelled.

    Args:
        app: Application whose ``"operations"`` entry maps each operation
            name to a mapping that contains a ``"done"`` key.
    """
    for name, operation in app["operations"].items():
        if not operation["done"]:
            logger.critical(f"Operation `{name}` cancelled before finishing.")
def make_app() -> aiohttp.web.Application:
    """Assemble aiohttp Application.

    Creates the application with a raised request-body limit, hands it to
    ``httpstan.routes.setup_routes``, initialises an empty ``"operations"``
    mapping on it, and appends ``_warn_unfinished_operations`` to the
    application's cleanup callbacks.

    Returns:
        aiohttp.web.Application: assembled aiohttp application.
    """
    # default `client_max_size` is 1 MiB. Model `data` is often greater. Set to generous 512 GiB.
    app = aiohttp.web.Application(client_max_size=512 * 1024**3)
    httpstan.routes.setup_routes(app)

    # startup and shutdown tasks
    # Operations are tracked by name so that the cleanup callback can report
    # any that have not finished.
    app["operations"] = {}
    app.on_cleanup.append(_warn_unfinished_operations)
    return app