Search Options

Display Count
Sort
Preferred Language
Advanced Search

Results 1 - 10 of 21 for ServerSentEvent (0.12 seconds)

  1. tests/test_sse.py

    
    @app.get("/items/stream-sse-event", response_class=EventSourceResponse)
    async def sse_items_event():
        yield ServerSentEvent(data="hello", event="greeting", id="1")
        yield ServerSentEvent(data={"key": "value"}, event="json-data", id="2")
        yield ServerSentEvent(comment="just a comment")
        yield ServerSentEvent(data="retry-test", retry=5000)
    
    
    @app.get("/items/stream-mixed", response_class=EventSourceResponse)
    Created: Sun Apr 05 07:19:11 GMT 2026
    - Last Modified: Sun Mar 01 09:21:52 GMT 2026
    - 9.8K bytes
    - Click Count (0)
  2. fastapi/.agents/skills/fastapi/references/streaming.md

    ```python
    from collections.abc import AsyncIterable
    
    from fastapi import FastAPI
    from fastapi.sse import EventSourceResponse, ServerSentEvent
    
    app = FastAPI()
    
    
    @app.get("/events", response_class=EventSourceResponse)
    async def stream_events() -> AsyncIterable[ServerSentEvent]:
        yield ServerSentEvent(data={"status": "started"}, event="status", id="1")
    Created: Sun Apr 05 07:19:11 GMT 2026
    - Last Modified: Sun Mar 01 10:05:57 GMT 2026
    - 2.5K bytes
    - Click Count (0)
  3. docs_src/server_sent_events/tutorial004_py310.py

    async def stream_items(
        last_event_id: Annotated[int | None, Header()] = None,
    ) -> AsyncIterable[ServerSentEvent]:
        start = last_event_id + 1 if last_event_id is not None else 0
        for i, item in enumerate(items):
            if i < start:
                continue
    Created: Sun Apr 05 07:19:11 GMT 2026
    - Last Modified: Sun Mar 01 09:21:52 GMT 2026
    - 795 bytes
    - Click Count (0)
  4. docs_src/server_sent_events/tutorial002_py310.py

    async def stream_items() -> AsyncIterable[ServerSentEvent]:
        yield ServerSentEvent(comment="stream of item updates")
        for i, item in enumerate(items):
    Created: Sun Apr 05 07:19:11 GMT 2026
    - Last Modified: Sun Mar 01 09:21:52 GMT 2026
    - 686 bytes
    - Click Count (0)
  5. docs_src/server_sent_events/tutorial005_py310.py

    from fastapi.sse import EventSourceResponse, ServerSentEvent
    from pydantic import BaseModel
    
    app = FastAPI()
    
    
    class Prompt(BaseModel):
        text: str
    
    
    @app.post("/chat/stream", response_class=EventSourceResponse)
    async def stream_chat(prompt: Prompt) -> AsyncIterable[ServerSentEvent]:
        words = prompt.text.split()
        for word in words:
            yield ServerSentEvent(data=word, event="token")
    Created: Sun Apr 05 07:19:11 GMT 2026
    - Last Modified: Sun Mar 01 09:21:52 GMT 2026
    - 528 bytes
    - Click Count (0)
  6. docs_src/server_sent_events/tutorial003_py310.py

    from collections.abc import AsyncIterable
    
    from fastapi import FastAPI
    from fastapi.sse import EventSourceResponse, ServerSentEvent
    
    app = FastAPI()
    
    
    @app.get("/logs/stream", response_class=EventSourceResponse)
    async def stream_logs() -> AsyncIterable[ServerSentEvent]:
        logs = [
            "2025-01-01 INFO  Application started",
            "2025-01-01 DEBUG Connected to database",
            "2025-01-01 WARN  High memory usage detected",
        ]
    Created: Sun Apr 05 07:19:11 GMT 2026
    - Last Modified: Sun Mar 01 09:21:52 GMT 2026
    - 518 bytes
    - Click Count (0)
  7. docs/en/docs/tutorial/server-sent-events.md

    {* ../../docs_src/server_sent_events/tutorial001_py310.py ln[34:37] hl[35] *}
    
    ## `ServerSentEvent` { #serversentevent }
    
    If you need to set SSE fields like `event`, `id`, `retry`, or `comment`, you can yield `ServerSentEvent` objects instead of plain data.
    
    Import `ServerSentEvent` from `fastapi.sse`:
    
    {* ../../docs_src/server_sent_events/tutorial002_py310.py hl[4,26] *}
    
    Created: Sun Apr 05 07:19:11 GMT 2026
    - Last Modified: Thu Mar 05 18:13:19 GMT 2026
    - 4.6K bytes
    - Click Count (0)
  8. docs/zh/docs/tutorial/server-sent-events.md

    你也可以省略返回类型。FastAPI 将使用 [`jsonable_encoder`](./encoder.md) 转换数据并发送。
    
    {* ../../docs_src/server_sent_events/tutorial001_py310.py ln[34:37] hl[35] *}
    
    ## `ServerSentEvent` { #serversentevent }
    
    如果你需要设置 `event`、`id`、`retry` 或 `comment` 等 SSE 字段,你可以 yield `ServerSentEvent` 对象,而不是直接返回数据。
    
    从 `fastapi.sse` 导入 `ServerSentEvent`:
    
    {* ../../docs_src/server_sent_events/tutorial002_py310.py hl[4,26] *}
    
    `data` 字段始终会被编码为 JSON。你可以传入任何可被序列化为 JSON 的值,包括 Pydantic 模型。
    
    Created: Sun Apr 05 07:19:11 GMT 2026
    - Last Modified: Fri Mar 20 14:29:48 GMT 2026
    - 4.6K bytes
    - Click Count (0)
  9. docs/zh-hant/docs/tutorial/server-sent-events.md

    你也可以省略回傳型別。FastAPI 會使用 [`jsonable_encoder`](./encoder.md) 轉換資料並送出。
    
    {* ../../docs_src/server_sent_events/tutorial001_py310.py ln[34:37] hl[35] *}
    
    ## `ServerSentEvent` { #serversentevent }
    
    如果你需要設定 `event`、`id`、`retry` 或 `comment` 等 SSE 欄位,你可以改為 `yield` 出 `ServerSentEvent` 物件,而不是單純的資料。
    
    從 `fastapi.sse` 匯入 `ServerSentEvent`:
    
    {* ../../docs_src/server_sent_events/tutorial002_py310.py hl[4,26] *}
    
    `data` 欄位一律會以 JSON 編碼。你可以傳入任何可序列化為 JSON 的值,包括 Pydantic 模型。
    
    Created: Sun Apr 05 07:19:11 GMT 2026
    - Last Modified: Fri Mar 20 14:33:04 GMT 2026
    - 4.6K bytes
    - Click Count (0)
  10. fastapi/sse.py

            raise ValueError("SSE 'id' must not contain null characters")
        return v
    
    
    class ServerSentEvent(BaseModel):
        """Represents a single Server-Sent Event.
    
        When `yield`ed from a *path operation function* that uses
        `response_class=EventSourceResponse`, each `ServerSentEvent` is encoded
        into the [SSE wire format](https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream)
    Created: Sun Apr 05 07:19:11 GMT 2026
    - Last Modified: Sun Mar 01 09:21:52 GMT 2026
    - 6.2K bytes
    - Click Count (0)
Back to Top