Results 1 - 2 of 2 for stream_events (0.32 seconds)

  1. fastapi/.agents/skills/fastapi/references/streaming.md

    from collections.abc import AsyncIterable
    
    from fastapi import FastAPI
    from fastapi.sse import EventSourceResponse, ServerSentEvent
    
    app = FastAPI()
    
    
    @app.get("/events", response_class=EventSourceResponse)
    async def stream_events() -> AsyncIterable[ServerSentEvent]:
        yield ServerSentEvent(data={"status": "started"}, event="status", id="1")
        yield ServerSentEvent(data={"progress": 50}, event="progress", id="2")
    
    Created: Sun Apr 05 07:19:11 GMT 2026
    - Last Modified: Sun Mar 01 10:05:57 GMT 2026
    - 2.5K bytes
  2. tests/test_sse.py

        yield ServerSentEvent(raw_data="cpu,87.3,1709145600", event="csv")
    
    
    router = APIRouter()
    
    
    @router.get("/events", response_class=EventSourceResponse)
    async def stream_events():
        yield {"msg": "hello"}
        yield {"msg": "world"}
    
    
    app.include_router(router, prefix="/api")
    
    
    @pytest.fixture(name="client")
    def client_fixture():
        with TestClient(app) as c:
    Created: Sun Apr 05 07:19:11 GMT 2026
    - Last Modified: Sun Mar 01 09:21:52 GMT 2026
    - 9.8K bytes