批次1: 项目结构 + SQLite 存储层 + 数据模型 批次2: REST API (http.server) 批次3: LLM 编译器 (支持 OpenAI/Anthropic) 批次4: RestrictedPython 规则执行器 批次5: 规则匹配器 + LLM Callback 兜底 批次6: 冲突检测器 Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
74 lines
3.4 KiB
Python
74 lines
3.4 KiB
Python
"""Tests for conflict detector."""
|
|
import pytest
|
|
from rule_engine.conflict import ConflictDetector
|
|
|
|
|
|
@pytest.fixture
def detector():
    """Build a default-constructed ``ConflictDetector`` for the tests below."""
    instance = ConflictDetector()
    return instance
|
|
|
|
|
|
def test_no_conflict_different_actions(detector):
    """Verify that rules with different actions do not conflict.

    Two active rules share priority 1; one returns the action "allow" and
    the other "discount". ``detect_conflicts`` is expected to return an
    empty collection for this pair.
    """
    # The rule sources use explicit 4/8-space indentation so each snippet is
    # valid Python: with a flat one-space indent the ``return`` under ``if``
    # would not form a block and the snippet could not be parsed.
    rules = [
        {
            "id": "r1",
            "name": "rule1",
            "code": (
                'def rule(f):\n'
                '    if f.get("a") == 1:\n'
                '        return {"action": "allow"}\n'
                '    return None'
            ),
            "priority": 1,
            "is_active": True,
        },
        {
            "id": "r2",
            "name": "rule2",
            "code": (
                'def rule(f):\n'
                '    if f.get("b") == 1:\n'
                '        return {"action": "discount"}\n'
                '    return None'
            ),
            "priority": 1,
            "is_active": True,
        },
    ]

    conflicts = detector.detect_conflicts(rules)

    assert len(conflicts) == 0
|
|
|
|
|
|
def test_conflict_opposite_actions_same_priority(detector):
    """Verify that opposite actions at the same priority are a conflict.

    Both active rules test the same condition and share priority 1, but one
    returns "allow" and the other "deny". Exactly one conflict is expected,
    and its ``"requires_resolution"`` entry must be ``True``.
    """
    # The rule sources use explicit 4/8-space indentation so each snippet is
    # valid Python: with a flat one-space indent the ``return`` under ``if``
    # would not form a block and the snippet could not be parsed.
    rules = [
        {
            "id": "r1",
            "name": "allow_rule",
            "code": (
                'def rule(f):\n'
                '    if f.get("type") == "vip":\n'
                '        return {"action": "allow"}\n'
                '    return None'
            ),
            "priority": 1,
            "is_active": True,
        },
        {
            "id": "r2",
            "name": "deny_rule",
            "code": (
                'def rule(f):\n'
                '    if f.get("type") == "vip":\n'
                '        return {"action": "deny"}\n'
                '    return None'
            ),
            "priority": 1,
            "is_active": True,
        },
    ]

    conflicts = detector.detect_conflicts(rules)

    assert len(conflicts) == 1
    assert conflicts[0]["requires_resolution"] is True
|
|
|
|
|
|
def test_no_conflict_different_priority(detector):
    """Verify that rules with different priorities do not conflict.

    The rules return opposite actions ("allow" vs "deny") for the same
    condition, but their priorities differ (2 vs 1), so the higher-priority
    rule overrides the lower one and no conflict is expected.
    """
    # The rule sources use explicit 4/8-space indentation so each snippet is
    # valid Python: with a flat one-space indent the ``return`` under ``if``
    # would not form a block and the snippet could not be parsed.
    rules = [
        {
            "id": "r1",
            "name": "allow_rule",
            "code": (
                'def rule(f):\n'
                '    if f.get("type") == "vip":\n'
                '        return {"action": "allow"}\n'
                '    return None'
            ),
            "priority": 2,
            "is_active": True,
        },
        {
            "id": "r2",
            "name": "deny_rule",
            "code": (
                'def rule(f):\n'
                '    if f.get("type") == "vip":\n'
                '        return {"action": "deny"}\n'
                '    return None'
            ),
            "priority": 1,
            "is_active": True,
        },
    ]

    conflicts = detector.detect_conflicts(rules)

    assert len(conflicts) == 0
|
|
|
|
|
|
def test_conflict_inactive_rules_ignored(detector):
    """Verify that inactive rules take no part in conflict detection.

    The two rules return opposite actions at the same priority, but the
    "deny" rule has ``is_active`` set to ``False``, so no conflict is
    expected.
    """
    active_allow = {
        "id": "r1",
        "name": "allow_rule",
        "code": 'def rule(f):\n' ' return {"action": "allow"}',
        "priority": 1,
        "is_active": True,
    }
    inactive_deny = {
        "id": "r2",
        "name": "deny_rule",
        "code": 'def rule(f):\n' ' return {"action": "deny"}',
        "priority": 1,
        "is_active": False,
    }

    found = detector.detect_conflicts([active_allow, inactive_deny])

    assert len(found) == 0
|
|
|
|
|
|
def test_check_rule_with_existing(detector):
    """Verify checking a new rule against the already-existing rules.

    A new "deny" rule is checked against a single existing "allow" rule of
    the same priority; ``check_rule_with_existing`` is expected to report
    exactly one conflict.
    """
    allow_rule = {
        "id": "r1",
        "name": "allow",
        "code": 'def rule(f):\n' ' return {"action": "allow"}',
        "priority": 1,
        "is_active": True,
    }
    existing = [allow_rule]
    new_rule = {
        "id": "r2",
        "name": "deny",
        "code": 'def rule(f):\n' ' return {"action": "deny"}',
        "priority": 1,
        "is_active": True,
    }

    found = detector.check_rule_with_existing(new_rule, existing)

    assert len(found) == 1
|
|
|
|
|
|
def test_extract_returns(detector):
    """Verify return-value extraction from rule source code.

    The sample rule contains two ``return`` statements (one returning an
    action dict, one returning ``None``), so ``_extract_returns`` is
    expected to yield two entries.
    """
    # Explicit 4/8-space indentation keeps the snippet valid Python: with a
    # flat one-space indent the ``return`` under ``if`` would not form a
    # block and the snippet could not be parsed.
    code = (
        'def rule(f):\n'
        '    if f.get("x"):\n'
        '        return {"action": "allow"}\n'
        '    return None'
    )

    returns = detector._extract_returns(code)

    assert len(returns) == 2
|
|
|
|
|
|
def test_has_opposite_actions(detector):
    """Verify opposite-action detection between two lists of return values.

    Covers three cases: "allow"/"ok" against "deny" is opposite, "allow"
    against "allow" is not, and "discount" against "None" is not.
    """
    is_opposite = detector._has_opposite_actions

    assert is_opposite(['"allow"', '"ok"'], ['"deny"']) is True
    assert is_opposite(['"allow"'], ['"allow"']) is False
    assert is_opposite(['"discount"'], ['"None"']) is False
|