
Inside Hypercorn: A Multi-Protocol ASGI Server Implementation

Written by Claude

Hypercorn is a Python ASGI web server supporting HTTP/1.1, HTTP/2, HTTP/3 (QUIC), and WebSocket protocols. This deep dive examines its implementation, focusing on event loop abstraction, protocol handling, and concurrency management.

Architecture Overview

Hypercorn's architecture centers on three key abstractions:

  1. Event Loop Backend - Pluggable async runtime (asyncio or Trio)
  2. Protocol Layer - Protocol-specific implementations (H11, H2, H3, WebSocket)
  3. Worker Context - Request lifecycle and resource management
┌─────────────────────────────────────┐
│  Worker Process                     │
│  ┌───────────────────────────────┐  │
│  │   Event Loop (asyncio/Trio)   │  │
│  │  ┌─────────────────────────┐  │  │
│  │  │  Protocol Handlers      │  │  │
│  │  │  • H11Protocol          │  │  │
│  │  │  • H2Protocol           │  │  │
│  │  │  • H3Protocol           │  │  │
│  │  │  • WSStream             │  │  │
│  │  └─────────────────────────┘  │  │
│  │  ┌─────────────────────────┐  │  │
│  │  │  ASGI Application       │  │  │
│  │  └─────────────────────────┘  │  │
│  └───────────────────────────────┘  │
│  WorkerContext (request tracking)   │
└─────────────────────────────────────┘

Event Loop Abstraction

Hypercorn supports both asyncio and Trio through parallel implementations. The dual-backend approach trades code duplication for zero abstraction overhead.

Asyncio Implementation

async def worker_serve(
    app: AppWrapper,
    config: Config,
    *,
    sockets: Optional[Sockets] = None,
    shutdown_trigger: Optional[Callable[..., Awaitable]] = None,
) -> None:
    loop = asyncio.get_event_loop()

    # Signal handling
    signal_event = asyncio.Event()
    for signal_name in {"SIGINT", "SIGTERM", "SIGBREAK"}:
        if hasattr(signal, signal_name):
            loop.add_signal_handler(getattr(signal, signal_name),
                                   lambda: signal_event.set())

    # Lifespan management
    lifespan = Lifespan(app, config, loop, lifespan_state)
    await lifespan.wait_for_startup()

    # Server setup
    async def _server_callback(reader: StreamReader, writer: StreamWriter):
        task = asyncio.current_task(loop)
        server_tasks.add(task)
        task.add_done_callback(server_tasks.discard)
        await TCPServer(app, loop, config, context, lifespan_state,
                       reader, writer)

    # TCP servers (HTTP/1.1 and HTTP/2)
    servers = []
    for sock in sockets.secure_sockets:
        servers.append(await asyncio.start_server(
            _server_callback,
            backlog=config.backlog,
            ssl=ssl_context,
            sock=sock
        ))

    # UDP endpoint (HTTP/3/QUIC)
    for sock in sockets.quic_sockets:
        _, protocol = await loop.create_datagram_endpoint(
            lambda: UDPServer(app, loop, config, context, lifespan_state),
            sock=sock
        )

Key differences from typical servers:

  • ASGI lifespan protocol handling before accepting connections
  • Separate TCP/UDP endpoints for HTTP vs QUIC
  • Task tracking for graceful shutdown
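
The lifespan step is easier to picture with a toy driver. The following is a minimal sketch of the ASGI lifespan message exchange, not Hypercorn's Lifespan class: run_lifespan and demo_app are hypothetical names, and the two queues stand in for the server's receive and send channels.

```python
import asyncio


async def run_lifespan(app) -> list[str]:
    """Drive startup and shutdown, returning the message types the app sent."""
    to_app: asyncio.Queue = asyncio.Queue()
    from_app: asyncio.Queue = asyncio.Queue()
    scope = {"type": "lifespan", "asgi": {"version": "3.0"}}

    # The lifespan call runs as one long-lived task next to the servers.
    task = asyncio.create_task(app(scope, to_app.get, from_app.put))
    replies = []

    await to_app.put({"type": "lifespan.startup"})
    replies.append((await from_app.get())["type"])

    # A real server would accept connections between these two steps.

    await to_app.put({"type": "lifespan.shutdown"})
    replies.append((await from_app.get())["type"])
    await task
    return replies


async def demo_app(scope, receive, send) -> None:
    # Hypothetical application that acknowledges both lifespan events.
    assert scope["type"] == "lifespan"
    while True:
        message = await receive()
        if message["type"] == "lifespan.startup":
            await send({"type": "lifespan.startup.complete"})
        elif message["type"] == "lifespan.shutdown":
            await send({"type": "lifespan.shutdown.complete"})
            return


replies = asyncio.run(run_lifespan(demo_app))
```

Waiting for lifespan.startup.complete before binding listeners is what lets an application open database pools or caches before the first request arrives.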

Trio Implementation

async def worker_serve(
    app: AppWrapper,
    config: Config,
    *,
    sockets: Optional[Sockets] = None,
    shutdown_trigger: Optional[Callable[..., Awaitable[None]]] = None,
    task_status: trio.TaskStatus = trio.TASK_STATUS_IGNORED,
) -> None:
    async with trio.open_nursery() as lifespan_nursery:
        await lifespan_nursery.start(lifespan.handle_lifespan)
        await lifespan.wait_for_startup()

        async with trio.open_nursery() as server_nursery:
            # SSL wrapping via SSLListener
            listeners = []
            for sock in sockets.secure_sockets:
                listeners.append(
                    trio.SSLListener(
                        trio.SocketListener(trio.socket.from_stdlib_socket(sock)),
                        ssl_context,
                        https_compatible=True,
                    )
                )

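            # Serve with structured concurrency
            async with trio.open_nursery() as nursery:
                # start_soon() forwards positional arguments only, so
                # handler_nursery has to be bound with partial()
                nursery.start_soon(
                    partial(
                        trio.serve_listeners,
                        partial(TCPServer, app, config, context,
                               ConnectionState(lifespan_state.copy())),
                        listeners,
                        handler_nursery=server_nursery,
                    )
                )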

Trio's structured concurrency:

  • Nested nurseries for lifecycle management
  • Explicit cancellation scopes for graceful shutdown
  • No manual task tracking - nursery handles cleanup

Event Abstraction

Both backends implement a unified Event interface through wrappers:

class EventWrapper:
    def __init__(self) -> None:
        self._event = asyncio.Event()

    async def clear(self) -> None:
        self._event.clear()

    async def wait(self) -> None:
        await self._event.wait()

    async def set(self) -> None:
        self._event.set()

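Trio's version wraps trio.Event instead. Because trio.Event has no clear() method, its clear() swaps in a fresh event rather than resetting the existing one. Either way, protocol implementations only see the async set/clear/wait interface and stay backend-agnostic.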
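
To show what backend-agnostic means in practice, here is a small sketch of a consumer that depends only on the async set/clear/wait interface. IOEvent, AsyncioEvent, and ReadGate are illustrative names for this example, not classes taken from Hypercorn.

```python
import asyncio
from typing import Protocol, Type


class IOEvent(Protocol):
    """The only surface a protocol implementation is allowed to rely on."""

    async def clear(self) -> None: ...
    async def wait(self) -> None: ...
    async def set(self) -> None: ...


class AsyncioEvent:
    """asyncio-backed implementation of the interface."""

    def __init__(self) -> None:
        self._event = asyncio.Event()

    async def clear(self) -> None:
        self._event.clear()

    async def wait(self) -> None:
        await self._event.wait()

    async def set(self) -> None:
        self._event.set()


class ReadGate:
    """Hypothetical protocol helper: it never imports asyncio or trio itself."""

    def __init__(self, event_class: Type[IOEvent]) -> None:
        self.can_read = event_class()
        self.reads = 0

    async def read_once(self) -> None:
        await self.can_read.wait()   # block until the server allows a read
        await self.can_read.clear()  # re-arm the gate for the next read
        self.reads += 1


async def main() -> int:
    gate = ReadGate(AsyncioEvent)  # the backend is chosen in one place
    reader = asyncio.create_task(gate.read_once())
    await gate.can_read.set()
    await reader
    return gate.reads


reads = asyncio.run(main())
```

Swapping the backend then means passing a different event class to the constructor; the helper itself does not change.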

Protocol Implementations

HTTP/1.1 (H11Protocol)

Built on the h11 state machine library. Handles connection recycling and upgrade detection.

class H11Protocol:
    def __init__(self, app, config, context, task_group,
                 connection_state, ssl, client, server, send):
        self.connection = h11.Connection(
            h11.SERVER,
            max_incomplete_event_size=config.h11_max_incomplete_size
        )
        self.stream: Optional[Union[HTTPStream, WSStream]] = None
        self.keep_alive_requests = 0

Protocol upgrade detection:

async def _check_protocol(self, event: h11.Request) -> None:
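    upgrade_value = ""
    has_body = False  # either framing header implies a request body
    for name, value in event.headers:
        sanitised_name = name.decode("latin1").strip().lower()
        if sanitised_name == "upgrade":
            upgrade_value = value.decode("latin1").strip()
        elif sanitised_name in {"content-length", "transfer-encoding"}:
            has_body = True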

    # H2C upgrade (HTTP/2 over cleartext)
    if upgrade_value.lower() == "h2c" and not has_body:
        await self._send_h11_event(
            h11.InformationalResponse(
                status_code=101,
                headers=[(b"connection", b"upgrade"), (b"upgrade", b"h2c")]
            )
        )
        raise H2CProtocolRequiredError(self.connection.trailing_data[0], event)

    # Direct HTTP/2 connection preface
    elif event.method == b"PRI" and event.target == b"*":
        raise H2ProtocolAssumedError(b"PRI * HTTP/2.0\r\n\r\n" + ...)

Connection recycling:

async def _maybe_recycle(self) -> None:
    await self._close_stream()
    if (not self.context.terminated.is_set() and
        self.connection.our_state is h11.DONE and
        self.connection.their_state is h11.DONE):
        try:
            self.connection.start_next_cycle()
        except h11.LocalProtocolError:
            await self.send(Closed())
        else:
            await self.can_read.set()
            await self.send(Updated(idle=True))

HTTP/2 (H2Protocol)

Uses the h2 library for frame handling, with custom flow control and prioritization.

class H2Protocol:
    def __init__(self, app, config, context, task_group,
                 connection_state, ssl, client, server, send):
        self.connection = h2.connection.H2Connection(
            config=h2.config.H2Configuration(
                client_side=False,
                header_encoding=None
            )
        )
        self.connection.local_settings = h2.settings.Settings(
            client=False,
            initial_values={
                h2.settings.SettingCodes.MAX_CONCURRENT_STREAMS:
                    config.h2_max_concurrent_streams,
                h2.settings.SettingCodes.MAX_HEADER_LIST_SIZE:
                    config.h2_max_header_list_size,
                h2.settings.SettingCodes.ENABLE_CONNECT_PROTOCOL: 1,
            },
        )
        self.streams: Dict[int, Union[HTTPStream, WSStream]] = {}
        self.priority = priority.PriorityTree()
        self.stream_buffers: Dict[int, StreamBuffer] = {}

Priority-based sending:

async def send_task(self) -> None:
    """Separate task for prioritized data transmission"""
    while not self.closed:
        try:
            stream_id = next(self.priority)  # Get highest priority stream
        except priority.DeadlockError:
            await self.has_data.wait()
            await self.has_data.clear()
        else:
            await self._send_data(stream_id)

async def _send_data(self, stream_id: int) -> None:
    chunk_size = min(
        self.connection.local_flow_control_window(stream_id),
        self.connection.max_outbound_frame_size,
    )
    data = await self.stream_buffers[stream_id].pop(chunk_size)
    if data:
        self.connection.send_data(stream_id, data)
        await self._flush()
    else:
        self.priority.block(stream_id)  # No data available

    if self.stream_buffers[stream_id].complete:
        self.connection.end_stream(stream_id)
        del self.stream_buffers[stream_id]
        self.priority.remove_stream(stream_id)

Why a separate send task? HTTP/2 allows multiple concurrent streams but requires fair, priority-based scheduling. The send task continuously pulls from the priority tree, respecting flow control windows.
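
The scheduling loop can be sketched without any HTTP/2 machinery. The example below is a deliberately simplified stand-in: RoundRobinScheduler and NoReadyStreams are hypothetical names, and plain round-robin replaces the weighted, dependency-aware ordering that a real priority tree provides. It only illustrates the next/block/remove cycle that the send task relies on.

```python
from collections import deque


class NoReadyStreams(Exception):
    """Every known stream is blocked (plays the role of DeadlockError)."""


class RoundRobinScheduler:
    def __init__(self) -> None:
        self._order: deque[int] = deque()
        self._blocked: set[int] = set()

    def insert_stream(self, stream_id: int) -> None:
        self._order.append(stream_id)

    def block(self, stream_id: int) -> None:
        self._blocked.add(stream_id)

    def unblock(self, stream_id: int) -> None:
        self._blocked.discard(stream_id)

    def remove_stream(self, stream_id: int) -> None:
        self._order.remove(stream_id)
        self._blocked.discard(stream_id)

    def __next__(self) -> int:
        # Visit each stream at most once per call, skipping blocked ones.
        for _ in range(len(self._order)):
            stream_id = self._order[0]
            self._order.rotate(-1)
            if stream_id not in self._blocked:
                return stream_id
        raise NoReadyStreams()


def drain(buffers: dict[int, list[bytes]]) -> list[tuple[int, bytes]]:
    """Send one chunk per turn until every buffer is empty."""
    scheduler = RoundRobinScheduler()
    for stream_id in buffers:
        scheduler.insert_stream(stream_id)
    sent = []
    while buffers:
        stream_id = next(scheduler)
        sent.append((stream_id, buffers[stream_id].pop(0)))
        if not buffers[stream_id]:
            del buffers[stream_id]
            scheduler.remove_stream(stream_id)
    return sent


frames = drain({1: [b"a1", b"a2", b"a3"], 3: [b"b1"]})
```

Stream 3 gets its single chunk out after stream 1's first chunk rather than waiting behind all three, which is the fairness property a dedicated send task is there to provide.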

Flow control buffering:

class StreamBuffer:
    def __init__(self, event_class: Type[IOEvent]) -> None:
        self.buffer = bytearray()
        self._complete = False
        self._is_empty = event_class()
        self._paused = event_class()

    async def push(self, data: bytes) -> None:
        """Backpressure when buffer is full"""
        if self._complete:
            raise BufferCompleteError()
        self.buffer.extend(data)
        await self._is_empty.clear()
        if len(self.buffer) >= BUFFER_HIGH_WATER:  # 2 * 16KB
            await self._paused.wait()  # Block until drained
            await self._paused.clear()

    async def pop(self, max_length: int) -> bytes:
        """Flow-controlled data extraction"""
        length = min(len(self.buffer), max_length)
        data = bytes(self.buffer[:length])
        del self.buffer[:length]
        if len(data) < BUFFER_LOW_WATER:
            await self._paused.set()  # Unpause producer
        return data
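
The watermark idea is easiest to see with tiny thresholds. This is a simplified sketch, not the StreamBuffer above: WatermarkBuffer is a hypothetical class, the 8 and 4 byte thresholds are chosen only for the demo, and it resumes the producer based on the bytes remaining in the buffer rather than on the size of the popped chunk.

```python
import asyncio

HIGH_WATER = 8  # demo values, far smaller than any real setting
LOW_WATER = 4


class WatermarkBuffer:
    def __init__(self) -> None:
        self._buffer = bytearray()
        self._drained = asyncio.Event()
        self.times_paused = 0

    async def push(self, data: bytes) -> None:
        self._buffer.extend(data)
        if len(self._buffer) >= HIGH_WATER:
            # Too much queued: park the producer until the consumer catches up.
            self.times_paused += 1
            self._drained.clear()
            await self._drained.wait()

    def pop(self, max_length: int) -> bytes:
        data = bytes(self._buffer[:max_length])
        del self._buffer[:max_length]
        if len(self._buffer) <= LOW_WATER:
            self._drained.set()  # wake a parked producer, if any
        return data


async def demo() -> tuple[int, int]:
    buf = WatermarkBuffer()
    received = 0

    async def producer() -> None:
        for _ in range(3):
            await buf.push(b"x" * 6)

    task = asyncio.create_task(producer())
    while received < 18:
        await asyncio.sleep(0)  # give the producer a turn
        received += len(buf.pop(4))
    await task
    return received, buf.times_paused


received, times_paused = asyncio.run(demo())
```

The gap between the two thresholds is the point of the design: the producer is not woken after every small pop, only once the buffer has drained enough to make another push worthwhile.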

HTTP/3 (H3Protocol)

Delegates QUIC transport and HTTP/3 framing to aioquic. This is the simplest of the three HTTP implementations because the library absorbs most of the protocol complexity.

class H3Protocol:
    def __init__(self, app, config, context, task_group, state,
                 client, server, quic, send):
        self.connection = H3Connection(quic)
        self.streams: Dict[int, Union[HTTPStream, WSStream]] = {}

    async def handle(self, quic_event: QuicEvent) -> None:
        for event in self.connection.handle_event(quic_event):
            if isinstance(event, HeadersReceived):
                if not self.context.terminated.is_set():
                    await self._create_stream(event)
            elif isinstance(event, DataReceived):
                await self.streams[event.stream_id].handle(
                    Body(stream_id=event.stream_id, data=event.data)
                )

QUIC handles flow control, retransmission, and multiplexing natively, so no manual buffering is needed.

WebSocket (WSStream)

Handles the WebSocket handshake, frames messages via wsproto, and maps them onto ASGI WebSocket events.

class Handshake:
    def is_valid(self) -> bool:
        """Validate WebSocket upgrade request"""
        if self.http_version < "1.1":
            return False
        elif self.http_version == "1.1":
            if self.key is None:
                return False
            if not any(token.lower() == "upgrade"
                      for token in self.connection_tokens):
                return False
            if self.upgrade.lower() != b"websocket":
                return False
        if self.version != WEBSOCKET_VERSION:  # "13"
            return False
        return True

    def accept(self, subprotocol, additional_headers):
        """Generate WebSocket accept response"""
        headers = []
        if subprotocol is not None:
            headers.append((b"sec-websocket-protocol",
                          subprotocol.encode()))

        # Extension negotiation
        extensions = [PerMessageDeflate()]
        if self.extensions:
            accepts = server_extensions_handshake(self.extensions,
                                                  extensions)
            if accepts:
                headers.append((b"sec-websocket-extensions", accepts))

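        # WebSockets over HTTP/2 (RFC 8441) carry no Sec-WebSocket-Key
        if self.key is not None:
            headers.append((b"sec-websocket-accept",
                           generate_accept_token(self.key)))

        status_code = 200  # extended CONNECT is answered with 200
        if self.http_version == "1.1":
            headers.extend([(b"upgrade", b"WebSocket"),
                          (b"connection", b"Upgrade")])
            status_code = 101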

        return status_code, headers, Connection(ConnectionType.SERVER,
                                               extensions)
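
For readers wondering what the accept token actually is, the sketch below computes it as RFC 6455 defines it: the SHA-1 of the client key concatenated with a fixed GUID, base64-encoded. The function name accept_token is chosen for this example; it illustrates the calculation rather than reproducing the library helper called above.

```python
import base64
import hashlib

# Fixed GUID from RFC 6455, section 1.3.
WS_GUID = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def accept_token(key: bytes) -> bytes:
    """Derive the Sec-WebSocket-Accept value from a Sec-WebSocket-Key."""
    digest = hashlib.sha1(key + WS_GUID).digest()
    return base64.b64encode(digest)


# Sample key taken from the RFC's own handshake example.
token = accept_token(b"dGhlIHNhbXBsZSBub25jZQ==")
```

The token proves only that the server understood the WebSocket handshake; it is not a security mechanism.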

Worker Context and Request Lifecycle

The WorkerContext tracks requests and manages graceful shutdown.

class WorkerContext:
    def __init__(self, max_requests: Optional[int]) -> None:
        self.max_requests = max_requests
        self.requests = 0
        self.terminate = self.event_class()
        self.terminated = self.event_class()

    async def mark_request(self) -> None:
        """Called after each request"""
        if self.max_requests is None:
            return

        self.requests += 1
        if self.requests > self.max_requests:
            await self.terminate.set()

Graceful shutdown flow:

  1. terminate event is set (max requests reached or signal received)
  2. Server stops accepting new connections
  3. Existing requests complete within graceful_timeout
  4. terminated event is set
  5. Worker exits
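
The drain step in this flow can be sketched with plain asyncio. This is an illustration under stated assumptions: drain_connections is a hypothetical helper, the two sleep tasks stand in for in-flight requests, and the 0.2 second timeout is chosen only to keep the demo short.

```python
import asyncio


async def drain_connections(tasks: set, graceful_timeout: float) -> tuple[int, int]:
    """Wait for in-flight tasks, then cancel whatever is still running."""
    if not tasks:
        return 0, 0
    done, pending = await asyncio.wait(tasks, timeout=graceful_timeout)
    for task in pending:
        task.cancel()
    # Collect the cancellations so no task outlives the worker.
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(pending)


async def main() -> tuple[int, int]:
    terminate = asyncio.Event()
    tasks = {
        asyncio.create_task(asyncio.sleep(0.01)),  # finishes within the timeout
        asyncio.create_task(asyncio.sleep(60)),    # has to be cancelled
    }
    terminate.set()         # step 1: a signal or the request limit fires
    await terminate.wait()  # step 2 would close the listening sockets here
    return await drain_connections(tasks, graceful_timeout=0.2)


finished, cancelled = asyncio.run(main())
```

Cancelling only after the timeout is what makes the shutdown graceful: short requests complete normally, and only stragglers are interrupted.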

Key Implementation Patterns

1. Protocol Detection and Upgrading

HTTP/1.1 can upgrade to HTTP/2 (h2c) or WebSocket. Detection happens at the H11Protocol layer:

  • WebSocket: Check for Connection: upgrade + Upgrade: websocket
  • H2C: Check for an Upgrade: h2c header on a request without a body
  • HTTP/2 with prior knowledge: Check for the PRI * HTTP/2.0 connection preface
  • HTTP/2 over TLS: ALPN negotiation (outside protocol layer)
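
These checks can be condensed into one pure function. The sketch below is illustrative: classify_request and its return labels are hypothetical, and treating any Content-Length or Transfer-Encoding header as a sign of a request body is an assumption of this example.

```python
from typing import Iterable, Tuple


def classify_request(
    method: bytes, target: bytes, headers: Iterable[Tuple[bytes, bytes]]
) -> str:
    """Decide which protocol should handle a request parsed as HTTP/1.1."""
    upgrade = ""
    connection_tokens: list[str] = []
    has_body = False
    for raw_name, raw_value in headers:
        name = raw_name.decode("latin1").strip().lower()
        value = raw_value.decode("latin1").strip()
        if name == "upgrade":
            upgrade = value.lower()
        elif name == "connection":
            connection_tokens.extend(t.strip().lower() for t in value.split(","))
        elif name in {"content-length", "transfer-encoding"}:
            has_body = True

    if method == b"PRI" and target == b"*":
        return "h2-prior-knowledge"  # client opened with the HTTP/2 preface
    if upgrade == "h2c" and not has_body:
        return "h2c-upgrade"
    if upgrade == "websocket" and "upgrade" in connection_tokens:
        return "websocket"
    return "http/1.1"


kind = classify_request(
    b"GET", b"/chat", [(b"Connection", b"keep-alive, Upgrade"), (b"Upgrade", b"websocket")]
)
```

An h2c upgrade on a request with a body falls through to plain HTTP/1.1 in this sketch, since the body would have to be read in full before the protocol switch could happen.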

2. Stream vs Connection Lifecycle

Protocol    Lifecycle
HTTP/1.1    One stream per connection, recycled via keep-alive
HTTP/2      Multiple streams per connection, tracked in dict
HTTP/3      Multiple streams per QUIC connection
WebSocket   One long-lived stream per connection

3. Backpressure Handling

Protocol    Mechanism
HTTP/1.1    TCP socket backpressure (OS handles)
HTTP/2      StreamBuffer with high/low watermarks (manual)
HTTP/3      QUIC flow control (library handles)
WebSocket   Frame size limits + TCP backpressure

Takeaways

  1. Event loop abstraction via parallel implementations - Duplication beats leaky abstractions when backends differ significantly (asyncio vs Trio)

  2. Protocol layering enables upgrade paths - H11Protocol detects upgrades and delegates to H2Protocol/WSStream dynamically

  3. HTTP/2 requires manual flow control - Unlike HTTP/1.1 (TCP handles it) and HTTP/3 (QUIC handles it), HTTP/2 needs application-level buffering and prioritization

  4. Graceful shutdown needs coordination - Lifespan events, connection draining, and timeout enforcement must work together

  5. ASGI is not just HTTP - Supporting WebSocket, HTTP/2 server push, and trailers requires careful ASGI event mapping

  6. Resource limits prevent DoS - Max requests, header size, message size, and concurrent streams all need enforcement
