Search Options

Display Count
Sort
Preferred Language
Advanced Search

Results 1 - 10 of 36 for StreamingResponse (0.08 seconds)

The search processing time has exceeded the limit. The displayed results may be partial.

  1. docs_src/stream_data/tutorial001_py310.py

    @app.get("/story/stream-no-annotation", response_class=StreamingResponse)
    async def stream_story_no_annotation():
        for line in message.splitlines():
            yield line
    
    
    @app.get("/story/stream-no-async-no-annotation", response_class=StreamingResponse)
    def stream_story_no_async_no_annotation():
        for line in message.splitlines():
            yield line
    
    
    @app.get("/story/stream-bytes", response_class=StreamingResponse)
    Created: Sun Apr 05 07:19:11 GMT 2026
    - Last Modified: Fri Feb 27 18:56:47 GMT 2026
    - 2.2K bytes
    - Click Count (0)
  2. fastapi/.agents/skills/fastapi/references/streaming.md

    ```
    
    ## Stream bytes
    
    To stream bytes, declare a `response_class=` of `StreamingResponse` or a sub-class, and use `yield` to return the data.
    
    ```python
    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse
    from app.utils import read_image
    
    app = FastAPI()
    
    
    class PNGStreamingResponse(StreamingResponse):
        media_type = "image/png"
    
    @app.get("/image", response_class=PNGStreamingResponse)
    Created: Sun Apr 05 07:19:11 GMT 2026
    - Last Modified: Sun Mar 01 10:05:57 GMT 2026
    - 2.5K bytes
    - Click Count (0)
  3. tests/test_dependency_yield_scope.py

        def iter_data():
            yield json.dumps({"is_open": session.open})
    
        return StreamingResponse(iter_data())
    
    
    @app.get("/request-scope")
    def request_scope(session: SessionRequestDep) -> Any:
        def iter_data():
            yield json.dumps({"is_open": session.open})
    
        return StreamingResponse(iter_data())
    
    
    @app.get("/two-scopes")
    def get_stream_session(
    Created: Sun Apr 05 07:19:11 GMT 2026
    - Last Modified: Wed Dec 17 21:25:59 GMT 2025
    - 6.7K bytes
    - Click Count (0)
  4. docs/zh-hant/docs/advanced/stream-data.md

    你也可以用它來串流大型二進位檔案,邊讀邊將每個區塊(chunk)串流出去,而不必一次把整個檔案載入記憶體。
    
    你也可以用同樣方式串流視訊或音訊,甚至可以在處理的同時即時產生並傳送。
    
    ## 使用 `yield` 的 `StreamingResponse` { #a-streamingresponse-with-yield }
    
    如果在你的路徑操作函式中宣告 `response_class=StreamingResponse`,就可以用 `yield` 逐一送出每個資料區塊。
    
    {* ../../docs_src/stream_data/tutorial001_py310.py ln[1:23] hl[20,23] *}
    
    FastAPI 會如實將每個資料區塊交給 `StreamingResponse`,不會嘗試將其轉換為 JSON 或其他格式。
    
    ### 非 async 路徑操作函式 { #non-async-path-operation-functions }
    
    Created: Sun Apr 05 07:19:11 GMT 2026
    - Last Modified: Fri Mar 20 14:33:04 GMT 2026
    - 4.9K bytes
    - Click Count (0)
  5. docs_src/custom_response/tutorial007_py310.py

    import anyio
    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse
    
    app = FastAPI()
    
    
    async def fake_video_streamer():
        for i in range(10):
            yield b"some fake video bytes"
            await anyio.sleep(0)
    
    
    @app.get("/")
    async def main():
    Created: Sun Apr 05 07:19:11 GMT 2026
    - Last Modified: Fri Feb 27 19:12:46 GMT 2026
    - 319 bytes
    - Click Count (0)
  6. docs_src/custom_response/tutorial008_py310.py

    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse
    
    some_file_path = "large-video-file.mp4"
    app = FastAPI()
    
    
    @app.get("/")
    def main():
        def iterfile():  # (1)
            with open(some_file_path, mode="rb") as file_like:  # (2)
                yield from file_like  # (3)
    
    Created: Sun Apr 05 07:19:11 GMT 2026
    - Last Modified: Thu Feb 12 13:19:43 GMT 2026
    - 360 bytes
    - Click Count (0)
  7. docs_src/stream_data/tutorial002_py310.py

    import base64
    from collections.abc import AsyncIterable, Iterable
    from io import BytesIO
    
    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse
    
    Created: Sun Apr 05 07:19:11 GMT 2026
    - Last Modified: Fri Feb 27 20:51:40 GMT 2026
    - 5.6K bytes
    - Click Count (0)
  8. docs_src/dependencies/tutorial013_an_py310.py

    import time
    from typing import Annotated
    
    from fastapi import Depends, FastAPI, HTTPException
    from fastapi.responses import StreamingResponse
    from sqlmodel import Field, Session, SQLModel, create_engine
    
    engine = create_engine("postgresql+psycopg://postgres:postgres@localhost/db")
    
    
    class User(SQLModel, table=True):
        id: int | None = Field(default=None, primary_key=True)
        name: str
    
    
    app = FastAPI()
    
    
    def get_session():
    Created: Sun Apr 05 07:19:11 GMT 2026
    - Last Modified: Mon Sep 29 03:29:38 GMT 2025
    - 937 bytes
    - Click Count (0)
  9. tests/test_stream_cancellation.py

    import anyio
    import pytest
    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse
    
    pytestmark = [
        pytest.mark.anyio,
        pytest.mark.filterwarnings("ignore::pytest.PytestUnraisableExceptionWarning"),
    ]
    
    
    app = FastAPI()
    
    
    @app.get("/stream-raw", response_class=StreamingResponse)
    async def stream_raw() -> AsyncIterable[str]:
    Created: Sun Apr 05 07:19:11 GMT 2026
    - Last Modified: Fri Feb 27 18:56:47 GMT 2026
    - 2.7K bytes
    - Click Count (0)
  10. fastapi/responses.py

    from starlette.responses import RedirectResponse as RedirectResponse  # noqa
    from starlette.responses import Response as Response  # noqa
    from starlette.responses import StreamingResponse as StreamingResponse  # noqa
    from typing_extensions import deprecated
    
    try:
        import ujson
    except ImportError:  # pragma: nocover
        ujson = None  # type: ignore
    
    
    try:
        import orjson
    Created: Sun Apr 05 07:19:11 GMT 2026
    - Last Modified: Sun Mar 01 09:21:52 GMT 2026
    - 3.6K bytes
    - Click Count (0)
Back to Top