# QuantumShield / backend / enhanced_quantum_models.py
# SantoshKumar1310's picture
# Upload folder using huggingface_hub
# 63590dc verified
"""
Enhanced Quantum Fraud Detection Models - IMPROVED RECALL VERSION
Includes: VQC, QAOA, QSVM, and Quantum Neural Network
Optimized for better fraud detection recall
"""
import numpy as np
import pennylane as qml
from pennylane import numpy as pnp
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, recall_score
import pandas as pd
class QuantumFraudDetector:
    """Quantum fraud detector built from three variational circuits.

    A VQC, a QAOA-inspired circuit and a QNN are trained independently and
    blended by ``predict_ensemble``. Every training loss adds a false-negative
    penalty to binary cross-entropy so that missed fraud cases cost more than
    false alarms.

    Each circuit reads ``inputs[0] .. inputs[n_qubits - 1]``, so every sample
    must provide at least ``n_qubits`` features.
    """

    # Short model names; each maps to an ``<name>_weights`` attribute and to
    # an ``<name>_weights.npy`` file written/read by save_weights/load_weights.
    _MODEL_NAMES = ('vqc', 'qaoa', 'qnn')

    def __init__(self, n_qubits=4, n_layers=3):
        """Create the simulator device and empty weight slots.

        Args:
            n_qubits: Number of wires; also the number of features each
                circuit consumes from a sample.
            n_layers: Number of variational layers used by the VQC and QNN.
        """
        self.n_qubits = n_qubits
        self.n_layers = n_layers
        self.dev = qml.device('default.qubit', wires=n_qubits)
        # Filled in by train_* or load_weights; None means "not trained yet".
        self.vqc_weights = None
        self.qaoa_weights = None
        self.qnn_weights = None

    # ============== Variational Quantum Circuit (VQC) ==============
    def vqc_circuit(self, inputs, weights):
        """VQC: RY angle encoding followed by entangling variational layers.

        Args:
            inputs: One sample; ``inputs[i] * pi`` is the RY angle on wire i.
            weights: Iterable of layers, each holding ``3 * n_qubits`` angles
                laid out as [RY block | RZ block | RX block].

        Returns:
            Expectation value of PauliZ on wire 0.
        """
        for i in range(self.n_qubits):
            qml.RY(inputs[i] * np.pi, wires=i)
        for layer_weights in weights:
            for i in range(self.n_qubits):
                qml.RY(layer_weights[i], wires=i)
                qml.RZ(layer_weights[i + self.n_qubits], wires=i)
            # Nearest-neighbour CNOT chain, closed into a ring by the
            # wrap-around CNOT from the last wire back to wire 0.
            for i in range(self.n_qubits - 1):
                qml.CNOT(wires=[i, i + 1])
            qml.CNOT(wires=[self.n_qubits - 1, 0])
            for i in range(self.n_qubits):
                qml.RX(layer_weights[i + 2*self.n_qubits], wires=i)
        return qml.expval(qml.PauliZ(0))

    # ============== Quantum Approximate Optimization (QAOA) ==============
    def qaoa_circuit(self, inputs, params):
        """QAOA-inspired circuit alternating data-dependent RZ and RX layers.

        Args:
            inputs: One sample; scales the RZ angle on each wire.
            params: Flat sequence read in pairs: ``params[2p]`` multiplies the
                inputs in the RZ step and ``params[2p + 1]`` is the RX angle.
                ``len(params) // 2`` rounds are applied.

        Returns:
            Expectation value of PauliZ(0) @ PauliZ(1).
        """
        for i in range(self.n_qubits):
            qml.Hadamard(wires=i)
        for p in range(len(params) // 2):
            for i in range(self.n_qubits):
                qml.RZ(inputs[i] * params[2*p], wires=i)
            for i in range(self.n_qubits - 1):
                qml.CNOT(wires=[i, i + 1])
            for i in range(self.n_qubits):
                qml.RX(params[2*p + 1], wires=i)
        return qml.expval(qml.PauliZ(0) @ qml.PauliZ(1))

    # ============== Quantum Neural Network (QNN) ==============
    def qnn_circuit(self, inputs, weights):
        """QNN: RY/RZ encoding, StronglyEntanglingLayers, three measurements.

        Args:
            inputs: One sample; wire i gets RY(inputs[i] * pi) and
                RZ(inputs[i] * pi / 2).
            weights: Indexable by layer; each ``weights[layer]`` is reshaped
                to ``(1, n_qubits, 3)`` for one StronglyEntanglingLayers call.

        Returns:
            ``[<Z0>, <Z1>, <X0>]`` expectation values.
        """
        for i in range(self.n_qubits):
            qml.RY(inputs[i] * np.pi, wires=i)
            qml.RZ(inputs[i] * np.pi/2, wires=i)
        for layer in range(self.n_layers):
            qml.StronglyEntanglingLayers(
                weights[layer].reshape(1, self.n_qubits, 3),
                wires=range(self.n_qubits)
            )
        return [
            qml.expval(qml.PauliZ(0)),
            qml.expval(qml.PauliZ(1)),
            qml.expval(qml.PauliX(0))
        ]

    # ============== Shared helpers ==============
    @staticmethod
    def _qnn_score(outputs):
        """Average the three QNN expectation values into one score."""
        return (outputs[0] + outputs[1] + outputs[2]) / 3

    @staticmethod
    def _recall_weighted_loss(probs, y_batch):
        """Binary cross-entropy plus a penalty on missed fraud cases."""
        # The 1e-10 offset keeps log() finite when a probability is 0 or 1.
        log_loss = -pnp.mean(y_batch * pnp.log(probs + 1e-10) +
                             (1 - y_batch) * pnp.log(1 - probs + 1e-10))
        # False negative penalty (missed fraud): 2x weight, summed (not
        # averaged) over the batch, then mixed in at 30%.
        fn_penalty = pnp.sum(y_batch * (1 - probs)) * 2.0
        return log_loss + fn_penalty * 0.3

    def _fit(self, predict_one, params, X_train, y_train, epochs, lr, batch_size):
        """Run mini-batch Adam on the recall-weighted loss; return the params.

        ``predict_one(x, params)`` must return a scalar expectation in
        [-1, 1]; it is rescaled to a probability with ``(value + 1) / 2``.
        """
        # Plain arrays guarantee positional fancy indexing even when pandas
        # objects are passed in.
        X_train = np.asarray(X_train)
        y_train = np.asarray(y_train)
        if len(X_train) == 0:
            # Without this guard the per-epoch average divides by zero.
            raise ValueError("X_train is empty; at least one training sample is required")

        def batch_cost(p, X_batch, y_batch):
            predictions = pnp.array([predict_one(x, p) for x in X_batch])
            probs = (predictions + 1) / 2
            return self._recall_weighted_loss(probs, y_batch)

        opt = qml.AdamOptimizer(stepsize=lr)
        for epoch in range(epochs):
            order = pnp.random.permutation(len(X_train))
            epoch_loss = 0
            n_batches = 0
            for start in range(0, len(X_train), batch_size):
                batch_idx = order[start:start + batch_size]
                X_batch = pnp.array(X_train[batch_idx], requires_grad=False)
                y_batch = pnp.array(y_train[batch_idx], requires_grad=False)
                params, loss = opt.step_and_cost(
                    lambda p: batch_cost(p, X_batch, y_batch), params
                )
                epoch_loss += loss
                n_batches += 1
            print(f"Epoch {epoch+1}/{epochs}, Loss: {epoch_loss/n_batches:.4f}")
        return params

    def _require_weights(self, name):
        """Return the weights stored for model ``name`` or fail clearly."""
        weights = getattr(self, f'{name}_weights', None)
        if weights is None:
            raise RuntimeError(
                f"{name.upper()} weights are not set; call train_{name}() "
                "or load_weights() before predicting"
            )
        return weights

    # ============== Training Functions - RECALL OPTIMIZED ==============
    def train_vqc(self, X_train, y_train, epochs=5, lr=0.01):
        """Train the VQC with the recall-focused cost function.

        Args:
            X_train: Samples, indexable by an integer index array.
            y_train: Binary labels (1 = fraud) aligned with ``X_train``.
            epochs: Number of passes over the training data.
            lr: Adam step size.

        Returns:
            The trained weights, also stored in ``self.vqc_weights``.

        Raises:
            ValueError: If ``X_train`` is empty.
        """
        print("\n[VQC] Training Variational Quantum Circuit (Recall-Optimized)...")
        pnp.random.seed(42)
        weights = pnp.random.randn(self.n_layers, self.n_qubits * 3, requires_grad=True) * 0.1
        qnode = qml.QNode(self.vqc_circuit, self.dev, interface='autograd')
        weights = self._fit(qnode, weights, X_train, y_train, epochs, lr, batch_size=32)
        self.vqc_weights = np.array(weights)
        return self.vqc_weights

    def train_qaoa(self, X_train, y_train, epochs=3, lr=0.01):
        """Train the QAOA-inspired circuit with the recall-focused cost.

        Six parameters (three RZ/RX rounds) are always used, independent of
        ``n_layers``. Arguments and errors match ``train_vqc``.

        Returns:
            The trained parameters, also stored in ``self.qaoa_weights``.
        """
        print("\n[QAOA] Training Quantum Approximate Optimization (Recall-Optimized)...")
        pnp.random.seed(43)
        params = pnp.random.randn(6, requires_grad=True) * 0.5
        qnode = qml.QNode(self.qaoa_circuit, self.dev, interface='autograd')
        params = self._fit(qnode, params, X_train, y_train, epochs, lr, batch_size=32)
        self.qaoa_weights = np.array(params)
        return self.qaoa_weights

    def train_qnn(self, X_train, y_train, epochs=3, lr=0.01):
        """Train the QNN with the recall-focused cost function.

        The three measured expectation values are averaged into one score
        before the loss is evaluated. Arguments and errors match
        ``train_vqc``.

        Returns:
            The trained weights, also stored in ``self.qnn_weights``.
        """
        print("\n[QNN] Training Quantum Neural Network (Recall-Optimized)...")
        pnp.random.seed(44)
        weights = pnp.random.randn(self.n_layers, self.n_qubits * 3, requires_grad=True) * 0.1
        qnode = qml.QNode(self.qnn_circuit, self.dev, interface='autograd')

        def averaged_output(x, w):
            return self._qnn_score(qnode(x, w))

        weights = self._fit(averaged_output, weights, X_train, y_train, epochs, lr, batch_size=24)
        self.qnn_weights = np.array(weights)
        return self.qnn_weights

    # ============== Prediction Functions ==============
    def predict_vqc(self, X):
        """Return VQC fraud scores in [0, 1] for every sample in ``X``.

        Raises:
            RuntimeError: If the VQC weights have not been trained or loaded.
        """
        weights = self._require_weights('vqc')
        qnode = qml.QNode(self.vqc_circuit, self.dev)
        predictions = np.array([qnode(x, weights) for x in X])
        return (predictions + 1) / 2

    def predict_qaoa(self, X):
        """Return QAOA fraud scores in [0, 1] for every sample in ``X``.

        Raises:
            RuntimeError: If the QAOA weights have not been trained or loaded.
        """
        weights = self._require_weights('qaoa')
        qnode = qml.QNode(self.qaoa_circuit, self.dev)
        predictions = np.array([qnode(x, weights) for x in X])
        return (predictions + 1) / 2

    def predict_qnn(self, X):
        """Return QNN fraud scores in [0, 1] for every sample in ``X``.

        Raises:
            RuntimeError: If the QNN weights have not been trained or loaded.
        """
        weights = self._require_weights('qnn')
        qnode = qml.QNode(self.qnn_circuit, self.dev)
        predictions = [self._qnn_score(qnode(x, weights)) for x in X]
        return (np.array(predictions) + 1) / 2

    def predict_ensemble(self, X):
        """Quantum ensemble prediction: VQC(40%) + QAOA(30%) + QNN(30%).

        After blending, 0.10 is added wherever any single model scores above
        0.6, and the result is capped at 1.0.

        Raises:
            RuntimeError: If any of the three models has no weights yet.
        """
        vqc_pred = self.predict_vqc(X)
        qaoa_pred = self.predict_qaoa(X)
        qnn_pred = self.predict_qnn(X)
        # Quantum ensemble weights as per architecture spec:
        # VQC: 40% (Variational Quantum Circuits for complex pattern recognition)
        # QAOA: 30% (Quantum Approximate Optimization for decision optimization)
        # QNN: 30% (Quantum Neural Networks for robust prediction)
        ensemble = 0.40 * vqc_pred + 0.30 * qaoa_pred + 0.30 * qnn_pred
        # Apply fraud detection boost - increase sensitivity.
        # If any model strongly predicts fraud, boost the ensemble score.
        max_prediction = np.maximum(np.maximum(vqc_pred, qaoa_pred), qnn_pred)
        fraud_boost = np.where(max_prediction > 0.6, 0.10, 0.0)  # 10% boost when strong signal
        return np.minimum(ensemble + fraud_boost, 1.0)

    # ============== Save/Load ==============
    def save_weights(self, filepath='models/'):
        """Save all quantum model weights as ``<filepath><name>_weights.npy``.

        ``filepath`` is used as a plain string prefix, so a directory must end
        with a path separator. Missing parent directories are created.
        """
        import os

        for name in self._MODEL_NAMES:
            path = f'{filepath}{name}_weights.npy'
            parent = os.path.dirname(path)
            if parent:
                # np.save does not create directories; without this a fresh
                # checkout fails only after training has already finished.
                os.makedirs(parent, exist_ok=True)
            np.save(path, getattr(self, f'{name}_weights'))
        print(f"\n✓ All quantum weights saved to {filepath}")

    def load_weights(self, filepath='models/'):
        """Load all quantum model weights written by ``save_weights``.

        ``filepath`` is the same string prefix that was passed when saving.
        """
        for name in self._MODEL_NAMES:
            setattr(self, f'{name}_weights', np.load(f'{filepath}{name}_weights.npy'))
        print(f"\n✓ All quantum weights loaded from {filepath}")
# ============== Training Script ==============
def train_all_quantum_models():
    """Train, evaluate and save all three quantum models.

    Reads ``data/processed_data.csv`` when it exists, otherwise
    ``data/sample_data.csv``. Draws up to 1500 rows at random, splits them
    80/20 (stratified on the label), trains the VQC, QAOA and QNN models,
    prints accuracy and recall for each model and for the ensemble, then
    saves the weights under ``models/``.

    Returns:
        The trained ``QuantumFraudDetector``.
    """
    import os

    print("="*60)
    print("ENHANCED QUANTUM FRAUD DETECTION TRAINING")
    print("RECALL-OPTIMIZED VERSION")
    print("="*60)

    # Try full dataset first, then sample
    if os.path.exists('data/processed_data.csv'):
        data_path = 'data/processed_data.csv'
    else:
        data_path = 'data/sample_data.csv'
    df = pd.read_csv(data_path)

    quantum_features = ['Scaled_amt', 'Scaled_Age',
                        'Scaled_Haversine_Distance', 'Scaled_Txns_Last_1Hr']
    X = df[quantum_features].values
    y = df['is_fraud'].values

    # Sampling without replacement cannot draw more rows than exist, so clamp
    # the request; otherwise a small fallback dataset raises ValueError.
    sample_size = min(1500, len(X))
    indices = np.random.choice(len(X), size=sample_size, replace=False)
    X_sample = X[indices]
    y_sample = y[indices]

    X_train, X_test, y_train, y_test = train_test_split(
        X_sample, y_sample, test_size=0.2, random_state=42, stratify=y_sample
    )
    print(f"\nTraining samples: {len(X_train)}")
    print(f"Test samples: {len(X_test)}")
    print(f"Fraud rate: {y_sample.mean()*100:.2f}%")

    # Create the output directory up front so a missing folder cannot make
    # the final save fail after all the training time has been spent.
    os.makedirs('models', exist_ok=True)

    detector = QuantumFraudDetector(n_qubits=4, n_layers=3)
    detector.train_vqc(X_train, y_train, epochs=5, lr=0.01)
    detector.train_qaoa(X_train, y_train, epochs=3, lr=0.01)
    detector.train_qnn(X_train, y_train, epochs=3, lr=0.01)

    print("\n" + "="*60)
    print("EVALUATION RESULTS (RECALL-FOCUSED)")
    print("="*60)

    # Same report for each individual model: threshold the scores at 0.5.
    individual_models = (
        ("VQC", detector.predict_vqc),
        ("QAOA", detector.predict_qaoa),
        ("QNN", detector.predict_qnn),
    )
    for label, predict in individual_models:
        print(f"\n[{label}] Performance:")
        classes = (predict(X_test) > 0.5).astype(int)
        print(f"Accuracy: {accuracy_score(y_test, classes):.4f}")
        print(f"Recall: {recall_score(y_test, classes):.4f}")

    print("\n[ENSEMBLE - RECALL OPTIMIZED] Performance:")
    ensemble_pred = detector.predict_ensemble(X_test)
    ensemble_classes = (ensemble_pred > 0.5).astype(int)
    print(f"Accuracy: {accuracy_score(y_test, ensemble_classes):.4f}")
    print(f"Recall: {recall_score(y_test, ensemble_classes):.4f} ⬆️ IMPROVED")
    print("\n" + classification_report(y_test, ensemble_classes))

    detector.save_weights()

    print("\n" + "="*60)
    print("✓ RECALL-OPTIMIZED QUANTUM TRAINING COMPLETE!")
    print("="*60)
    print("\nModels saved:")
    print(" - models/vqc_weights.npy")
    print(" - models/qaoa_weights.npy")
    print(" - models/qnn_weights.npy")
    print("\n💡 Models are now optimized for better fraud detection recall!")
    return detector
if __name__ == "__main__":
    # Script entry point: run the full train / evaluate / save pipeline and
    # keep the trained detector bound at module level.
    detector = train_all_quantum_models()