---
title: "Streaming responses (chunked HTTP, SSE)"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Streaming responses (chunked HTTP, SSE)}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

```{r setup, include=FALSE}
knitr::opts_chunk$set(eval = FALSE, comment = "#>")
```

A normal route handler builds the entire response body in one shot and
returns it. Sometimes you need the opposite — start sending bytes
before the work is done, and keep emitting more over time. Common
cases: a Server-Sent Events feed, an LLM token stream, an NDJSON log
tail, anything where the client cares about *first byte* time more
than *last byte* time.

drogonR supports this through `dr_stream()` and the SSE convenience
wrapper `dr_stream_sse()`.

## How it works

A streaming handler returns a `drogon_stream` value (instead of a
normal response list). The dispatcher recognises the class, opens an
HTTP chunked-transfer response on Drogon's side, and from then on
calls your `next_chunk()` function on the main R thread, one chunk
at a time. Each call returns the bytes to send and a `done` flag.

```text
   R handler returns drogon_stream
                |
                v
   Drogon sends 200 + chunked headers ────► client
                |
                v
   pump: next_chunk(state, cancelled = FALSE)
                |  returns list(chunk, state, done = FALSE)
                v
   Drogon sends one chunk             ────► client
                |
                v
   pump: next_chunk(state, …)
                ...
                |  returns list(..., done = TRUE)
                v
   Drogon closes the chunked response ────► client
```

If the client disconnects mid-stream, the dispatcher catches the close
event from Drogon and runs your `next_chunk()` one final time with
`cancelled = TRUE`, so you can free state. The stream is then torn
down regardless of what that final call returns.

## `dr_stream()` — the base API

```r
library(drogonR)

app <- dr_app() |>
  dr_get("/numbers", function(req) {
    dr_stream(
      state = list(i = 0L, n = 10L),
      next_chunk = function(state, cancelled) {
        if (cancelled || state$i >= state$n) {
          return(list(chunk = "", state = state, done = TRUE))
        }
        state$i <- state$i + 1L
        list(
          chunk = sprintf("%d\n", state$i),
          state = state,
          done  = FALSE)
      },
      content_type = "text/plain")
  })

dr_serve(app, port = 8080L)
```

`next_chunk()` must always return a list with elements `chunk`,
`state`, and `done`. `chunk` is sent verbatim: format SSE / NDJSON /
whatever yourself, or use one of the helpers built on top.
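For example, an NDJSON endpoint is just `dr_stream()` plus a JSON
encoder. This sketch assumes the jsonlite package for encoding; any
encoder that produces one JSON object per line works.

```r
library(drogonR)
library(jsonlite)

app <- dr_app() |>
  dr_get("/events.ndjson", function(req) {
    dr_stream(
      state = list(i = 0L, n = 3L),
      next_chunk = function(state, cancelled) {
        if (cancelled || state$i >= state$n) {
          return(list(chunk = "", state = state, done = TRUE))
        }
        state$i <- state$i + 1L
        # one JSON object per line, newline-terminated
        line <- toJSON(list(seq = state$i, ts = unclass(Sys.time())),
                       auto_unbox = TRUE)
        list(chunk = paste0(line, "\n"), state = state, done = FALSE)
      },
      content_type = "application/x-ndjson")
  })
```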

## `dr_stream_sse()` — Server-Sent Events

90% of streaming endpoints just emit `data:` frames. `dr_stream_sse()`
takes care of:

* SSE framing (`data: <line>\n\n`, with multi-line `data` automatically
  split per the spec),
* default headers (`Content-Type: text/event-stream`,
  `Cache-Control: no-cache`, `X-Accel-Buffering: no` so reverse
  proxies don't buffer).
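To make the framing concrete, here is a plain-R re-implementation of
the `data:` splitting rule, for illustration only (it is not drogonR's
internal code):

```r
sse_frame <- function(data) {
  # per the SSE spec, each line of the payload gets its own `data:` field,
  # and a blank line terminates the event
  lines <- strsplit(data, "\n", fixed = TRUE)[[1]]
  paste0(paste0("data: ", lines, "\n", collapse = ""), "\n")
}

cat(sse_frame("line one\nline two"))
# data: line one
# data: line two
#
```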

```r
app <- dr_app() |>
  dr_get("/sse", function(req) {
    dr_stream_sse(
      state = list(i = 0L, n = 5L),
      generator = function(state, cancelled) {
        if (cancelled || state$i >= state$n) {
          return(list(data = "", state = state, done = TRUE))
        }
        state$i <- state$i + 1L
        list(
          data  = sprintf("tick %d", state$i),
          state = state,
          done  = FALSE)
      })
  })

dr_serve(app, port = 8080L)
```

Test it:

```sh
curl -N http://127.0.0.1:8080/sse
# data: tick 1
#
# data: tick 2
# ...
```

Need `event:`, `id:`, or `retry:`? Use `dr_stream()` directly and
build the frame yourself — the helper deliberately keeps to just
`data:`.
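A hand-rolled frame is only a `sprintf()` away. This sketch builds
`event:` and `id:` fields on top of `dr_stream()`, using only the
arguments shown earlier in this vignette:

```r
app <- dr_app() |>
  dr_get("/sse-full", function(req) {
    dr_stream(
      state = list(i = 0L, n = 5L),
      next_chunk = function(state, cancelled) {
        if (cancelled || state$i >= state$n) {
          return(list(chunk = "", state = state, done = TRUE))
        }
        state$i <- state$i + 1L
        # event name, event id, payload, then the blank line that ends the frame
        frame <- sprintf("event: tick\nid: %d\ndata: tick %d\n\n",
                         state$i, state$i)
        list(chunk = frame, state = state, done = FALSE)
      },
      content_type = "text/event-stream")
  })
```

Note that unlike `dr_stream_sse()`, this does not set the
`Cache-Control` / `X-Accel-Buffering` headers for you.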

## Threading: keep each pump short

`next_chunk()` always runs on the **main R thread**. R is
single-threaded, so this is the only place it could safely run.
Heavy work inside one pump blocks every other request and every
other stream until it returns.

* **Good:** each pump computes one row, formats it, returns.
* **Bad:** each pump does a 500 ms database query, or `Sys.sleep(1)`,
  or runs a long loop in pure R.

If you have CPU-bound work, split it across many pumps and carry
progress in `state`. If you have blocking I/O, do the reading on a
worker process and feed results in via `state` updates from outside
the pump, not from inside it.
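A sketch of the slicing pattern: the job below sums a million terms,
but each pump only touches one block, so other requests and streams
interleave between pumps.

```r
dr_stream(
  state = list(done_rows = 0L, total = 1e6L, block = 1e4L, acc = 0),
  next_chunk = function(state, cancelled) {
    if (cancelled || state$done_rows >= state$total) {
      return(list(chunk = sprintf("sum=%f\n", state$acc),
                  state = state, done = TRUE))
    }
    # one small slice of the work per pump; progress lives in `state`
    idx <- (state$done_rows + 1L):min(state$done_rows + state$block,
                                      state$total)
    state$acc <- state$acc + sum(sqrt(idx))
    state$done_rows <- state$done_rows + length(idx)
    list(chunk = sprintf("progress %d/%d\n", state$done_rows, state$total),
         state = state, done = FALSE)
  },
  content_type = "text/plain")
```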

## Cancellation contract

When the client goes away, `next_chunk()` is called **exactly once**
with `cancelled = TRUE`. Use it to release per-stream resources
(file handles, DB cursors, accumulated buffers). The return value is
ignored — the stream is torn down either way.

```r
generator <- function(state, cancelled) {
  if (cancelled) {
    if (!is.null(state$conn)) close(state$conn)
    return(list(data = "", state = state, done = TRUE))
  }
  # ... normal path ...
}
```

The notification arrives as soon as Drogon's I/O thread sees the
TCP close (epoll on Linux, kqueue on BSD/macOS), not on the next
attempted send — drogonR carries a small Drogon patch
(`setUserCloseCallback`) that wires this up. Without it, small SSE
chunks would keep ticking long after the kernel had silently
absorbed the writes for a closed connection.

## Errors inside `next_chunk()`

If your generator raises an R error mid-stream, the dispatcher catches
it (`R_tryEval`), prints
`drogonR stream: next_chunk() raised an error; closing stream` to
stderr, and tears the session down. Headers are already on the wire
by then, so the client sees a truncated chunked response — there is
no way to send a 500 once streaming has started. The server stays up
and other requests are unaffected. If you want custom on-error
behaviour, wrap the generator body in `tryCatch()` yourself and
return a final error frame with `done = TRUE`.
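A sketch of that pattern (`risky_step()` is a hypothetical stand-in
for your per-chunk work):

```r
generator <- function(state, cancelled) {
  if (cancelled) return(list(data = "", state = state, done = TRUE))
  tryCatch({
    payload <- risky_step(state)   # hypothetical: may raise an error
    list(data = payload, state = state, done = FALSE)
  }, error = function(e) {
    # emit one final frame describing the failure, then close cleanly
    list(data = paste0("error: ", conditionMessage(e)),
         state = state, done = TRUE)
  })
}
```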

## Middleware does not wrap individual chunks

[dr_use()] middleware runs **once**, when the request enters R and
the handler returns the `drogon_stream` object. After that, the
dispatcher pumps `next_chunk()` directly — middleware is *not* called
per chunk, and [dr_on_error()] is *not* invoked for errors raised
inside `next_chunk()` (those are handled as described above). If you
need per-chunk hooks (metrics, mutation), wrap your generator
yourself.
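Wrapping is a one-liner of higher-order programming. This sketch adds
a per-chunk metrics hook around an SSE generator; `log_chunk` defaults
to `message()` and stands in for whatever metrics sink you use:

```r
with_chunk_metrics <- function(gen, log_chunk = message) {
  function(state, cancelled) {
    out <- gen(state, cancelled)
    # runs once per pump, after the wrapped generator
    log_chunk(sprintf("chunk of %d bytes, done=%s",
                      nchar(out$data, type = "bytes"), out$done))
    out
  }
}

# dr_stream_sse(state = ..., generator = with_chunk_metrics(my_generator))
```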

## Native (C / C++) streaming

For LLM token streams or any other case where R-side overhead is the
bottleneck, register a streaming handler that bypasses R entirely:

```r
app <- dr_app() |>
  dr_get_cpp_stream("/v1/generate",
                    package  = "myllmbackend",
                    callable = "generate")
```

The backend implements `drogonr_stream_handler_t` from
`<drogonR.h>` (shipped under `inst/include/`) and pushes chunks via
`dr_send_chunk()` / `dr_close_chunk()` on a drogonR worker thread.
R-side middleware and the [dr_on_error()] hook do **not** apply —
the request never enters R. See `?dr_routes_cpp_stream`.

## Caveats

* Chunked streaming requires HTTP/1.1 keep-alive. Clients that send
  `Connection: close` won't receive any chunks — Drogon shortcuts to
  a single non-chunked response in that case.
* Don't call `dr_serve()` and immediately exit a script — the call is
  non-blocking. Standalone `Rscript` files that host a server need a
  loop pump, e.g. `repeat later::run_now(timeoutSecs = 3600)`, or the
  process will exit before the stream produces anything. Interactive
  R sessions don't need this.
* Heavy / many-chunk streams are end-to-end tested in
  `inst/examples/bench-stream-*.R` if you want to gauge throughput on
  your hardware.
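Putting the second caveat together, a minimal standalone `Rscript`
file hosting the SSE example from earlier would end like this:

```r
library(drogonR)

app <- dr_app() |>
  dr_get("/sse", function(req) {
    dr_stream_sse(
      state = list(i = 0L),
      generator = function(state, cancelled) {
        if (cancelled || state$i >= 5L) {
          return(list(data = "", state = state, done = TRUE))
        }
        state$i <- state$i + 1L
        list(data = sprintf("tick %d", state$i), state = state, done = FALSE)
      })
  })

dr_serve(app, port = 8080L)                # returns immediately
repeat later::run_now(timeoutSecs = 3600)  # keep the process alive
```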
