root 1d7946d98a feat(backend): B3 Task 3.2 - SSE 流式响应实现
- 添加 stream_chat 生成器函数处理 SSE 事件流
- 实现 message_start / token / message_end 事件格式
- 添加 messages/stream SSE 端点
- 构建 LLM 消息列表(system prompt + 历史 + 用户消息)
- 持久化用户和 assistant 消息到数据库
- 添加 SSE 测试用 mock Provider

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-07 02:33:12 +08:00

83 lines
3.0 KiB
Python

from fastapi import APIRouter, Depends, status
from fastapi.responses import StreamingResponse
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_db
from app.schemas.conversation import (
ConversationCreate,
ConversationResponse,
ConversationUpdate,
MessageCreate,
MessageResponse,
)
from app.services import conversation_service
router = APIRouter(prefix="/conversations", tags=["conversations"])
@router.post("", response_model=ConversationResponse, status_code=status.HTTP_201_CREATED)
async def create_conversation(
data: ConversationCreate, session: AsyncSession = Depends(get_db)
) -> ConversationResponse:
conv = await conversation_service.create_conversation(
session, data.tenant_id, data.employee_id, data.user_id, data.title
)
return ConversationResponse.from_model(conv)
@router.get("", response_model=list[ConversationResponse])
async def list_conversations(
tenant_id: str, session: AsyncSession = Depends(get_db)
) -> list[ConversationResponse]:
convs = await conversation_service.list_conversations(session, tenant_id)
return [ConversationResponse.from_model(c) for c in convs]
@router.get("/{conversation_id}", response_model=ConversationResponse)
async def get_conversation(
conversation_id: str, session: AsyncSession = Depends(get_db)
) -> ConversationResponse:
conv = await conversation_service.get_conversation(session, conversation_id)
return ConversationResponse.from_model(conv)
@router.delete("/{conversation_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_conversation(
conversation_id: str, session: AsyncSession = Depends(get_db)
) -> None:
await conversation_service.delete_conversation(session, conversation_id)
@router.post("/{conversation_id}/messages", response_model=MessageResponse)
async def send_message(
conversation_id: str,
data: MessageCreate,
session: AsyncSession = Depends(get_db),
) -> MessageResponse:
await conversation_service.get_conversation(session, conversation_id)
msg = await conversation_service.create_message(
session, conversation_id, role="user", content=data.content
)
return MessageResponse.from_model(msg)
@router.post("/{conversation_id}/messages/stream")
async def send_message_stream(
conversation_id: str,
data: MessageCreate,
session: AsyncSession = Depends(get_db),
) -> StreamingResponse:
"""SSE 流式响应端点"""
return StreamingResponse(
conversation_service.stream_chat(session, conversation_id, data.content),
media_type="text/event-stream",
)
@router.get("/{conversation_id}/messages", response_model=list[MessageResponse])
async def list_messages(
conversation_id: str, session: AsyncSession = Depends(get_db)
) -> list[MessageResponse]:
await conversation_service.get_conversation(session, conversation_id)
messages = await conversation_service.list_messages(session, conversation_id)
return [MessageResponse.from_model(m) for m in messages]