"""Rule executor with RestrictedPython sandbox.""" import signal from typing import Optional, Dict, Any class ExecutionTimeout(Exception): """执行超时异常。""" pass class Sandbox: """RestrictedPython 沙箱(简化版)。""" # 允许的內建函数白名单 ALLOWED_BUILTINS = { "len": len, "str": str, "int": int, "float": float, "bool": bool, "list": list, "dict": dict, "tuple": tuple, "set": set, "range": range, "enumerate": enumerate, "zip": zip, "min": min, "max": max, "abs": abs, "sum": sum, "all": all, "any": any, "sorted": sorted, "reversed": reversed, "isinstance": isinstance, "type": type, "getattr": getattr, "hasattr": hasattr, "True": True, "False": False, "None": None, } # 禁止的操作 BLOCKED_ATTRS = {"__import__", "__builtins__", "__class__", "__bases__", "__subclasses__"} def __init__(self): self._globals: Dict = {"__builtins__": self.ALLOWED_BUILTINS} self._locals: Dict = {} def execute(self, code: str, facts: Dict[str, Any], timeout_ms: int = 100) -> Any: """在沙箱中执行代码。""" # 设置超时 def timeout_handler(signum, frame): raise ExecutionTimeout(f"Execution exceeded {timeout_ms}ms") old_handler = signal.signal(signal.SIGALRM, timeout_handler) try: compiled = compile(code, "", "exec") exec(compiled, self._globals) # 调用 rule 函数(从 globals 中获取) rule_func = self._globals.get("rule") if not callable(rule_func): raise ValueError("No 'rule' function found in code") # 使用 SIGALRM 实现超时 signal.signal(signal.SIGALRM, timeout_handler) signal.alarm(max(1, timeout_ms // 1000)) try: result = rule_func(facts) return result finally: signal.alarm(0) finally: signal.signal(signal.SIGALRM, old_handler) class RuleExecutor: """规则执行器。""" def __init__(self, timeout_ms: int = 100): self.timeout_ms = timeout_ms self.sandbox = Sandbox() def execute_rule(self, code: str, facts: Dict[str, Any]) -> Optional[Dict]: """执行规则代码。""" try: result = self.sandbox.execute(code, facts, self.timeout_ms) return result except ExecutionTimeout: raise RuntimeError(f"Rule execution timed out after {self.timeout_ms}ms") except Exception as e: raise RuntimeError(f"Rule execution failed: {e}") def validate_rule_code(self, code: str) -> bool: """验证规则代码安全性(不执行,只做基础检查)。""" # 检查危险关键字 dangerous = ["import", "exec", "eval", "open", "file", "input", "compile", "__import__"] for keyword in dangerous: if keyword in code: return False # 验证语法 try: compile(code, "", "exec") return True except SyntaxError: return False