- Sort Score
- Result 10 results
- Languages All
Results 1 - 10 of 12 for connection (0.19 sec)
-
tests/test_http_connection_injection.py
app.state.value = 42 async def extract_value_from_http_connection(conn: HTTPConnection): return conn.app.state.value @app.get("/http") async def get_value_by_http(value: int = Depends(extract_value_from_http_connection)): return value @app.websocket("/ws") async def get_value_by_ws( websocket: WebSocket, value: int = Depends(extract_value_from_http_connection) ): await websocket.accept()
Python - Registered: Sun Apr 28 07:19:10 GMT 2024 - Last Modified: Sun Aug 09 13:56:41 GMT 2020 - 972 bytes - Viewed (0) -
tests/test_tutorial/test_websockets/test_tutorial003.py
def test_websocket_handle_disconnection(): with client.websocket_connect("/ws/1234") as connection, client.websocket_connect( "/ws/5678" ) as connection_two: connection.send_text("Hello from 1234") data1 = connection.receive_text() assert data1 == "You wrote: Hello from 1234" data2 = connection_two.receive_text() client1_says = "Client #1234 says: Hello from 1234"
Python - Registered: Sun Apr 28 07:19:10 GMT 2024 - Last Modified: Thu Jul 29 09:26:07 GMT 2021 - 872 bytes - Viewed (0) -
tests/test_tutorial/test_websockets/test_tutorial003_py39.py
def test_websocket_handle_disconnection(client: TestClient): with client.websocket_connect("/ws/1234") as connection, client.websocket_connect( "/ws/5678" ) as connection_two: connection.send_text("Hello from 1234") data1 = connection.receive_text() assert data1 == "You wrote: Hello from 1234" data2 = connection_two.receive_text() client1_says = "Client #1234 says: Hello from 1234"
Python - Registered: Sun Apr 28 07:19:10 GMT 2024 - Last Modified: Sat Mar 18 12:29:59 GMT 2023 - 1.3K bytes - Viewed (0) -
fastapi/dependencies/models.py
self.security_requirements = security_schemes or [] self.request_param_name = request_param_name self.websocket_param_name = websocket_param_name self.http_connection_param_name = http_connection_param_name self.response_param_name = response_param_name self.background_tasks_param_name = background_tasks_param_name self.security_scopes = security_scopes
Python - Registered: Sun Apr 28 07:19:10 GMT 2024 - Last Modified: Fri Jul 07 17:12:13 GMT 2023 - 2.4K bytes - Viewed (0) -
tests/test_ws_router.py
assert data == "path/to/file" data = websocket.receive_text() assert data == "a_query_param" def test_wrong_uri(): """ Verify that a websocket connection to a non-existent endpoint results in a shutdown """ client = TestClient(app) with pytest.raises(WebSocketDisconnect) as e: with client.websocket_connect("/no-router/"):
Python - Registered: Sun Apr 28 07:19:10 GMT 2024 - Last Modified: Sun Jun 11 19:08:14 GMT 2023 - 7.5K bytes - Viewed (0) -
docs_src/websockets/tutorial003.py
async def broadcast(self, message: str): for connection in self.active_connections: await connection.send_text(message) manager = ConnectionManager() @app.get("/") async def get(): return HTMLResponse(html) @app.websocket("/ws/{client_id}") async def websocket_endpoint(websocket: WebSocket, client_id: int): await manager.connect(websocket)
Python - Registered: Sun Apr 28 07:19:10 GMT 2024 - Last Modified: Sun Aug 09 13:52:19 GMT 2020 - 2.5K bytes - Viewed (0) -
fastapi/dependencies/utils.py
required_params=dependant.body_params, received_body=body ) values.update(body_values) errors.extend(body_errors) if dependant.http_connection_param_name: values[dependant.http_connection_param_name] = request if dependant.request_param_name and isinstance(request, Request): values[dependant.request_param_name] = request
Python - Registered: Sun Apr 28 07:19:10 GMT 2024 - Last Modified: Tue Apr 02 02:52:56 GMT 2024 - 29.5K bytes - Viewed (0) -
docs_src/websockets/tutorial003_py39.py
async def broadcast(self, message: str): for connection in self.active_connections: await connection.send_text(message) manager = ConnectionManager() @app.get("/") async def get(): return HTMLResponse(html) @app.websocket("/ws/{client_id}") async def websocket_endpoint(websocket: WebSocket, client_id: int): await manager.connect(websocket)
Python - Registered: Sun Apr 28 07:19:10 GMT 2024 - Last Modified: Sat Mar 18 12:29:59 GMT 2023 - 2.5K bytes - Viewed (0) -
fastapi/concurrency.py
) -> AsyncGenerator[_T, None]: # blocking __exit__ from running waiting on a free thread # can create race conditions/deadlocks if the context manager itself # has its own internal pool (e.g. a database connection pool) # to avoid this we let __exit__ run without a capacity limit # since we're creating a new limiter for each call, any non-zero limit # works (1 is arbitrary) exit_limiter = CapacityLimiter(1) try:
Python - Registered: Sun Apr 28 07:19:10 GMT 2024 - Last Modified: Mon Dec 25 17:57:35 GMT 2023 - 1.4K bytes - Viewed (0) -
tensorflow/api_template.__init__.py
# See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== """ Top-level module of TensorFlow. By convention, we refer to this module as `tf` instead of `tensorflow`, following the common practice of importing TensorFlow via the command `import tensorflow as tf`.
Python - Registered: Tue Apr 30 12:39:09 GMT 2024 - Last Modified: Tue Mar 05 06:27:59 GMT 2024 - 6.7K bytes - Viewed (3)