How to stream response from HTTP Trigger Azure Function |
Автор: WafaStudies
Загружено: 2025-08-24
Просмотров: 633
Описание:
In this video, I discuss how to stream a response back from an HTTP-triggered Azure Function using the FastAPI extension for Azure Functions.
function_app.py file
import azure.functions as func
import logging
import time
from azurefunctions.extensions.http.fastapi import Request, StreamingResponse, JSONResponse
app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)
def generate_sensor_data(count: int = 10, interval: float = 1.0):
    """Generate simulated real-time sensor readings as server-sent events.

    Args:
        count: Number of readings to produce. Defaults to 10.
        interval: Seconds to pause after each reading. Defaults to 1.0.

    Yields:
        One ``data: {...}`` frame per reading, terminated by a blank line.
        Temperature starts at 20 and humidity at 50; both rise by one with
        every reading.
    """
    for i in range(count):
        # Simulate temperature and humidity readings
        temperature = 20 + i
        humidity = 50 + i
        yield f"data: {{'temperature': {temperature}, 'humidity': {humidity}}}\n\n"
        # Pace the stream so the consumer sees readings arrive over time.
        time.sleep(interval)
@app.route(route="docStationAzFunCall", methods=[func.HttpMethod.POST, func.HttpMethod.GET])
async def stream_sensor_data(req: Request):
    """Endpoint to stream real-time sensor data.

    Validates that the request contains 'input_text' (JSON body or query param).
    If missing, returns failure with message 'send input_text' and HTTP 400.
    Otherwise responds with a 'text/event-stream' whose first event echoes the
    received input_text, followed by the simulated sensor readings.
    """
    input_text = None

    # Try JSON body first. Parsing is best-effort: a request without a JSON
    # body simply falls through to the query-string lookup below.
    try:
        body = await req.json()
    except Exception:
        body = None
    if isinstance(body, dict):
        input_text = body.get("input_text")

    # Fallback to query params (GET)
    if not input_text:
        try:
            query = getattr(req, "query_params", None)
            input_text = query.get("input_text") if query is not None else None
        except Exception:
            input_text = None

    # If still missing or empty, return failure response
    if not input_text:
        return JSONResponse({"success": False, "message": "send input_text"}, status_code=400)

    logging.info("Received input_text: %s", input_text)

    # repr() quotes the value and escapes embedded quotes and line breaks, so
    # caller-supplied text cannot terminate the event early or inject extra
    # events. For plain text the output is the same as wrapping it in '...'.
    received = repr(str(input_text))

    def gen():
        # Include the received input_text as the first event in the stream,
        # then hand over to the shared sensor generator.
        yield f"data: {{'received_input': {received}}}\n\n"
        yield from generate_sensor_data()

    return StreamingResponse(gen(), media_type="text/event-stream")
Client-side code to call the same endpoint:
import httpx
import asyncio
async def consume_stream(
    url="http://localhost:7071/api/docStationAzFunCall",  # Update if hosted elsewhere
    input_text="product=123;start=2025-01-01;end=2025-01-31",
):
    """Call the streaming endpoint and print each event line as it arrives.

    Args:
        url: Endpoint address without a query string.
        input_text: Value sent as the 'input_text' query parameter.
    """
    # Pass the value through `params` so httpx percent-encodes the '=' and ';'
    # characters instead of leaving them raw in the query string.
    params = {"input_text": input_text}

    # timeout=None: the stream stays open between events, so the default
    # read timeout must not cut it off.
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("GET", url, params=params) as response:
            async for line in response.aiter_lines():
                # Skip the blank separator lines between events.
                if line:
                    print(f"Received: {line}")
# Run the client only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(consume_stream())
#azure #azurefunctions #microsoft #programming #azfun #learning #development
Повторяем попытку...
Доступные форматы для скачивания:
Скачать видео
-
Информация по загрузке: