ai-legal-assistant/backend/app/services/signature_service.py
root ae72b180e5 fix: improve error handling and validation
- Add centralized exception classes
- Validate signature request status before signing
- Check contract status before approval/rejection
- Add exception handlers to FastAPI app
- Update tests for new validation logic

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-01 03:38:20 +08:00

199 lines
5.9 KiB
Python

"""Signature service for electronic signatures."""
import hashlib
import secrets
from datetime import datetime, timedelta
from typing import Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.core.exceptions import (
NotFoundError,
SignatureExpiredError,
SignatureAlreadySignedError,
)
from app.models.signature import (
SignatureRequest,
Signature,
SignatureAudit,
SignatureStatus,
)
from app.models.contract import Contract
class SignatureService:
"""Service for electronic signature operations."""
def __init__(self, db: AsyncSession):
self.db = db
def generate_token(self) -> str:
"""Generate a secure random token."""
return secrets.token_urlsafe(32)
def generate_verification_hash(self, content: str) -> str:
"""Generate a verification hash for document content."""
return hashlib.sha256(content.encode()).hexdigest()
async def create_signature_request(
self,
contract_id: int,
requester_id: int,
signer_name: str,
signer_email: str,
expire_hours: int = None,
) -> SignatureRequest:
"""Create a signature request."""
# Verify contract exists
result = await self.db.execute(
select(Contract).where(Contract.id == contract_id)
)
contract = result.scalar_one_or_none()
if not contract:
raise NotFoundError("Contract", contract_id)
expire_hours = expire_hours or settings.SIGNATURE_TOKEN_EXPIRE_HOURS
request = SignatureRequest(
contract_id=contract_id,
requester_id=requester_id,
signer_name=signer_name,
signer_email=signer_email,
token=self.generate_token(),
expires_at=datetime.utcnow() + timedelta(hours=expire_hours),
)
self.db.add(request)
await self.db.flush()
await self.db.refresh(request)
# Create audit log
await self.create_audit_log(
action="request_created",
request_id=request.id,
details={"signer_email": signer_email}
)
return request
async def get_signature_request_by_token(
self,
token: str
) -> Optional[SignatureRequest]:
"""Get a signature request by token."""
result = await self.db.execute(
select(SignatureRequest).where(SignatureRequest.token == token)
)
return result.scalar_one_or_none()
async def get_signature_request_by_id(
self,
request_id: int
) -> Optional[SignatureRequest]:
"""Get a signature request by ID."""
result = await self.db.execute(
select(SignatureRequest).where(SignatureRequest.id == request_id)
)
return result.scalar_one_or_none()
async def validate_request(self, request: SignatureRequest) -> None:
"""Validate a signature request. Raises exception if invalid."""
if request.status != SignatureStatus.PENDING:
raise SignatureAlreadySignedError()
if request.expires_at < datetime.utcnow():
raise SignatureExpiredError()
async def is_request_valid(self, request: SignatureRequest) -> bool:
"""Check if a signature request is valid."""
try:
await self.validate_request(request)
return True
except Exception:
return False
async def sign_document(
self,
request: SignatureRequest,
signature_data: str,
ip_address: Optional[str] = None,
user_agent: Optional[str] = None,
) -> Signature:
"""Sign a document."""
# Validate request first
await self.validate_request(request)
# Get contract content for hash
result = await self.db.execute(
select(Contract).where(Contract.id == request.contract_id)
)
contract = result.scalar_one_or_none()
if not contract:
raise NotFoundError("Contract", request.contract_id)
verification_hash = self.generate_verification_hash(contract.content)
signature = Signature(
request_id=request.id,
signer_name=request.signer_name,
signature_data=signature_data,
verification_hash=verification_hash,
ip_address=ip_address,
user_agent=user_agent,
)
self.db.add(signature)
# Update request status
request.status = SignatureStatus.SIGNED
# Create audit log
await self.create_audit_log(
action="document_signed",
signature_id=signature.id,
request_id=request.id,
ip_address=ip_address
)
await self.db.flush()
await self.db.refresh(signature)
return signature
async def verify_signature(
self,
signature_id: int,
content: str
) -> bool:
"""Verify a signature against document content."""
result = await self.db.execute(
select(Signature).where(Signature.id == signature_id)
)
signature = result.scalar_one_or_none()
if not signature:
return False
current_hash = self.generate_verification_hash(content)
return current_hash == signature.verification_hash
async def create_audit_log(
self,
action: str,
signature_id: Optional[int] = None,
request_id: Optional[int] = None,
details: Optional[dict] = None,
ip_address: Optional[str] = None,
) -> SignatureAudit:
"""Create an audit log entry."""
audit = SignatureAudit(
action=action,
signature_id=signature_id,
request_id=request_id,
details=details,
ip_address=ip_address,
)
self.db.add(audit)
await self.db.flush()
await self.db.refresh(audit)
return audit