# Source: warbler-cda/tests/test_conflict_detector.py
# Uploaded by Bellok using huggingface_hub (commit 0ccf2f0, verified).
"""Test suite for conflict detector.
Tests the ConflictDetector class for detecting semantic conflicts and contradictions.
"""
import time
from typing import List
import pytest
from warbler_cda.conflict_detector import (
ConflictDetector,
ConflictType,
ConflictEvidence,
StatementFingerprint
)
# Mock embedding provider for testing
class MockEmbeddingProvider:
    """Deterministic stand-in for an embedding provider.

    Embeddings are derived purely from the text length, and similarity is a
    two-valued function of the first embedding component. Every call is
    recorded so tests can assert that the provider was actually used.
    """

    # Length of every embedding vector produced by ``embed_text``.
    _EMBEDDING_DIM = 384
    # Maximum difference in the first component for two texts to count as similar.
    _LENGTH_TOLERANCE = 0.1
    # The only two similarity scores this mock ever reports.
    _SIMILAR_SCORE = 0.95
    _DIFFERENT_SCORE = 0.3

    def __init__(self):
        # Call logs: texts passed to embed_text, and (emb1, emb2) pairs passed
        # to calculate_similarity, in call order.
        self.embed_text_calls = []
        self.calculate_similarity_calls = []

    def embed_text(self, text: str, *args, **kwargs) -> List[float]:
        """Return a constant-valued vector whose value is ``len(text) / 100``.

        Extra positional and keyword arguments are accepted and ignored.
        """
        self.embed_text_calls.append(text)
        return [len(text) / 100.0] * self._EMBEDDING_DIM

    def calculate_similarity(self, emb1: List[float], emb2: List[float]) -> float:
        """Return 0.95 for embeddings of similar-length texts, otherwise 0.3.

        Only the first component of each embedding is compared. An empty
        embedding carries no length information, so it is reported as
        "different" instead of raising ``IndexError``.
        """
        self.calculate_similarity_calls.append((emb1, emb2))
        if not emb1 or not emb2:
            return self._DIFFERENT_SCORE
        if abs(emb1[0] - emb2[0]) < self._LENGTH_TOLERANCE:
            return self._SIMILAR_SCORE
        return self._DIFFERENT_SCORE
class TestConflictDetector:
    """Test the ConflictDetector class.

    Each test gets a fresh detector wired to a MockEmbeddingProvider, so
    embeddings and similarity scores are fully deterministic.
    """

    def setup_method(self):
        """Create a new mock provider and detector before each test."""
        self.mock_provider = MockEmbeddingProvider()  # pylint: disable=W0201
        self.detector = ConflictDetector(embedding_provider=self.mock_provider)  # pylint: disable=W0201

    def test_initialization_default_config(self):
        """Test ConflictDetector initialization with default config."""
        detector = ConflictDetector()

        assert detector.config == {}
        assert detector.opposition_threshold == 0.7
        assert detector.semantic_similarity_threshold == 0.8
        assert detector.min_confidence_score == 0.6
        assert detector.max_statement_age_hours == 24
        # A new detector starts with no state at all.
        assert len(detector.statement_fingerprints) == 0
        assert len(detector.detected_conflicts) == 0
        assert len(detector.conflict_history) == 0

    def test_initialization_custom_config(self):
        """Test ConflictDetector initialization with custom config."""
        config = {
            "opposition_threshold": 0.8,
            "semantic_similarity_threshold": 0.9,
            "min_confidence_score": 0.7,
            "max_statement_age_hours": 48,
        }

        detector = ConflictDetector(config=config)

        assert detector.opposition_threshold == 0.8
        assert detector.semantic_similarity_threshold == 0.9
        assert detector.min_confidence_score == 0.7
        assert detector.max_statement_age_hours == 48

    def test_process_statements_empty_list(self):
        """Test processing empty statement list."""
        result = self.detector.process_statements([])

        assert result["statements_processed"] == 0
        assert result["fingerprints_created"] == 0
        assert result["new_conflicts"] == []  # pylint: disable=C1803
        assert result["total_active_statements"] == 0
        assert result["total_conflicts_detected"] == 0

    def test_process_statements_single_statement(self):
        """Test processing a single statement."""
        statements = [{"id": "stmt_1", "text": "This is a test statement about memory storage"}]

        result = self.detector.process_statements(statements)

        assert result["statements_processed"] == 1
        assert result["fingerprints_created"] == 1
        assert len(self.detector.statement_fingerprints) == 1
        assert "stmt_1" in self.detector.statement_fingerprints

        fingerprint = self.detector.statement_fingerprints["stmt_1"]
        assert fingerprint.statement_id == "stmt_1"
        assert fingerprint.content == "This is a test statement about memory storage"
        # Exact-set comparison: "memory" must be the one and only tag.
        assert fingerprint.domain_tags == {"memory"}

    def test_process_statements_without_ids(self):
        """Test processing statements without IDs."""
        statements = [{"text": "First statement"}]

        result = self.detector.process_statements(statements)

        assert result["statements_processed"] == 1
        assert result["fingerprints_created"] == 1
        assert len(self.detector.statement_fingerprints) == 1
        # Check that an ID was generated for the statement.
        stmt_id = next(iter(self.detector.statement_fingerprints))
        assert stmt_id.startswith("stmt_")

    def test_process_statements_empty_content(self):
        """Test processing statements with empty content."""
        statements = [
            {"id": "stmt_1", "text": ""},
            {"id": "stmt_2", "text": " "},
            {"id": "stmt_3", "text": "Valid content"},
        ]

        result = self.detector.process_statements(statements)

        # All three statements are counted as processed, but only the one
        # with real content yields a fingerprint.
        assert result["statements_processed"] == 3
        assert result["fingerprints_created"] == 1
        assert len(self.detector.statement_fingerprints) == 1
        assert "stmt_3" in self.detector.statement_fingerprints

    def test_semantic_opposition_detection(self):
        """Test detection of semantic opposition with negation."""
        # Use custom config with lower thresholds to enable conflict detection with mock
        config = {
            "opposition_threshold": 0.3,  # Lower than default 0.7 to trigger with negation diff
            "semantic_similarity_threshold": 0.8,
            "min_confidence_score": 0.6,
        }
        detector = ConflictDetector(config=config, embedding_provider=self.mock_provider)

        # First statement carries no negation.
        detector.process_statements(
            [{"id": "stmt_1", "text": "This algorithm is correct and efficient"}]
        )
        # Second statement is the same claim, negated.
        result = detector.process_statements(
            [{"id": "stmt_2", "text": "This algorithm is not correct and efficient"}]
        )

        # Exactly one conflict is reported, with the new statement listed first.
        assert len(result["new_conflicts"]) == 1
        conflict = result["new_conflicts"][0]
        assert conflict["statement_a"] == "stmt_2"
        assert conflict["statement_b"] == "stmt_1"
        assert conflict["conflict_type"] == "semantic_opposition"
        assert conflict["confidence_score"] >= 0.6
        assert "not" in conflict["opposition_indicators"]

        # Both fingerprints exist and negation was detected only where present.
        assert "stmt_1" in detector.statement_fingerprints
        assert "stmt_2" in detector.statement_fingerprints
        fp1 = detector.statement_fingerprints["stmt_1"]
        fp2 = detector.statement_fingerprints["stmt_2"]
        assert len(fp1.negation_indicators) == 0
        assert "not" in fp2.negation_indicators

        # The mock provider was consulted for similarity.
        assert len(self.mock_provider.calculate_similarity_calls) > 0

        # The conflict is also stored on the detector itself.
        assert len(detector.detected_conflicts) == 1
        stored_conflict = detector.detected_conflicts[0]
        assert stored_conflict.conflict_type == ConflictType.SEMANTIC_OPPOSITION
        assert stored_conflict.confidence_score >= 0.6

    def test_conflict_evidence_creation(self):
        """Test that conflict evidence is properly created."""
        conflict = ConflictEvidence(
            statement_a_id="stmt_a",
            statement_b_id="stmt_b",
            conflict_type=ConflictType.SEMANTIC_OPPOSITION,
            confidence_score=0.85,
            semantic_distance=0.15,
            opposition_indicators=["not"],
            context_overlap=0.7,
            detection_timestamp=time.time(),
        )

        assert conflict.conflict_type == ConflictType.SEMANTIC_OPPOSITION
        assert conflict.confidence_score == 0.85
        assert conflict.opposition_indicators == ["not"]
        assert conflict.get_age_seconds() >= 0

    def test_temporal_conflict_detection(self):
        """Test processing of statements that carry opposing temporal markers.

        Whether a conflict is actually reported depends on the detector's
        similarity requirements, so this test does not demand one. It checks
        what must hold either way: both statements are fingerprinted, and any
        conflict that is reported links exactly these two statements and
        carries a known conflict type.
        """
        self.detector.process_statements(
            [{"id": "stmt_1", "text": "The algorithm will finish before tomorrow"}]
        )
        result = self.detector.process_statements(
            [{"id": "stmt_2", "text": "The algorithm will finish after later today"}]
        )

        assert "stmt_1" in self.detector.statement_fingerprints
        assert "stmt_2" in self.detector.statement_fingerprints

        new_conflicts = result["new_conflicts"]
        assert isinstance(new_conflicts, list)
        known_types = {conflict_type.value for conflict_type in ConflictType}
        for conflict in new_conflicts:
            # Only two statements exist, so a conflict can only be between them.
            assert {conflict["statement_a"], conflict["statement_b"]} == {"stmt_1", "stmt_2"}
            assert conflict["conflict_type"] in known_types

    def test_get_conflict_analysis_no_conflicts(self):
        """Test conflict analysis for statement with no conflicts."""
        statements = [{"id": "stmt_1", "text": "This is a simple statement"}]
        self.detector.process_statements(statements)

        analysis = self.detector.get_conflict_analysis("stmt_1")

        assert analysis["conflicts_found"] == 0
        assert analysis["status"] == "no_conflicts"
        assert "consistent" in analysis["recommendation"]

    def test_get_global_conflict_summary(self):
        """Test global conflict summary generation."""
        # Start with no conflicts
        summary = self.detector.get_global_conflict_summary()
        assert summary["total_conflicts"] == 0
        assert summary["status"] == "healthy"
        assert summary["system_health_score"] == 1.0

        # Add a statement, then inject a conflict manually for testing.
        statements1 = [{"id": "stmt_1", "text": "This is definitely correct"}]
        self.detector.process_statements(statements1)
        conflict = ConflictEvidence(
            statement_a_id="stmt_1",
            statement_b_id="stmt_2",
            conflict_type=ConflictType.SEMANTIC_OPPOSITION,
            confidence_score=0.95,
            semantic_distance=0.05,
            opposition_indicators=["not"],
            context_overlap=0.5,
            detection_timestamp=time.time(),
        )
        self.detector.detected_conflicts.append(conflict)

        summary = self.detector.get_global_conflict_summary()
        assert summary["total_conflicts"] == 1
        assert summary["confidence_distribution"]["high"] == 1
        assert summary["status"] == "healthy"  # One conflict doesn't trigger warning (>2 needed)

    def test_resolve_conflict_success(self):
        """Test successful conflict resolution."""
        conflict = ConflictEvidence(
            statement_a_id="stmt_1",
            statement_b_id="stmt_2",
            conflict_type=ConflictType.SEMANTIC_OPPOSITION,
            confidence_score=0.8,
            semantic_distance=0.2,
            opposition_indicators=["not"],
            context_overlap=0.5,
            detection_timestamp=time.time(),
        )
        self.detector.detected_conflicts.append(conflict)

        # Generate conflict ID and resolve
        conflict_id = self.detector._generate_conflict_id(conflict)  # pylint: disable=W0212
        resolved = self.detector.resolve_conflict(conflict_id, "User confirmed resolution")

        assert resolved is True
        # The conflict moves from the active list into history.
        assert len(self.detector.detected_conflicts) == 0
        assert len(self.detector.conflict_history) == 1
        assert self.detector.metrics["false_positives_resolved"] == 1

    def test_resolve_conflict_not_found(self):
        """Test conflict resolution when conflict ID doesn't exist."""
        resolved = self.detector.resolve_conflict("nonexistent_id", "Test resolution")

        assert resolved is False

    def test_domain_tag_extraction(self):
        """Test that domain tags are correctly extracted from statements."""
        # Test memory domain
        statements = [{"id": "stmt_1", "text": "The storage memory needs optimization"}]
        self.detector.process_statements(statements)
        fingerprint = self.detector.statement_fingerprints["stmt_1"]
        assert "memory" in fingerprint.domain_tags

        # Test development domain
        statements2 = [{"id": "stmt_2", "text": "Debug the development process"}]
        self.detector.process_statements(statements2)
        fingerprint2 = self.detector.statement_fingerprints["stmt_2"]
        assert "development" in fingerprint2.domain_tags

    def test_assertion_strength_calculation(self):
        """Test that assertion strength is correctly calculated."""
        # Statement with multiple assertion words - should reach max strength
        statements = [
            {
                "id": "stmt_1",
                "text": "This is definitely always absolutely certainly and must be guaranteed",
            }
        ]
        self.detector.process_statements(statements)
        fingerprint = self.detector.statement_fingerprints["stmt_1"]
        assert fingerprint.assertion_strength == 1.0

        # Statement with weak assertions should score below the maximum.
        statements2 = [{"id": "stmt_2", "text": "This might be okay"}]
        self.detector.process_statements(statements2)
        fingerprint2 = self.detector.statement_fingerprints["stmt_2"]
        assert fingerprint2.assertion_strength < 1.0

    def test_negation_indicator_detection(self):
        """Test that negation indicators are correctly detected."""
        statements = [{"id": "stmt_1", "text": "This is not wrong or incorrect"}]
        self.detector.process_statements(statements)

        fingerprint = self.detector.statement_fingerprints["stmt_1"]
        assert "not" in fingerprint.negation_indicators
        assert "incorrect" in fingerprint.negation_indicators

    def test_fingerprint_creation_without_embedding_provider(self):
        """Test fingerprint creation when no embedding provider is available."""
        detector = ConflictDetector()  # No embedding provider
        statements = [{"id": "stmt_1", "text": "Test statement without embeddings"}]

        result = detector.process_statements(statements)

        assert result["fingerprints_created"] == 1
        fingerprint = detector.statement_fingerprints["stmt_1"]
        assert fingerprint.embedding == []  # Should be empty when no provider

    def test_metrics_update(self):
        """Test that metrics are correctly updated during processing."""
        initial_statements = self.detector.metrics["statements_processed"]
        statements = [{"id": "stmt_1", "text": "First statement"}]

        self.detector.process_statements(statements)

        assert self.detector.metrics["statements_processed"] == initial_statements + 1

    def test_conflict_type_enum_values(self):
        """Test that conflict type enum has correct values."""
        assert ConflictType.SEMANTIC_OPPOSITION.value == "semantic_opposition"
        assert ConflictType.LOGICAL_CONTRADICTION.value == "logical_contradiction"
        assert ConflictType.FACTUAL_INCONSISTENCY.value == "factual_inconsistency"
        assert ConflictType.TEMPORAL_CONFLICT.value == "temporal_conflict"
        assert ConflictType.SCOPE_MISMATCH.value == "scope_mismatch"

    def test_conflict_id_generation(self):
        """Test that conflict IDs are consistently generated."""
        conflict = ConflictEvidence(
            statement_a_id="stmt_a",
            statement_b_id="stmt_b",
            conflict_type=ConflictType.SEMANTIC_OPPOSITION,
            confidence_score=0.8,
            semantic_distance=0.2,
            opposition_indicators=[],
            context_overlap=0.5,
            detection_timestamp=1000000.0,
        )

        id1 = self.detector._generate_conflict_id(conflict)  # pylint: disable=W0212
        id2 = self.detector._generate_conflict_id(conflict)  # pylint: disable=W0212

        # Same conflict should generate same ID
        assert id1 == id2
        assert len(id1) == 12  # MD5 hash truncated to 12 chars
class TestStatementFingerprint:
    """Test the StatementFingerprint dataclass."""

    def test_fingerprint_creation(self):
        """Test that every constructor argument is stored on the fingerprint."""
        fingerprint = StatementFingerprint(
            statement_id="test_123",
            content="This is test content about semantic processing",
            embedding=[0.1, 0.2, 0.3],
            negation_indicators=["not"],
            assertion_strength=0.8,
            temporal_markers=["before"],
            domain_tags={"semantics", "processing"},
            creation_timestamp=1234567890.0,
        )

        assert fingerprint.statement_id == "test_123"
        assert fingerprint.content == "This is test content about semantic processing"
        assert fingerprint.embedding == [0.1, 0.2, 0.3]
        assert fingerprint.negation_indicators == ["not"]
        assert fingerprint.assertion_strength == 0.8
        assert fingerprint.temporal_markers == ["before"]
        assert "semantics" in fingerprint.domain_tags
        assert "processing" in fingerprint.domain_tags

    def test_fingerprint_equality(self):
        """Test that two fingerprints built from identical values compare equal."""
        fp1 = StatementFingerprint(
            statement_id="id1", content="content", embedding=[],
            negation_indicators=[], assertion_strength=0.5,
            temporal_markers=[], domain_tags=set(), creation_timestamp=1000.0
        )
        fp2 = StatementFingerprint(
            statement_id="id1", content="content", embedding=[],
            negation_indicators=[], assertion_strength=0.5,
            temporal_markers=[], domain_tags=set(), creation_timestamp=1000.0
        )

        assert fp1 == fp2
class TestConflictEvidence:
    """Test the ConflictEvidence dataclass."""

    def test_conflict_evidence_creation(self):
        """Test that constructor arguments are stored and the age is positive."""
        evidence = ConflictEvidence(
            statement_a_id="stmt_1",
            statement_b_id="stmt_2",
            conflict_type=ConflictType.SEMANTIC_OPPOSITION,
            confidence_score=0.75,
            semantic_distance=0.25,
            opposition_indicators=["not", "incorrect"],
            context_overlap=0.8,
            detection_timestamp=1234567890.0,
        )

        # Identity of the two statements involved.
        assert evidence.statement_a_id == "stmt_1"
        assert evidence.statement_b_id == "stmt_2"

        # Classification and scoring fields.
        assert evidence.conflict_type == ConflictType.SEMANTIC_OPPOSITION
        assert evidence.confidence_score == 0.75
        assert evidence.semantic_distance == 0.25
        assert evidence.opposition_indicators == ["not", "incorrect"]
        assert evidence.context_overlap == 0.8

        # The detection timestamp lies far in the past, so the reported age
        # must be strictly positive.
        assert evidence.get_age_seconds() > 0
if __name__ == "__main__":
    # Allow running this file directly: hand it to pytest in verbose mode.
    pytest.main(args=[__file__, "-v"])