ai-legal-assistant/backend/app/services/signature_service.py
root 656f596d7e feat: implement AI legal assistant system MVP
Core modules:
- Laws: CRUD, search, AI-powered QA
- Analysis: legal research and case management
- Contracts: lifecycle management with templates
- Signatures: electronic signature workflow

Infrastructure:
- FastAPI + SQLite + async SQLAlchemy
- Docker deployment support
- 54 unit tests passing

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-01 03:34:44 +08:00

175 lines
5.2 KiB
Python

"""Signature service for electronic signatures."""
import hashlib
import secrets
from datetime import datetime, timedelta
from typing import Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.models.signature import (
SignatureRequest,
Signature,
SignatureAudit,
SignatureStatus,
)
from app.models.contract import Contract
class SignatureService:
    """Service for electronic signature operations.

    Every method works through the async SQLAlchemy session supplied at
    construction time. This class only adds, flushes and refreshes objects;
    it never calls ``commit()`` itself.
    """

    def __init__(self, db: "AsyncSession"):
        """Store the session used for all queries and writes."""
        self.db = db

    def generate_token(self) -> str:
        """Generate a secure random URL-safe token from 32 random bytes."""
        return secrets.token_urlsafe(32)

    def generate_verification_hash(self, content: str) -> str:
        """Return the hex SHA-256 digest of ``content`` (encoded as UTF-8)."""
        return hashlib.sha256(content.encode()).hexdigest()

    async def create_signature_request(
        self,
        contract_id: int,
        requester_id: int,
        signer_name: str,
        signer_email: str,
        expire_hours: Optional[int] = None,
    ) -> "SignatureRequest":
        """Create a signature request and record a "request_created" audit entry.

        Args:
            contract_id: ID of the contract to be signed.
            requester_id: ID of the user asking for the signature.
            signer_name: Name of the person expected to sign.
            signer_email: Email of the person expected to sign; also stored
                in the audit entry's details.
            expire_hours: Lifetime of the request in hours. ``None`` (or any
                other falsy value such as ``0``) falls back to
                ``settings.SIGNATURE_TOKEN_EXPIRE_HOURS``.

        Returns:
            The flushed and refreshed ``SignatureRequest``.
        """
        expire_hours = expire_hours or settings.SIGNATURE_TOKEN_EXPIRE_HOURS

        request = SignatureRequest(
            contract_id=contract_id,
            requester_id=requester_id,
            signer_name=signer_name,
            signer_email=signer_email,
            token=self.generate_token(),
            # Naive UTC, to match the comparison done in is_request_valid().
            expires_at=datetime.utcnow() + timedelta(hours=expire_hours),
        )
        self.db.add(request)

        # Flush first so the primary key is assigned; otherwise the audit
        # entry below would be written with request_id=None.
        await self.db.flush()

        # Create audit log
        await self.create_audit_log(
            action="request_created",
            request_id=request.id,
            details={"signer_email": signer_email},
        )

        await self.db.refresh(request)
        return request

    async def get_signature_request_by_token(
        self,
        token: str,
    ) -> Optional["SignatureRequest"]:
        """Return the signature request with this token, or ``None``."""
        result = await self.db.execute(
            select(SignatureRequest).where(SignatureRequest.token == token)
        )
        return result.scalar_one_or_none()

    async def get_signature_request_by_id(
        self,
        request_id: int,
    ) -> Optional["SignatureRequest"]:
        """Return the signature request with this ID, or ``None``."""
        result = await self.db.execute(
            select(SignatureRequest).where(SignatureRequest.id == request_id)
        )
        return result.scalar_one_or_none()

    async def is_request_valid(self, request: "SignatureRequest") -> bool:
        """Check if a signature request can still be signed.

        A request is valid only while its status is ``PENDING`` and its
        ``expires_at`` is not earlier than the current naive UTC time.
        """
        if request.status != SignatureStatus.PENDING:
            return False
        if request.expires_at < datetime.utcnow():
            return False
        return True

    async def sign_document(
        self,
        request: "SignatureRequest",
        signature_data: str,
        ip_address: Optional[str] = None,
        user_agent: Optional[str] = None,
    ) -> "Signature":
        """Sign the contract referenced by ``request``.

        Stores a ``Signature`` carrying the SHA-256 hash of the contract's
        current content, marks the request as ``SIGNED`` and records a
        "document_signed" audit entry.

        This method does not check ``is_request_valid``; callers that need
        the pending/expiry check must perform it before calling.

        Args:
            request: The signature request being fulfilled.
            signature_data: The signer's signature payload, stored as given.
            ip_address: Optional client IP, stored on the signature and the
                audit entry.
            user_agent: Optional client user agent, stored on the signature.

        Returns:
            The flushed and refreshed ``Signature``.

        Raises:
            ValueError: If the contract referenced by the request is missing.
        """
        # Get contract content for hash
        result = await self.db.execute(
            select(Contract).where(Contract.id == request.contract_id)
        )
        contract = result.scalar_one_or_none()
        if not contract:
            raise ValueError("Contract not found")

        verification_hash = self.generate_verification_hash(contract.content)

        signature = Signature(
            request_id=request.id,
            signer_name=request.signer_name,
            signature_data=signature_data,
            verification_hash=verification_hash,
            ip_address=ip_address,
            user_agent=user_agent,
        )
        self.db.add(signature)

        # Update request status
        request.status = SignatureStatus.SIGNED

        # Flush first so the signature's primary key is assigned; otherwise
        # the audit entry below would be written with signature_id=None.
        await self.db.flush()

        # Create audit log
        await self.create_audit_log(
            action="document_signed",
            signature_id=signature.id,
            request_id=request.id,
            ip_address=ip_address,
        )

        await self.db.refresh(signature)
        return signature

    async def verify_signature(
        self,
        signature_id: int,
        content: str,
    ) -> bool:
        """Verify a signature against document content.

        Returns ``True`` only if a signature with ``signature_id`` exists and
        the SHA-256 hash of ``content`` equals its stored verification hash.
        An unknown ``signature_id`` yields ``False``.
        """
        result = await self.db.execute(
            select(Signature).where(Signature.id == signature_id)
        )
        signature = result.scalar_one_or_none()
        if not signature:
            return False

        current_hash = self.generate_verification_hash(content)
        return current_hash == signature.verification_hash

    async def create_audit_log(
        self,
        action: str,
        signature_id: Optional[int] = None,
        request_id: Optional[int] = None,
        details: Optional[dict] = None,
        ip_address: Optional[str] = None,
    ) -> "SignatureAudit":
        """Create an audit log entry.

        The entry is added to the session, flushed and refreshed before it
        is returned. ``signature_id`` and ``request_id`` are stored exactly
        as passed, so callers must flush the referenced rows beforehand if
        they want real primary keys recorded.
        """
        audit = SignatureAudit(
            action=action,
            signature_id=signature_id,
            request_id=request_id,
            details=details,
            ip_address=ip_address,
        )
        self.db.add(audit)
        await self.db.flush()
        await self.db.refresh(audit)
        return audit