- 添加 README.md(功能说明、快速开始、API 文档) - 修复冲突检测字段提取正则表达式 - 添加集成测试 tests/integration/test_full_flow.py Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
129 lines
4.9 KiB
Python
129 lines
4.9 KiB
Python
"""Conflict detector for rule conflict detection."""
|
||
from typing import List, Tuple, Optional, Dict, Any
|
||
|
||
|
||
class ConflictDetector:
|
||
"""规则冲突检测器(简化版)。"""
|
||
|
||
def detect_conflicts(self, rules: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
||
"""检测规则列表中的冲突。
|
||
|
||
简化版策略:
|
||
1. 检查条件完全相反的规则(如 A->通过, A->拒绝)
|
||
2. 检查包含关系(一条规则条件是另一条的子集)
|
||
|
||
Args:
|
||
rules: 规则列表,每条包含 code 和 priority
|
||
|
||
Returns:
|
||
冲突列表,每项包含冲突规则对和原因
|
||
"""
|
||
conflicts = []
|
||
active_rules = [r for r in rules if r.get("is_active", True)]
|
||
|
||
for i, rule_a in enumerate(active_rules):
|
||
for rule_b in active_rules[i + 1:]:
|
||
conflict = self._check_pair_conflict(rule_a, rule_b)
|
||
if conflict:
|
||
conflicts.append(conflict)
|
||
|
||
return conflicts
|
||
|
||
def _check_pair_conflict(self, rule_a: Dict, rule_b: Dict) -> Optional[Dict]:
|
||
"""检查一对规则是否冲突。"""
|
||
if rule_a.get("priority") == rule_b.get("priority"):
|
||
code_a = rule_a.get("code", "")
|
||
code_b = rule_b.get("code", "")
|
||
|
||
# 检查条件是否重叠(简化:提取 facts.get 调用)
|
||
if not self._conditions_may_overlap(code_a, code_b):
|
||
return None
|
||
|
||
# 提取返回值模式
|
||
returns_a = self._extract_returns(code_a)
|
||
returns_b = self._extract_returns(code_b)
|
||
|
||
if self._has_opposite_actions(returns_a, returns_b):
|
||
return {
|
||
"rule_a_id": rule_a.get("id"),
|
||
"rule_b_id": rule_b.get("id"),
|
||
"rule_a_name": rule_a.get("name"),
|
||
"rule_b_name": rule_b.get("name"),
|
||
"reason": "Opposite actions with overlapping conditions",
|
||
"requires_resolution": True
|
||
}
|
||
|
||
return None
|
||
|
||
def _conditions_may_overlap(self, code_a: str, code_b: str) -> bool:
|
||
"""检查两条规则的条件可能重叠。"""
|
||
import re
|
||
# 提取 .get() 调用的字段(支持 f.get 和 facts.get)
|
||
pattern = r'\.\s*get\s*\(["\']([^"\']+)["\']\)'
|
||
fields_a = set(re.findall(pattern, code_a))
|
||
fields_b = set(re.findall(pattern, code_b))
|
||
|
||
# 如果没有字段信息,假设可能重叠
|
||
if not fields_a and not fields_b:
|
||
return True
|
||
|
||
# 如果任一规则没有字段,假设可能重叠
|
||
if not fields_a or not fields_b:
|
||
return True
|
||
|
||
# 如果共享字段,可能重叠
|
||
return bool(fields_a & fields_b)
|
||
|
||
def _extract_returns(self, code: str) -> List[str]:
|
||
"""从代码中提取返回值常量。"""
|
||
import re
|
||
# 匹配 return {...} 或 return None
|
||
pattern = r"return\s*(?:{[^}]+}|None|\[[^\]]+\]|'[^']+'|\"[^\"]+\")"
|
||
matches = re.findall(pattern, code)
|
||
return matches
|
||
|
||
def _has_opposite_actions(self, returns_a: List[str], returns_b: List[str]) -> bool:
|
||
"""判断两组返回值是否相反。
|
||
|
||
冲突条件:两条规则在相同条件满足时返回相反的动作。
|
||
即:A 返回正面动作,B 返回负面动作(排除 None)。
|
||
"""
|
||
# 提取非 None 的返回值
|
||
positives = {"allow", "approve", "ok", "true", "accept"}
|
||
negatives = {"deny", "reject", "fail", "false", "block"}
|
||
|
||
# 提取实际动作(非 None)
|
||
actions_a = set()
|
||
actions_b = set()
|
||
|
||
for ret in returns_a:
|
||
ret_lower = ret.lower()
|
||
for p in positives:
|
||
if p in ret_lower and "none" not in ret_lower:
|
||
actions_a.add(p)
|
||
for n in negatives:
|
||
if n in ret_lower and "none" not in ret_lower:
|
||
actions_a.add(n)
|
||
|
||
for ret in returns_b:
|
||
ret_lower = ret.lower()
|
||
for p in positives:
|
||
if p in ret_lower and "none" not in ret_lower:
|
||
actions_b.add(p)
|
||
for n in negatives:
|
||
if n in ret_lower and "none" not in ret_lower:
|
||
actions_b.add(n)
|
||
|
||
# 如果有重叠的动作方向相反,则冲突
|
||
a_positive = bool(actions_a & positives)
|
||
a_negative = bool(actions_a & negatives)
|
||
b_positive = bool(actions_b & positives)
|
||
b_negative = bool(actions_b & negatives)
|
||
|
||
# 冲突:A 是正面动作,B 是负面动作(或者反过来)
|
||
return (a_positive and b_negative) or (a_negative and b_positive)
|
||
|
||
def check_rule_with_existing(self, new_rule: Dict, existing_rules: List[Dict]) -> List[Dict]:
|
||
"""检查新规则与现有规则是否冲突。"""
|
||
return self.detect_conflicts(existing_rules + [new_rule])
|