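# Spaces: Build error
# (The line above appears to be page-status text captured along with this file;
#  it is not part of the program.)
"""
Enhanced Quantum Fraud Detection Models - IMPROVED RECALL VERSION

Includes: VQC, QAOA-inspired, and Quantum Neural Network (QNN) circuits,
combined in a weighted ensemble.  (The original header also lists QSVM, but no
QSVM implementation appears in this file.)
Optimized for better fraud detection recall.
"""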
import os

import numpy as np
import pandas as pd
import pennylane as qml
from pennylane import numpy as pnp
from sklearn.metrics import accuracy_score, classification_report, recall_score
from sklearn.model_selection import train_test_split
class QuantumFraudDetector:
    """Fraud scorer built from three PennyLane circuits and a weighted ensemble.

    The three models (VQC, QAOA-inspired, QNN) share one ``default.qubit``
    device.  Each circuit reads ``inputs[0] .. inputs[n_qubits - 1]``, so every
    sample must provide at least ``n_qubits`` features.  A circuit expectation
    value ``v`` is converted to a score with ``(v + 1) / 2``; training treats
    label ``1`` as fraud and pushes the score of those samples towards 1.

    Attributes:
        n_qubits: Number of wires (and features consumed per sample).
        n_layers: Number of variational layers used by the VQC and the QNN.
        dev: PennyLane device shared by all circuits.
        vqc_weights, qaoa_weights, qnn_weights: Trained parameters, ``None``
            until the matching ``train_*`` method or ``load_weights`` has run.
    """

    # Ensemble blend (VQC, QAOA, QNN) and the "strong signal" boost settings.
    _ENSEMBLE_WEIGHTS = (0.40, 0.30, 0.30)
    _BOOST_THRESHOLD = 0.6
    _BOOST_AMOUNT = 0.10

    def __init__(self, n_qubits=4, n_layers=3):
        """Create the shared device; no model is trained yet.

        Raises:
            ValueError: If ``n_qubits`` is below 2.  The circuits entangle
                neighbouring wires and read out wire 1, so one wire cannot work.
        """
        if n_qubits < 2:
            raise ValueError(f"n_qubits must be at least 2, got {n_qubits}")
        self.n_qubits = n_qubits
        self.n_layers = n_layers
        self.dev = qml.device('default.qubit', wires=n_qubits)
        self.vqc_weights = None
        self.qaoa_weights = None
        self.qnn_weights = None

    # ============== Variational Quantum Circuit (VQC) ==============
    def vqc_circuit(self, inputs, weights):
        """Angle-encode ``inputs`` and apply one RY/RZ - CNOT ring - RX block per layer.

        Args:
            inputs: One sample; element ``i`` is encoded as ``RY(inputs[i] * pi)``.
            weights: Iterable of per-layer vectors of length ``3 * n_qubits``
                laid out as ``[RY angles | RZ angles | RX angles]``.

        Returns:
            Expectation value of PauliZ on wire 0.
        """
        for wire in range(self.n_qubits):
            qml.RY(inputs[wire] * np.pi, wires=wire)
        for layer_weights in weights:
            for wire in range(self.n_qubits):
                qml.RY(layer_weights[wire], wires=wire)
                qml.RZ(layer_weights[wire + self.n_qubits], wires=wire)
            # Linear CNOT chain closed into a ring by the last -> first CNOT.
            for wire in range(self.n_qubits - 1):
                qml.CNOT(wires=[wire, wire + 1])
            qml.CNOT(wires=[self.n_qubits - 1, 0])
            for wire in range(self.n_qubits):
                qml.RX(layer_weights[wire + 2 * self.n_qubits], wires=wire)
        return qml.expval(qml.PauliZ(0))

    # ============== Quantum Approximate Optimization (QAOA) ==============
    def qaoa_circuit(self, inputs, params):
        """QAOA-inspired circuit: Hadamards, then alternating RZ/CNOT and RX rounds.

        Args:
            inputs: One sample; ``inputs[i]`` scales the RZ angle on wire ``i``.
            params: Flat vector read in pairs; round ``p`` uses ``params[2p]``
                for the data-dependent RZ angles and ``params[2p + 1]`` for the
                RX angle.  A trailing unpaired element is ignored.

        Returns:
            Expectation value of ``PauliZ(0) @ PauliZ(1)``.
        """
        for wire in range(self.n_qubits):
            qml.Hadamard(wires=wire)
        for p in range(len(params) // 2):
            for wire in range(self.n_qubits):
                qml.RZ(inputs[wire] * params[2 * p], wires=wire)
            for wire in range(self.n_qubits - 1):
                qml.CNOT(wires=[wire, wire + 1])
            for wire in range(self.n_qubits):
                qml.RX(params[2 * p + 1], wires=wire)
        return qml.expval(qml.PauliZ(0) @ qml.PauliZ(1))

    # ============== Quantum Neural Network (QNN) ==============
    def qnn_circuit(self, inputs, weights):
        """RY+RZ encoding followed by ``n_layers`` StronglyEntanglingLayers blocks.

        Args:
            inputs: One sample with at least ``n_qubits`` features.
            weights: Indexable by layer; ``weights[layer]`` must reshape to
                ``(1, n_qubits, 3)``.

        Returns:
            Three expectation values: PauliZ(0), PauliZ(1) and PauliX(0).
        """
        for wire in range(self.n_qubits):
            qml.RY(inputs[wire] * np.pi, wires=wire)
            qml.RZ(inputs[wire] * np.pi / 2, wires=wire)
        for layer in range(self.n_layers):
            qml.StronglyEntanglingLayers(
                weights[layer].reshape(1, self.n_qubits, 3),
                wires=range(self.n_qubits)
            )
        return [
            qml.expval(qml.PauliZ(0)),
            qml.expval(qml.PauliZ(1)),
            qml.expval(qml.PauliX(0))
        ]

    # ============== Shared helpers ==============
    @staticmethod
    def _mean_readout(outputs):
        """Average the three QNN expectation values into one value."""
        return (outputs[0] + outputs[1] + outputs[2]) / 3

    @staticmethod
    def _validate_training_data(X_train, y_train):
        """Return both inputs as NumPy arrays, rejecting empty or mismatched data."""
        X_train = np.asarray(X_train)
        y_train = np.asarray(y_train)
        if len(X_train) == 0:
            raise ValueError("X_train is empty; there is nothing to train on")
        if len(X_train) != len(y_train):
            raise ValueError(
                f"X_train and y_train differ in length: {len(X_train)} vs {len(y_train)}"
            )
        return X_train, y_train

    @staticmethod
    def _recall_weighted_loss(probs, y_batch):
        """Binary cross-entropy plus an extra penalty on missed fraud.

        The penalty is ``0.3 * 2.0 * sum(y * (1 - p))``.  It is a sum, not a
        mean, so its size grows with the number of fraud samples in the batch.
        """
        log_loss = -pnp.mean(y_batch * pnp.log(probs + 1e-10) +
                             (1 - y_batch) * pnp.log(1 - probs + 1e-10))
        fn_penalty = pnp.sum(y_batch * (1 - probs)) * 2.0
        return log_loss + fn_penalty * 0.3

    def _fit(self, circuit, params, X_train, y_train, epochs, lr, batch_size,
             readout=None):
        """Run shuffled mini-batch Adam on ``circuit`` and return the final parameters.

        Args:
            circuit: Bound circuit method with signature ``(inputs, params)``.
            params: Initial trainable parameters.
            X_train, y_train: Validated NumPy arrays of equal length.
            epochs: Number of passes over the data.
            lr: Adam step size.
            batch_size: Samples per optimisation step.
            readout: Optional callable reducing one circuit output to a single
                value before it is turned into a score.

        Returns:
            The trained parameters as a plain ``numpy.ndarray``.
        """
        qnode = qml.QNode(circuit, self.dev, interface='autograd')

        def cost_fn(current, X_batch, y_batch):
            outputs = [qnode(x, current) for x in X_batch]
            if readout is not None:
                outputs = [readout(out) for out in outputs]
            predictions = pnp.array(outputs)
            probs = (predictions + 1) / 2
            return self._recall_weighted_loss(probs, y_batch)

        opt = qml.AdamOptimizer(stepsize=lr)
        n_samples = len(X_train)
        for epoch in range(epochs):
            order = pnp.random.permutation(n_samples)
            epoch_loss = 0
            n_batches = 0
            for start in range(0, n_samples, batch_size):
                batch_idx = order[start:start + batch_size]
                # Data must not be treated as trainable by the optimizer.
                X_batch = pnp.array(X_train[batch_idx], requires_grad=False)
                y_batch = pnp.array(y_train[batch_idx], requires_grad=False)
                params, loss = opt.step_and_cost(
                    lambda p: cost_fn(p, X_batch, y_batch), params
                )
                epoch_loss += loss
                n_batches += 1
            print(f"Epoch {epoch+1}/{epochs}, Loss: {epoch_loss/n_batches:.4f}")
        return np.array(params)

    def _predict_scores(self, circuit, weights, X, model_name, readout=None):
        """Evaluate ``circuit`` on every row of ``X`` and map the results to scores.

        Raises:
            RuntimeError: If ``weights`` is ``None`` (model not trained/loaded).
        """
        if weights is None:
            raise RuntimeError(
                f"{model_name} weights are not set; train the model or call "
                "load_weights() first"
            )
        qnode = qml.QNode(circuit, self.dev)
        outputs = [qnode(x, weights) for x in X]
        if readout is not None:
            outputs = [readout(out) for out in outputs]
        return (np.array(outputs) + 1) / 2

    # ============== Training Functions - RECALL OPTIMIZED ==============
    def train_vqc(self, X_train, y_train, epochs=5, lr=0.01):
        """Train the VQC (seed 42, batch size 32) and store/return its weights.

        Raises:
            ValueError: If the training data is empty or the lengths differ.
        """
        X_train, y_train = self._validate_training_data(X_train, y_train)
        print("\n[VQC] Training Variational Quantum Circuit (Recall-Optimized)...")
        pnp.random.seed(42)
        weights = pnp.random.randn(self.n_layers, self.n_qubits * 3, requires_grad=True) * 0.1
        self.vqc_weights = self._fit(
            self.vqc_circuit, weights, X_train, y_train, epochs, lr, batch_size=32
        )
        return self.vqc_weights

    def train_qaoa(self, X_train, y_train, epochs=3, lr=0.01):
        """Train the QAOA-inspired circuit (seed 43, batch size 32).

        Six parameters are used, i.e. three (RZ scale, RX angle) rounds,
        independent of ``n_layers``.

        Raises:
            ValueError: If the training data is empty or the lengths differ.
        """
        X_train, y_train = self._validate_training_data(X_train, y_train)
        print("\n[QAOA] Training Quantum Approximate Optimization (Recall-Optimized)...")
        pnp.random.seed(43)
        params = pnp.random.randn(6, requires_grad=True) * 0.5
        self.qaoa_weights = self._fit(
            self.qaoa_circuit, params, X_train, y_train, epochs, lr, batch_size=32
        )
        return self.qaoa_weights

    def train_qnn(self, X_train, y_train, epochs=3, lr=0.01):
        """Train the QNN (seed 44, batch size 24) on the mean of its three outputs.

        Raises:
            ValueError: If the training data is empty or the lengths differ.
        """
        X_train, y_train = self._validate_training_data(X_train, y_train)
        print("\n[QNN] Training Quantum Neural Network (Recall-Optimized)...")
        pnp.random.seed(44)
        weights = pnp.random.randn(self.n_layers, self.n_qubits * 3, requires_grad=True) * 0.1
        self.qnn_weights = self._fit(
            self.qnn_circuit, weights, X_train, y_train, epochs, lr,
            batch_size=24, readout=self._mean_readout
        )
        return self.qnn_weights

    # ============== Prediction Functions ==============
    def predict_vqc(self, X):
        """Return VQC scores ``(expval + 1) / 2`` for every row of ``X``."""
        return self._predict_scores(self.vqc_circuit, self.vqc_weights, X, "VQC")

    def predict_qaoa(self, X):
        """Return QAOA scores ``(expval + 1) / 2`` for every row of ``X``."""
        return self._predict_scores(self.qaoa_circuit, self.qaoa_weights, X, "QAOA")

    def predict_qnn(self, X):
        """Return QNN scores from the mean of its three expectation values."""
        return self._predict_scores(
            self.qnn_circuit, self.qnn_weights, X, "QNN", readout=self._mean_readout
        )

    def predict_ensemble(self, X):
        """Quantum ensemble prediction: VQC(40%) + QAOA(30%) + QNN(30%).

        After blending, 0.10 is added wherever any single model scores above
        0.6, and the result is capped at 1.0.
        """
        vqc_pred = self.predict_vqc(X)
        qaoa_pred = self.predict_qaoa(X)
        qnn_pred = self.predict_qnn(X)

        w_vqc, w_qaoa, w_qnn = self._ENSEMBLE_WEIGHTS
        ensemble = w_vqc * vqc_pred + w_qaoa * qaoa_pred + w_qnn * qnn_pred

        # Sensitivity boost: one confident model is enough to raise the score.
        max_prediction = np.maximum(np.maximum(vqc_pred, qaoa_pred), qnn_pred)
        fraud_boost = np.where(
            max_prediction > self._BOOST_THRESHOLD, self._BOOST_AMOUNT, 0.0
        )
        return np.minimum(ensemble + fraud_boost, 1.0)

    # ============== Save/Load ==============
    def save_weights(self, filepath='models/'):
        """Save all quantum model weights as ``.npy`` files.

        ``filepath`` is used as a plain string prefix (``f'{filepath}vqc_weights.npy'``),
        so a directory must end with a path separator.  The directory part of
        the prefix is created if it does not exist yet.
        """
        target_dir = os.path.dirname(filepath)
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
        np.save(f'{filepath}vqc_weights.npy', self.vqc_weights)
        np.save(f'{filepath}qaoa_weights.npy', self.qaoa_weights)
        np.save(f'{filepath}qnn_weights.npy', self.qnn_weights)
        print(f"\n✓ All quantum weights saved to {filepath}")

    def load_weights(self, filepath='models/'):
        """Load all quantum model weights written by ``save_weights``.

        ``filepath`` is the same string prefix that was passed to ``save_weights``.
        """
        self.vqc_weights = np.load(f'{filepath}vqc_weights.npy')
        self.qaoa_weights = np.load(f'{filepath}qaoa_weights.npy')
        self.qnn_weights = np.load(f'{filepath}qnn_weights.npy')
        print(f"\n✓ All quantum weights loaded from {filepath}")
# ============== Training Script ==============
def _evaluate_model(title, predict, X_test, y_test, recall_note=""):
    """Print accuracy and recall for ``predict(X_test)`` thresholded at 0.5.

    Returns the predicted 0/1 classes so callers can build further reports.
    """
    print(f"\n{title} Performance:")
    scores = predict(X_test)
    classes = (scores > 0.5).astype(int)
    print(f"Accuracy: {accuracy_score(y_test, classes):.4f}")
    print(f"Recall: {recall_score(y_test, classes):.4f}{recall_note}")
    return classes


def train_all_quantum_models():
    """Train all quantum models with recall optimization.

    Reads ``data/processed_data.csv`` (falling back to ``data/sample_data.csv``),
    draws up to 1500 rows with a fixed seed, makes a stratified 80/20 split,
    trains the VQC, QAOA and QNN models, prints test metrics for each model and
    for the ensemble, and saves the weights under ``models/``.

    Returns:
        The trained ``QuantumFraudDetector``.
    """
    banner = "=" * 60
    print(banner)
    print("ENHANCED QUANTUM FRAUD DETECTION TRAINING")
    print("RECALL-OPTIMIZED VERSION")
    print(banner)

    # Try full dataset first, then sample
    data_path = 'data/processed_data.csv'
    if not os.path.exists(data_path):
        data_path = 'data/sample_data.csv'
    df = pd.read_csv(data_path)

    quantum_features = ['Scaled_amt', 'Scaled_Age',
                        'Scaled_Haversine_Distance', 'Scaled_Txns_Last_1Hr']
    X = df[quantum_features].values
    y = df['is_fraud'].values

    # Sampling without replacement cannot exceed the population, so cap the
    # request at the dataset size (the fallback sample file may be small).
    sample_size = min(1500, len(X))
    # Fixed seed so the subsample is as reproducible as the split and training.
    rng = np.random.default_rng(42)
    indices = rng.choice(len(X), size=sample_size, replace=False)
    X_sample = X[indices]
    y_sample = y[indices]

    X_train, X_test, y_train, y_test = train_test_split(
        X_sample, y_sample, test_size=0.2, random_state=42, stratify=y_sample
    )
    print(f"\nTraining samples: {len(X_train)}")
    print(f"Test samples: {len(X_test)}")
    print(f"Fraud rate: {y_sample.mean()*100:.2f}%")

    detector = QuantumFraudDetector(n_qubits=4, n_layers=3)
    detector.train_vqc(X_train, y_train, epochs=5, lr=0.01)
    detector.train_qaoa(X_train, y_train, epochs=3, lr=0.01)
    detector.train_qnn(X_train, y_train, epochs=3, lr=0.01)

    print("\n" + banner)
    print("EVALUATION RESULTS (RECALL-FOCUSED)")
    print(banner)
    _evaluate_model("[VQC]", detector.predict_vqc, X_test, y_test)
    _evaluate_model("[QAOA]", detector.predict_qaoa, X_test, y_test)
    _evaluate_model("[QNN]", detector.predict_qnn, X_test, y_test)
    ensemble_classes = _evaluate_model(
        "[ENSEMBLE - RECALL OPTIMIZED]", detector.predict_ensemble,
        X_test, y_test, recall_note=" ⬆️ IMPROVED"
    )
    print("\n" + classification_report(y_test, ensemble_classes))

    # np.save does not create directories; make sure the default target exists.
    os.makedirs('models', exist_ok=True)
    detector.save_weights()

    print("\n" + banner)
    print("✓ RECALL-OPTIMIZED QUANTUM TRAINING COMPLETE!")
    print(banner)
    print("\nModels saved:")
    print(" - models/vqc_weights.npy")
    print(" - models/qaoa_weights.npy")
    print(" - models/qnn_weights.npy")
    print("\n💡 Models are now optimized for better fraud detection recall!")
    return detector
# Script entry point: run the complete training pipeline only when this file
# is executed directly, not when it is imported.
if __name__ == "__main__":
    detector = train_all_quantum_models()