- Sort Score
- Num 10 results
- Language All
Results 1 - 10 of 21 for ServerSentEvent (0.07 seconds)
-
tests/test_sse.py
@app.get("/items/stream-sse-event", response_class=EventSourceResponse) async def sse_items_event(): yield ServerSentEvent(data="hello", event="greeting", id="1") yield ServerSentEvent(data={"key": "value"}, event="json-data", id="2") yield ServerSentEvent(comment="just a comment") yield ServerSentEvent(data="retry-test", retry=5000) @app.get("/items/stream-mixed", response_class=EventSourceResponse)
Created: Sun Apr 05 07:19:11 GMT 2026 - Last Modified: Sun Mar 01 09:21:52 GMT 2026 - 9.8K bytes - Click Count (0) -
fastapi/.agents/skills/fastapi/references/streaming.md
```python from collections.abc import AsyncIterable from fastapi import FastAPI from fastapi.sse import EventSourceResponse, ServerSentEvent app = FastAPI() @app.get("/events", response_class=EventSourceResponse) async def stream_events() -> AsyncIterable[ServerSentEvent]: yield ServerSentEvent(data={"status": "started"}, event="status", id="1")
Created: Sun Apr 05 07:19:11 GMT 2026 - Last Modified: Sun Mar 01 10:05:57 GMT 2026 - 2.5K bytes - Click Count (0) -
docs_src/server_sent_events/tutorial004_py310.py
async def stream_items( last_event_id: Annotated[int | None, Header()] = None, ) -> AsyncIterable[ServerSentEvent]: start = last_event_id + 1 if last_event_id is not None else 0 for i, item in enumerate(items): if i < start: continue
Created: Sun Apr 05 07:19:11 GMT 2026 - Last Modified: Sun Mar 01 09:21:52 GMT 2026 - 795 bytes - Click Count (0) -
docs_src/server_sent_events/tutorial002_py310.py
async def stream_items() -> AsyncIterable[ServerSentEvent]: yield ServerSentEvent(comment="stream of item updates") for i, item in enumerate(items):
Created: Sun Apr 05 07:19:11 GMT 2026 - Last Modified: Sun Mar 01 09:21:52 GMT 2026 - 686 bytes - Click Count (0) -
docs_src/server_sent_events/tutorial005_py310.py
from fastapi.sse import EventSourceResponse, ServerSentEvent from pydantic import BaseModel app = FastAPI() class Prompt(BaseModel): text: str @app.post("/chat/stream", response_class=EventSourceResponse) async def stream_chat(prompt: Prompt) -> AsyncIterable[ServerSentEvent]: words = prompt.text.split() for word in words: yield ServerSentEvent(data=word, event="token")
Created: Sun Apr 05 07:19:11 GMT 2026 - Last Modified: Sun Mar 01 09:21:52 GMT 2026 - 528 bytes - Click Count (0) -
docs_src/server_sent_events/tutorial003_py310.py
from collections.abc import AsyncIterable from fastapi import FastAPI from fastapi.sse import EventSourceResponse, ServerSentEvent app = FastAPI() @app.get("/logs/stream", response_class=EventSourceResponse) async def stream_logs() -> AsyncIterable[ServerSentEvent]: logs = [ "2025-01-01 INFO Application started", "2025-01-01 DEBUG Connected to database", "2025-01-01 WARN High memory usage detected", ]
Created: Sun Apr 05 07:19:11 GMT 2026 - Last Modified: Sun Mar 01 09:21:52 GMT 2026 - 518 bytes - Click Count (0) -
docs/en/docs/tutorial/server-sent-events.md
{* ../../docs_src/server_sent_events/tutorial001_py310.py ln[34:37] hl[35] *} ## `ServerSentEvent` { #serversentevent } If you need to set SSE fields like `event`, `id`, `retry`, or `comment`, you can yield `ServerSentEvent` objects instead of plain data. Import `ServerSentEvent` from `fastapi.sse`: {* ../../docs_src/server_sent_events/tutorial002_py310.py hl[4,26] *}
Created: Sun Apr 05 07:19:11 GMT 2026 - Last Modified: Thu Mar 05 18:13:19 GMT 2026 - 4.6K bytes - Click Count (0) -
docs/zh/docs/tutorial/server-sent-events.md
你也可以省略返回类型。FastAPI 将使用 [`jsonable_encoder`](./encoder.md) 转换数据并发送。 {* ../../docs_src/server_sent_events/tutorial001_py310.py ln[34:37] hl[35] *} ## `ServerSentEvent` { #serversentevent } 如果你需要设置 `event`、`id`、`retry` 或 `comment` 等 SSE 字段,你可以 yield `ServerSentEvent` 对象,而不是直接返回数据。 从 `fastapi.sse` 导入 `ServerSentEvent`: {* ../../docs_src/server_sent_events/tutorial002_py310.py hl[4,26] *} `data` 字段始终会被编码为 JSON。你可以传入任何可被序列化为 JSON 的值,包括 Pydantic 模型。
Created: Sun Apr 05 07:19:11 GMT 2026 - Last Modified: Fri Mar 20 14:29:48 GMT 2026 - 4.6K bytes - Click Count (0) -
docs/zh-hant/docs/tutorial/server-sent-events.md
你也可以省略回傳型別。FastAPI 會使用 [`jsonable_encoder`](./encoder.md) 轉換資料並送出。 {* ../../docs_src/server_sent_events/tutorial001_py310.py ln[34:37] hl[35] *} ## `ServerSentEvent` { #serversentevent } 如果你需要設定 `event`、`id`、`retry` 或 `comment` 等 SSE 欄位,你可以改為 `yield` 出 `ServerSentEvent` 物件,而不是單純的資料。 從 `fastapi.sse` 匯入 `ServerSentEvent`: {* ../../docs_src/server_sent_events/tutorial002_py310.py hl[4,26] *} `data` 欄位一律會以 JSON 編碼。你可以傳入任何可序列化為 JSON 的值,包括 Pydantic 模型。
Created: Sun Apr 05 07:19:11 GMT 2026 - Last Modified: Fri Mar 20 14:33:04 GMT 2026 - 4.6K bytes - Click Count (0) -
fastapi/sse.py
raise ValueError("SSE 'id' must not contain null characters") return v class ServerSentEvent(BaseModel): """Represents a single Server-Sent Event. When `yield`ed from a *path operation function* that uses `response_class=EventSourceResponse`, each `ServerSentEvent` is encoded into the [SSE wire format](https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream)
Created: Sun Apr 05 07:19:11 GMT 2026 - Last Modified: Sun Mar 01 09:21:52 GMT 2026 - 6.2K bytes - Click Count (0)