| EPISTEMIC THREAT MODEL & VALIDATOR ONTOLOGY |
|
|
| FORMAL THREAT MODEL (STRIDE-E: Epistemic Extension) |
|
|
| Spoofing - Identity Subversion |
|
|
| ``` |
| Threat: n8n workflow impersonates validator |
| Impact: False authority injection into ledger |
| Mitigation: |
| 1. Cryptographic validator identity (PKI-based) |
| 2. Validator role attestation signed by IRE |
| 3. Workflow-to-validator binding in ledger metadata |
|
|
| Detection: Mismatch between workflow signature and validator claim |
| Severity: Critical (sovereignty breach) |
| ``` |
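The workflow-to-validator binding and its mismatch check can be sketched as follows. This is a minimal illustration under stated assumptions, not the IRE's actual attestation format: the standard library has no Ed25519, so an HMAC over a shared demo key stands in for the IRE's PKI signature, and `workflow_hash`, `sign_binding`, `binding_is_valid`, and the example validator URN are hypothetical names.

```python
import hashlib
import hmac


def workflow_hash(definition: bytes) -> str:
    """Hash of the workflow definition, as it would be recorded in ledger metadata."""
    return "sha256:" + hashlib.sha256(definition).hexdigest()


def sign_binding(ire_key: bytes, validator_id: str, wf_hash: str) -> str:
    """Bind a validator identity to one workflow hash (HMAC stand-in for a real signature)."""
    message = f"{validator_id}|{wf_hash}".encode()
    return hmac.new(ire_key, message, hashlib.sha256).hexdigest()


def binding_is_valid(ire_key: bytes, validator_id: str,
                     definition: bytes, signature: str) -> bool:
    """Recompute the binding for the workflow that actually ran and compare."""
    expected = sign_binding(ire_key, validator_id, workflow_hash(definition))
    return hmac.compare_digest(expected, signature)


# Demo values (all assumed)
key = b"demo-key-not-for-production"
validator = "urn:ire:validator:system_epistemic:example"
approved = b'{"name": "ingest", "nodes": []}'
attestation = sign_binding(key, validator, workflow_hash(approved))
```

A workflow that presents this attestation while running a different definition, or while claiming a different validator identity, fails `binding_is_valid`, which is the mismatch the Detection line describes.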
|
|
| Tampering - Evidence Manipulation |
|
|
| ``` |
| Threat: n8n alters evidence pre-canonicalization |
| Impact: Garbage-in, narrative-out |
| Mitigation: |
| 1. Raw evidence fingerprinting (content hash before processing) |
| 2. Evidence lineage tracking in n8n workflow logs |
| 3. IRE detects fingerprint mismatches |
|
|
| Detection: Pre-canonical hash ≠ post-canonical derivation |
| Severity: Critical (truth contamination) |
| ``` |
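A minimal sketch of raw evidence fingerprinting, assuming SHA-256 as the content hash and assuming the IRE stores the fingerprint at ingestion, before n8n hands the bytes on. The function names `fingerprint` and `matches_ingest_fingerprint` are illustrative only.

```python
import hashlib


def fingerprint(raw: bytes) -> str:
    """Content hash taken before any processing touches the evidence."""
    return "sha256:" + hashlib.sha256(raw).hexdigest()


def matches_ingest_fingerprint(recorded: str, handed_over: bytes) -> bool:
    """True when the bytes reaching canonicalization match the ingest-time hash."""
    return recorded == fingerprint(handed_over)


raw = b"witness statement, 2024-03-01"
recorded = fingerprint(raw)                      # stored at ingestion
tampered = raw.replace(b"03-01", b"03-02")       # altered before canonicalization
```

Comparing `recorded` against the bytes that arrive for canonicalization flags the altered copy while the untouched original still passes.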
|
|
| Repudiation - Epistemic Deniability |
|
|
| ``` |
| Threat: n8n denies having triggered the detection run that found suppression |
| Impact: System loses accountability for its own findings |
| Mitigation: |
| 1. Non-repudiable workflow execution proofs |
| 2. Watermarked intermediate results |
| 3. Cross-referenced timestamp chains |
|
|
| Detection: Missing execution proof for detection result |
| Severity: High (accountability loss) |
| ``` |
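One way to picture the timestamp chain is a hash chain of execution proofs, where each proof commits to the digest of the one before it. The sketch below is an assumption about structure, not the system's actual proof format; `append_proof`, `chain_is_intact`, and `has_proof` are hypothetical helpers. A hash chain alone only gives tamper evidence, so a real deployment would also sign each digest to obtain non-repudiation.

```python
import hashlib
import json
from typing import Dict, List


def _digest(body: Dict) -> str:
    """Deterministic SHA-256 over the proof body."""
    return hashlib.sha256(json.dumps(body, sort_keys=True).encode()).hexdigest()


def append_proof(chain: List[Dict], execution_id: str,
                 result_hash: str, timestamp: str) -> Dict:
    """Append an execution proof that commits to the previous proof's digest."""
    prev = chain[-1]["digest"] if chain else "genesis"
    body = {"execution_id": execution_id, "result_hash": result_hash,
            "timestamp": timestamp, "prev": prev}
    proof = {**body, "digest": _digest(body)}
    chain.append(proof)
    return proof


def chain_is_intact(chain: List[Dict]) -> bool:
    """Verify every link and every digest from the start of the chain."""
    prev = "genesis"
    for proof in chain:
        body = {k: v for k, v in proof.items() if k != "digest"}
        if proof["prev"] != prev or proof["digest"] != _digest(body):
            return False
        prev = proof["digest"]
    return True


def has_proof(chain: List[Dict], execution_id: str) -> bool:
    """Detection rule: a detection result without a matching proof is suspect."""
    return any(p["execution_id"] == execution_id for p in chain)
```

Rewriting or removing an earlier proof breaks every later link, so a denied execution shows up either as a failed `chain_is_intact` or as a result for which `has_proof` is false.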
|
|
| Information Disclosure - Pattern Leakage |
|
|
| ``` |
| Threat: Detection patterns leaked through n8n logs |
| Impact: Adversaries learn the system's detection patterns and can craft inputs to evade them |
| Mitigation: |
| 1. Threshold abstraction (IRE returns categories, not scores) |
| 2. Differential privacy on aggregated results |
| 3. Ephemeral detection sessions |
|
|
| Detection: Raw thresholds appear in orchestration logs |
| Severity: Medium (detection model compromise) |
| ``` |
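The detection rule above (raw thresholds appearing in orchestration logs) could be approximated with a simple log scan. This is a rough sketch: the key names it looks for (`threshold`, `score`, `confidence`) and the function `find_threshold_leaks` are assumptions, and a regex like this will produce some false positives.

```python
import re
from typing import Iterable, List, Tuple

# A key containing threshold/score/confidence followed by a numeric value,
# in either key=value or JSON-style "key": value form.
_LEAK = re.compile(
    r"\w*(threshold|score|confidence)\w*[\"']?\s*[:=]\s*[\"']?-?\d+(\.\d+)?",
    re.IGNORECASE,
)


def find_threshold_leaks(log_lines: Iterable[str]) -> List[Tuple[int, str]]:
    """Return (1-based line number, line) for every line exposing a numeric value."""
    return [(n, line) for n, line in enumerate(log_lines, start=1)
            if _LEAK.search(line)]


logs = [
    "category=high_confidence next_action=escalate",   # abstract label: fine
    '{"threshold": 0.83, "doc": 17}',                  # numeric threshold: leak
    "raw_score=0.91",                                  # raw score: leak
]
leaks = find_threshold_leaks(logs)
```

Abstract labels such as `high_confidence` pass because no number follows the key, which matches the intent of returning categories rather than scores.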
|
|
| Denial of Service - Epistemic Exhaustion |
|
|
| ``` |
| Threat: Flood system with nonsense to waste detection capacity |
| Impact: Real patterns missed due to noise saturation |
| Mitigation: |
| 1. Epistemic rate limiting per source |
| 2. Credibility-based throttling |
| 3. Detection priority queuing |
|
|
| Detection: High-volume, low-signal detection requests |
| Severity: Medium (resource exhaustion) |
| ``` |
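Per-source rate limiting and credibility-based throttling can be combined in a single token bucket, sketched below. The class `EpistemicRateLimiter`, its parameters, and the idea of scaling the refill rate by a credibility value in (0, 1] are illustrative assumptions, not the system's defined policy. The clock is passed in explicitly so the behaviour is easy to test.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class SourceBucket:
    tokens: float      # remaining request allowance
    updated: float     # time of the last refill, in seconds


class EpistemicRateLimiter:
    """Token bucket per source; low-credibility sources refill more slowly."""

    def __init__(self, capacity: float = 5.0, refill_per_second: float = 1.0):
        self.capacity = capacity
        self.refill_per_second = refill_per_second
        self.buckets: Dict[str, SourceBucket] = {}

    def allow(self, source_id: str, now: float, credibility: float = 1.0) -> bool:
        """Return True if the source may submit one detection request at time `now`."""
        bucket = self.buckets.setdefault(source_id, SourceBucket(self.capacity, now))
        elapsed = max(0.0, now - bucket.updated)
        refill = elapsed * self.refill_per_second * credibility
        bucket.tokens = min(self.capacity, bucket.tokens + refill)
        bucket.updated = now
        if bucket.tokens >= 1.0:
            bucket.tokens -= 1.0
            return True
        return False
```

Each source has its own bucket, so a flood from one source exhausts only that source's allowance and leaves detection capacity for the others.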
|
|
| Elevation of Privilege - Ontology Hijacking |
|
|
| ``` |
| Threat: n8n attempts to modify lens/method definitions |
| Impact: Epistemic framework compromise |
| Mitigation: |
| 1. Immutable ontology registry (signed by originators) |
| 2. Versioned ontology with migration proofs |
| 3. No runtime ontology modification API |
|
|
| Detection: Attempt to modify lens/method definitions |
| Severity: Critical (sovereignty destruction) |
| ``` |
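A read-only registry with no mutation API might look like the sketch below. It is a simplified illustration: originator signatures are left out, the version string follows the `sha256:` style used for `ontology_version` in the block schema, and `OntologyRegistry` with its methods is a hypothetical name.

```python
import copy
import hashlib
import json
from types import MappingProxyType
from typing import Any, Mapping


class OntologyRegistry:
    """Definitions are fixed at construction; there is no setter or update method."""

    def __init__(self, definitions: Mapping[str, Any]):
        canonical = json.dumps(definitions, sort_keys=True).encode()
        self.version = "sha256:" + hashlib.sha256(canonical).hexdigest()
        # JSON round-trip detaches the registry from the caller's objects
        self._definitions = MappingProxyType(json.loads(canonical))

    def get(self, name: str) -> Any:
        """Return a copy so callers cannot mutate nested definitions."""
        return copy.deepcopy(self._definitions[name])

    def verify(self) -> bool:
        """Recompute the content hash and compare it with the recorded version."""
        canonical = json.dumps(dict(self._definitions), sort_keys=True).encode()
        return self.version == "sha256:" + hashlib.sha256(canonical).hexdigest()


registry = OntologyRegistry({"lens": {"name": "suppression", "method": "frequency"}})
```

Changing a definition means constructing a new registry, which yields a new version hash; that is where a versioned migration proof would attach.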
|
|
| Epistemic Drift - Gradual Corruption |
|
|
| ``` |
| Threat: Slow, subtle contamination of detection patterns |
| Impact: System gradually aligns with preferred narrative |
| Mitigation: |
| 1. Drift detection via historical self-comparison |
| 2. Cross-validation with frozen model versions |
| 3. Epistemic checksums on detection algorithms |
|
|
| Detection: Statistical divergence from historical baseline |
| Severity: High (stealth corruption) |
| ``` |
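Statistical divergence from a historical baseline can be measured in many ways. As one hedged example, the sketch below compares the distribution of abstract detection categories against a frozen baseline using total variation distance; the choice of metric, the `tolerance` value, and the function names are assumptions for illustration.

```python
from collections import Counter
from typing import Dict, Iterable


def category_distribution(labels: Iterable[str]) -> Dict[str, float]:
    """Relative frequency of each detection category."""
    counts = Counter(labels)
    total = sum(counts.values())
    if total == 0:
        raise ValueError("no labels to build a distribution from")
    return {label: n / total for label, n in counts.items()}


def total_variation(p: Dict[str, float], q: Dict[str, float]) -> float:
    """Total variation distance: half the L1 distance between two distributions."""
    keys = set(p) | set(q)
    return 0.5 * sum(abs(p.get(k, 0.0) - q.get(k, 0.0)) for k in keys)


def drift_detected(baseline: Iterable[str], current: Iterable[str],
                   tolerance: float = 0.1) -> bool:
    """Flag drift when current output diverges from the baseline beyond tolerance."""
    distance = total_variation(category_distribution(baseline),
                               category_distribution(current))
    return distance > tolerance


baseline_labels = ["high"] * 2 + ["low"] * 8     # frozen model output
current_labels = ["high"] * 5 + ["low"] * 5      # live model output
```

Running the same inputs through a frozen model version and the live one, then comparing the two label streams this way, gives a single number to track over time.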
|
|
| VALIDATOR ONTOLOGY (Role-Based Authority) |
|
|
| Validator Archetypes |
|
|
| ``` |
| 1. Human-Sovereign Validator |
| Role: Individual sovereignty preservation |
| Authority: Can veto any ledger commit |
| Identity: Self-sovereign cryptographic identity |
| Quorum: Always required (cannot be automated away) |
| Attestation: "I have reviewed and assert my sovereignty" |
|
|
| 2. System-Epistemic Validator |
| Role: Detection methodology integrity |
| Authority: Validates detection process adherence |
| Identity: Cryptographic hash of detection pipeline config |
| Quorum: Required for automated commits |
| Attestation: "Detection executed per published methodology" |
|
|
| 3. Source-Provenance Validator |
| Role: Evidence chain custody |
| Authority: Validates evidence hasn't been altered between source and canonicalization |
| Identity: Hash of evidence handling workflow |
| Quorum: Optional but recommended |
| Attestation: "Evidence chain intact from source to canonicalization" |
|
|
| 4. Temporal-Integrity Validator |
| Role: Time-bound execution verification |
| Authority: Validates timestamps and execution windows |
| Identity: Time-locked cryptographic proof |
| Quorum: Required for time-sensitive commits |
| Attestation: "Execution occurred within valid time window" |
|
|
| 5. Community-Plurality Validator |
| Role: Cross-interpreter consensus |
| Authority: Requires multiple human interpretations |
| Identity: Set of interpreter identities + attestation |
| Quorum: Variable based on interpretation count |
| Attestation: "Multiple independent interpretations concur" |
| ``` |
|
|
| Validator Configuration Schema |
|
|
| ```json |
| { |
|   "validator_id": "urn:ire:validator:human_sovereign:sha256-abc123", |
|   "archetype": "human_sovereign", |
|   "authority_scope": ["ledger_commit", "evidence_rejection"], |
|   "quorum_requirements": { |
|     "minimum": 1, |
|     "maximum": null, |
|     "exclusivity": ["system_epistemic"] |
|   }, |
|   "attestation_format": { |
|     "required_fields": ["review_timestamp", "sovereignty_assertion"], |
|     "signature_algorithm": "ed25519", |
|     "expiration": "24h" |
|   }, |
|   "epistemic_constraints": { |
|     "cannot_override": ["detection_results", "canonical_hash"], |
|     "can_reject_for": ["procedural_violation", "sovereignty_concern"] |
|   } |
| } |
| ``` |
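The `attestation_format` section implies two checks on every attestation: the required fields are present, and the attestation has not expired. The sketch below shows one possible reading. It assumes the expiration is counted from `review_timestamp`, that timestamps are ISO 8601 with an explicit UTC offset, and that durations use a single `m`/`h`/`d` suffix; `parse_expiration` and `attestation_is_valid` are hypothetical helpers.

```python
import re
from datetime import datetime, timedelta, timezone
from typing import Dict

_UNITS = {"m": "minutes", "h": "hours", "d": "days"}


def parse_expiration(value: str) -> timedelta:
    """Turn a duration such as "24h" into a timedelta."""
    match = re.fullmatch(r"(\d+)([mhd])", value)
    if not match:
        raise ValueError(f"unsupported expiration format: {value!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})


def attestation_is_valid(attestation: Dict, fmt: Dict, now: datetime) -> bool:
    """Check required fields, then check the attestation is inside its validity period."""
    if any(name not in attestation for name in fmt["required_fields"]):
        return False
    reviewed = datetime.fromisoformat(attestation["review_timestamp"])
    return reviewed <= now < reviewed + parse_expiration(fmt["expiration"])


fmt = {"required_fields": ["review_timestamp", "sovereignty_assertion"],
       "expiration": "24h"}
attestation = {"review_timestamp": "2024-05-01T12:00:00+00:00",
               "sovereignty_assertion": "I have reviewed and assert my sovereignty"}
```

Signature verification with the configured `signature_algorithm` would sit alongside these checks and is omitted here.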
|
|
| Validator Quorum Calculus |
|
|
| ```python |
| from collections import Counter |
| from typing import List |
|
|
| def calculate_quorum_satisfaction(validators: List["Validator"], commit_type: str) -> bool: |
|     """Calculate if validator quorum is satisfied for commit type""" |
|
|     # Each validator is expected to expose .archetype and .quorum_requirements |
|     archetype_counts = Counter(v.archetype for v in validators) |
|
|     # Base requirements |
|     requirements = { |
|         "ledger_commit": { |
|             "human_sovereign": 1, |
|             "system_epistemic": 1, |
|             "source_provenance": 0,  # optional |
|             "temporal_integrity": 1, |
|             "community_plurality": 0  # depends on interpretation count |
|         }, |
|         "evidence_ingestion": { |
|             "human_sovereign": 0, |
|             "system_epistemic": 1, |
|             "source_provenance": 1, |
|             "temporal_integrity": 1, |
|             "community_plurality": 0 |
|         }, |
|         "detection_escalation": { |
|             "human_sovereign": 1, |
|             "system_epistemic": 1, |
|             "source_provenance": 0, |
|             "temporal_integrity": 1, |
|             "community_plurality": 1  # required for high-stakes escalations |
|         } |
|     } |
|
|     req = requirements[commit_type]  # raises KeyError for an unknown commit type |
|
|     # Check each archetype requirement |
|     for archetype, required_count in req.items(): |
|         if archetype_counts.get(archetype, 0) < required_count: |
|             return False |
|
|     # Check exclusivity constraints: fail if any validator lists an |
|     # archetype as exclusive and that archetype is present in the quorum |
|     for validator in validators: |
|         for exclusive_archetype in validator.quorum_requirements.get("exclusivity", []): |
|             if exclusive_archetype in archetype_counts: |
|                 return False |
|
|     return True |
| ``` |
|
|
| LEDGER SCHEMA HARDENING |
|
|
| Extended Block Schema |
|
|
| ```json |
| { |
|   "block": { |
|     "header": { |
|       "id": "blk_timestamp_hash", |
|       "prev": "previous_block_hash", |
|       "timestamp": "ISO8601_with_nanoseconds", |
|       "epistemic_epoch": 1, |
|       "ontology_version": "sha256:ontology_hash" |
|     }, |
|     "body": { |
|       "nodes": [RealityNode], |
|       "detection_context": { |
|         "workflow_hash": "sha256:n8n_workflow_def", |
|         "execution_window": { |
|           "not_before": "timestamp", |
|           "not_after": "timestamp", |
|           "time_proof": "signature_from_temporal_validator" |
|         }, |
|         "threshold_used": "abstract_category_not_numeric" |
|       } |
|     }, |
|     "validations": { |
|       "attestations": [ |
|         { |
|           "validator_id": "urn:ire:validator:...", |
|           "archetype": "human_sovereign", |
|           "attestation": "cryptographic_signature", |
|           "scope": ["ledger_commit"], |
|           "expires": "timestamp" |
|         } |
|       ], |
|       "quorum_satisfied": true, |
|       "quorum_calc": { |
|         "required": {"human_sovereign": 1, "system_epistemic": 1}, |
|         "present": {"human_sovereign": 1, "system_epistemic": 1} |
|       } |
|     }, |
|     "integrity_marks": { |
|       "evidence_fingerprint": "sha256_of_raw_content", |
|       "detection_watermark": "nonce_based_on_workflow_id", |
|       "epistemic_checksum": "hash_of_detection_logic_version" |
|     } |
|   } |
| } |
| ``` |
|
|
| Detection Threshold Abstraction Layer |
|
|
| ```python |
| from typing import Dict |
|
|
| class EpistemicThresholdInterface: |
|     """Abstract threshold interface - n8n never sees numeric thresholds""" |
|
|     def __init__(self, ire_client): |
|         # ire_client is expected to provide post(path, payload) returning a dict |
|         self.ire = ire_client |
|
|     def should_escalate(self, detection_results: Dict) -> Dict: |
|         """IRE decides escalation, returns abstract category""" |
|         response = self.ire.post("/ire/detect/evaluate", { |
|             "results": detection_results, |
|             "return_format": "abstract_category" |
|         }) |
|
|         return { |
|             "escalation_recommended": response.get("category") == "high_confidence", |
|             "confidence_level": response.get("confidence_label"),  # "high"/"medium"/"low" |
|             "next_action": response.get("recommended_action"), |
|             # NO NUMERIC THRESHOLDS EXPOSED |
|             # NO RAW SCORES EXPOSED |
|         } |
|
|     def get_validation_requirements(self, category: str) -> Dict: |
|         """Map escalation category to validator requirements""" |
|         mapping = { |
|             "high_confidence": { |
|                 "human_sovereign": 2, |
|                 "system_epistemic": 1, |
|                 "community_plurality": 1 |
|             }, |
|             "medium_confidence": { |
|                 "human_sovereign": 1, |
|                 "system_epistemic": 1 |
|             }, |
|             "low_confidence": { |
|                 "system_epistemic": 1 |
|             } |
|         } |
|         # Unknown categories map to no requirements |
|         return mapping.get(category, {}) |
| ``` |
|
|
| Time-Window Enforcement |
|
|
| ```python |
| import uuid |
| from datetime import datetime, timedelta, timezone |
| from typing import Dict, List |
|
|
| class TemporalIntegrityEnforcer: |
|     """Enforce time-bound execution with cryptographic proofs""" |
|
|     # _sign_temporal_window, _verify_signature and _get_expected_duration |
|     # are assumed to be implemented elsewhere in this class. |
|
|     def create_execution_window(self, duration_hours: int = 24) -> Dict: |
|         """Create cryptographically bound execution window""" |
|         window_id = f"window_{uuid.uuid4()}" |
|         not_before = datetime.now(timezone.utc) |
|         not_after = not_before + timedelta(hours=duration_hours) |
|
|         # Create time-locked proof |
|         window_proof = { |
|             "window_id": window_id, |
|             "not_before": not_before.isoformat().replace("+00:00", "Z"), |
|             "not_after": not_after.isoformat().replace("+00:00", "Z"), |
|             "issuer": "ire_temporal_validator", |
|             "signature": self._sign_temporal_window(window_id, not_before, not_after) |
|         } |
|
|         return window_proof |
|
|     def validate_within_window(self, |
|                                action_timestamp: str, |
|                                window_proof: Dict) -> bool: |
|         """Validate action occurred within execution window""" |
|         # Verify signature |
|         if not self._verify_signature(window_proof): |
|             return False |
|
|         # Parse timestamps (map the "Z" suffix to an explicit UTC offset) |
|         action_time = datetime.fromisoformat(action_timestamp.replace("Z", "+00:00")) |
|         not_before = datetime.fromisoformat(window_proof["not_before"].replace("Z", "+00:00")) |
|         not_after = datetime.fromisoformat(window_proof["not_after"].replace("Z", "+00:00")) |
|
|         # Check bounds |
|         return not_before <= action_time <= not_after |
|
|     def detect_time_anomalies(self, workflow_executions: List[Dict]) -> List[Dict]: |
|         """Detect temporal manipulation patterns""" |
|         # NOTE: the record keys "timestamp", "id" and "workflow_id" are assumed names |
|         anomalies = [] |
|
|         for i, execution in enumerate(workflow_executions): |
|             # Check for reverse time flow |
|             if i > 0: |
|                 prev_time = datetime.fromisoformat( |
|                     workflow_executions[i-1]["timestamp"].replace("Z", "+00:00") |
|                 ) |
|                 curr_time = datetime.fromisoformat( |
|                     execution["timestamp"].replace("Z", "+00:00") |
|                 ) |
|                 if curr_time < prev_time: |
|                     anomalies.append({ |
|                         "type": "reverse_time_flow", |
|                         "execution_id": execution["id"], |
|                         "anomaly": f"Time went backwards: {curr_time} < {prev_time}" |
|                     }) |
|
|             # Check execution duration anomalies |
|             if "duration" in execution: |
|                 expected_duration = self._get_expected_duration(execution["workflow_id"]) |
|                 if execution["duration"] > expected_duration: |
|                     anomalies.append({ |
|                         "type": "suspicious_duration", |
|                         "execution_id": execution["id"], |
|                         "anomaly": f"Duration {execution['duration']} exceeds expected {expected_duration}" |
|                     }) |
|
|         return anomalies |
| ``` |
|
|
| Semantic Laundering Defense |
|
|
| ```python |
| import math |
| from collections import Counter, defaultdict |
| from datetime import datetime, timezone |
| from typing import Dict, List, Optional |
|
|
| class EpistemicIntegrityGuard: |
|     """Defend against semantic laundering attacks""" |
|
|     def __init__(self, vocabulary: List[str]): |
|         self.similarity_clusters = defaultdict(list) |
|         self.source_frequency = defaultdict(int) |
|         # Fixed word list for the bag-of-words vectors (assumed to be supplied by the caller) |
|         self.vocabulary = vocabulary |
|
|     def check_semantic_laundering(self, |
|                                   content_hash: str, |
|                                   raw_content: str, |
|                                   source_id: str, |
|                                   workflow_id: str) -> Dict: |
|         """Check for semantic laundering patterns""" |
|
|         # Check source frequency |
|         self.source_frequency[source_id] += 1 |
|         if self.source_frequency[source_id] > 100:  # Threshold |
|             return { |
|                 "risk": "high", |
|                 "reason": "Excessive submissions from single source", |
|                 "action": "throttle" |
|             } |
|
|         # Check similarity clusters |
|         content_vector = self._vectorize(raw_content) |
|         cluster_id = self._find_similar(content_vector) |
|         if cluster_id is None: |
|             # No match: seed a new cluster so later submissions can be compared |
|             cluster_id = f"cluster_{len(self.similarity_clusters)}" |
|
|         self.similarity_clusters[cluster_id].append({ |
|             "content_hash": content_hash, |
|             "timestamp": datetime.now(timezone.utc).isoformat(), |
|             "workflow_id": workflow_id, |
|             "vector": content_vector |
|         }) |
|
|         # Check cluster size as a proxy for growth rate |
|         if len(self.similarity_clusters[cluster_id]) > 10: |
|             return { |
|                 "risk": "medium", |
|                 "reason": "Rapid growth of similar content cluster", |
|                 "action": "flag_for_review" |
|             } |
|
|         return {"risk": "low", "reason": "No laundering patterns detected"} |
|
|     def _vectorize(self, content: str) -> List[float]: |
|         """Create semantic vector (simplified - use real embeddings in production)""" |
|         # Simple bag-of-words for demonstration |
|         word_counts = Counter(content.lower().split()) |
|         return [float(word_counts.get(w, 0)) for w in self.vocabulary] |
|
|     def _find_similar(self, vector: List[float], threshold: float = 0.8) -> Optional[str]: |
|         """Return the id of the first cluster whose seed item is similar enough""" |
|         # Cosine similarity against each cluster's first member; |
|         # in production, use a proper vector index |
|         for cluster_id, items in self.similarity_clusters.items(): |
|             if items and self._cosine(vector, items[0]["vector"]) >= threshold: |
|                 return cluster_id |
|         return None |
|
|     @staticmethod |
|     def _cosine(a: List[float], b: List[float]) -> float: |
|         """Cosine similarity; 0.0 when either vector is all zeros""" |
|         dot = sum(x * y for x, y in zip(a, b)) |
|         norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)) |
|         return dot / norm if norm else 0.0 |
| ``` |
|
|
| IMPLEMENTATION ROADMAP |
|
|
| Phase 1: Core Sovereignty (Weeks 1-2) |
|
|
| 1. Implement validator PKI and attestation framework |
| 2. Deploy threshold abstraction layer |
| 3. Add time-window enforcement |
| 4. Basic semantic laundering detection |
|
|
| Phase 2: Epistemic Defense (Weeks 3-4) |
|
|
| 1. Full STRIDE-E threat model implementation |
| 2. Validator quorum calculus integration |
| 3. Ledger schema hardening |
| 4. Cross-validation with frozen models |
|
|
| Phase 3: Operational Resilience (Weeks 5-6) |
|
|
| 1. Drift detection and alerting |
| 2. Validator role rotation policies |
| 3. Recovery procedures for compromise |
| 4. Full audit trail integration |
|
|
| Phase 4: Community Governance (Weeks 7-8) |
|
|
| 1. Validator reputation system |
| 2. Plurality-based decision frameworks |
| 3. Cross-interpreter reconciliation |
| 4. Sovereign identity integration |
|
|
| VERIFICATION PROTOCOL ENHANCEMENT |
|
|
| Daily Sovereignty Check |
|
|
| ```python |
| def daily_sovereignty_audit(): |
|     """Daily audit to ensure no boundary violations""" |
|
|     # Each check function is expected to return an object with a boolean .passed attribute |
|     checks = [ |
|         # 1. Check n8n for epistemic logic |
|         scan_n8n_workflows_for_detection_logic(), |
|
|         # 2. Check IRE for orchestration logic |
|         scan_ire_for_scheduling_logic(), |
|
|         # 3. Verify threshold abstraction |
|         verify_no_numeric_thresholds_in_n8n(), |
|
|         # 4. Validate time-window adherence |
|         verify_all_executions_within_windows(), |
|
|         # 5. Check validator quorum compliance |
|         verify_all_commits_have_proper_quorum(), |
|
|         # 6. Detect semantic laundering |
|         run_semantic_laundering_detection(), |
|
|         # 7. Verify workflow definition integrity |
|         hash_and_verify_workflow_definitions(), |
|
|         # 8. Check for drift |
|         compare_with_frozen_model_baseline() |
|     ] |
|
|     # Collect failures once so the score and the remediation plan agree |
|     failed_checks = [c for c in checks if not c.passed] |
|     sovereignty_score = (len(checks) - len(failed_checks)) / len(checks) |
|
|     return { |
|         "sovereignty_score": sovereignty_score, |
|         "failed_checks": failed_checks, |
|         "recommendations": generate_remediation_plan(failed_checks) |
|     } |
| ``` |
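The audit above relies on every check returning an object with a `passed` attribute. A minimal shape for such a result, together with the score calculation, might look like this; `CheckResult`, its fields, and `sovereignty_score` are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class CheckResult:
    name: str          # which boundary check produced this result
    passed: bool       # True when no violation was found
    detail: str = ""   # human-readable explanation for failures


def sovereignty_score(checks: List[CheckResult]) -> float:
    """Fraction of checks that passed; 0.0 when no checks ran."""
    if not checks:
        return 0.0
    return sum(1 for c in checks if c.passed) / len(checks)


results = [
    CheckResult("threshold_abstraction", True),
    CheckResult("time_window_adherence", True),
    CheckResult("quorum_compliance", False, "commit missing temporal_integrity validator"),
    CheckResult("workflow_integrity", True),
]
score = sovereignty_score(results)
failed = [c.name for c in results if not c.passed]
```

Keeping a `detail` string on each failure gives the remediation step something concrete to act on.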
|
|
| Weekly Epistemic Integrity Report |
|
|
| ```python |
| def weekly_epistemic_integrity_report(): |
|     """Weekly comprehensive epistemic health report""" |
|
|     report = { |
|         "temporal_integrity": { |
|             "execution_windows_violated": count_window_violations(), |
|             "time_anomalies_detected": detect_time_anomalies(), |
|             "average_execution_latency": calculate_latency() |
|         }, |
|         "validator_health": { |
|             "active_validators": count_active_validators(), |
|             "quorum_satisfaction_rate": calculate_quorum_rate(), |
|             "validator_role_diversity": measure_diversity() |
|         }, |
|         "detection_quality": { |
|             "drift_from_baseline": measure_drift(), |
|             "false_positive_rate": calculate_fpr(), |
|             "escalation_accuracy": measure_escalation_accuracy() |
|         }, |
|         "boundary_integrity": { |
|             "n8n_epistemic_contamination": detect_contamination(), |
|             "ire_orchestration_leakage": detect_leakage(), |
|             "workflow_definition_changes": track_workflow_changes() |
|         }, |
|         "semantic_defenses": { |
|             "laundering_attempts": count_laundering_attempts(), |
|             "similarity_clusters": analyze_clusters(), |
|             "source_credibility": assess_source_credibility() |
|         } |
|     } |
|
|     # Calculate overall epistemic health score |
|     report["epistemic_health_score"] = calculate_health_score(report) |
|
|     return report |
| ``` |