"""Signature service for electronic signatures.""" import hashlib import secrets from datetime import datetime, timedelta from typing import Optional from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.core.config import settings from app.core.exceptions import ( NotFoundError, SignatureExpiredError, SignatureAlreadySignedError, ) from app.models.signature import ( SignatureRequest, Signature, SignatureAudit, SignatureStatus, ) from app.models.contract import Contract class SignatureService: """Service for electronic signature operations.""" def __init__(self, db: AsyncSession): self.db = db def generate_token(self) -> str: """Generate a secure random token.""" return secrets.token_urlsafe(32) def generate_verification_hash(self, content: str) -> str: """Generate a verification hash for document content.""" return hashlib.sha256(content.encode()).hexdigest() async def create_signature_request( self, contract_id: int, requester_id: int, signer_name: str, signer_email: str, expire_hours: int = None, ) -> SignatureRequest: """Create a signature request.""" # Verify contract exists result = await self.db.execute( select(Contract).where(Contract.id == contract_id) ) contract = result.scalar_one_or_none() if not contract: raise NotFoundError("Contract", contract_id) expire_hours = expire_hours or settings.SIGNATURE_TOKEN_EXPIRE_HOURS request = SignatureRequest( contract_id=contract_id, requester_id=requester_id, signer_name=signer_name, signer_email=signer_email, token=self.generate_token(), expires_at=datetime.utcnow() + timedelta(hours=expire_hours), ) self.db.add(request) await self.db.flush() await self.db.refresh(request) # Create audit log await self.create_audit_log( action="request_created", request_id=request.id, details={"signer_email": signer_email} ) return request async def get_signature_request_by_token( self, token: str ) -> Optional[SignatureRequest]: """Get a signature request by token.""" result = await self.db.execute( select(SignatureRequest).where(SignatureRequest.token == token) ) return result.scalar_one_or_none() async def get_signature_request_by_id( self, request_id: int ) -> Optional[SignatureRequest]: """Get a signature request by ID.""" result = await self.db.execute( select(SignatureRequest).where(SignatureRequest.id == request_id) ) return result.scalar_one_or_none() async def validate_request(self, request: SignatureRequest) -> None: """Validate a signature request. Raises exception if invalid.""" if request.status != SignatureStatus.PENDING: raise SignatureAlreadySignedError() if request.expires_at < datetime.utcnow(): raise SignatureExpiredError() async def is_request_valid(self, request: SignatureRequest) -> bool: """Check if a signature request is valid.""" try: await self.validate_request(request) return True except Exception: return False async def sign_document( self, request: SignatureRequest, signature_data: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None, ) -> Signature: """Sign a document.""" # Validate request first await self.validate_request(request) # Get contract content for hash result = await self.db.execute( select(Contract).where(Contract.id == request.contract_id) ) contract = result.scalar_one_or_none() if not contract: raise NotFoundError("Contract", request.contract_id) verification_hash = self.generate_verification_hash(contract.content) signature = Signature( request_id=request.id, signer_name=request.signer_name, signature_data=signature_data, verification_hash=verification_hash, ip_address=ip_address, user_agent=user_agent, ) self.db.add(signature) # Update request status request.status = SignatureStatus.SIGNED # Create audit log await self.create_audit_log( action="document_signed", signature_id=signature.id, request_id=request.id, ip_address=ip_address ) await self.db.flush() await self.db.refresh(signature) return signature async def verify_signature( self, signature_id: int, content: str ) -> bool: """Verify a signature against document content.""" result = await self.db.execute( select(Signature).where(Signature.id == signature_id) ) signature = result.scalar_one_or_none() if not signature: return False current_hash = self.generate_verification_hash(content) return current_hash == signature.verification_hash async def create_audit_log( self, action: str, signature_id: Optional[int] = None, request_id: Optional[int] = None, details: Optional[dict] = None, ip_address: Optional[str] = None, ) -> SignatureAudit: """Create an audit log entry.""" audit = SignatureAudit( action=action, signature_id=signature_id, request_id=request_id, details=details, ip_address=ip_address, ) self.db.add(audit) await self.db.flush() await self.db.refresh(audit) return audit