| """ | |
| Heavily based on: https://github.com/facebookresearch/faiss/blob/master/benchs/bench_gpu_1bn.py | |
| """ | |
| import sys | |
| import time | |
| import math | |
| import faiss | |
| import torch | |
| import numpy as np | |
| from colbert.utils.utils import print_message | |


class FaissIndexGPU():
    def __init__(self):
        self.ngpu = faiss.get_num_gpus()

        if self.ngpu == 0:
            return

        # Scratch memory per GPU (8 GiB) and a cap on how many vectors may
        # accumulate on the GPUs before being flushed back to the CPU index.
        self.tempmem = 1 << 33
        self.max_add_per_gpu = 1 << 25
        self.max_add = self.max_add_per_gpu * self.ngpu
        self.add_batch_size = 65536

        self.gpu_resources = self._prepare_gpu_resources()

    def _prepare_gpu_resources(self):
        print_message(f"Preparing resources for {self.ngpu} GPUs.")

        gpu_resources = []

        for _ in range(self.ngpu):
            res = faiss.StandardGpuResources()
            if self.tempmem >= 0:
                res.setTempMemory(self.tempmem)
            gpu_resources.append(res)

        return gpu_resources

    def _make_vres_vdev(self):
        """
        Return vectors of device ids and resources useful for gpu_multiple.
        """
        assert self.ngpu > 0

        vres = faiss.GpuResourcesVector()
        vdev = faiss.IntVector()

        for i in range(self.ngpu):
            vdev.push_back(i)
            vres.push_back(self.gpu_resources[i])

        return vres, vdev

    def training_initialize(self, index, quantizer):
        """
        The index and quantizer should be owned by the caller.
        """
        assert self.ngpu > 0

        s = time.time()
        # Run the coarse-quantizer clustering on all available GPUs.
        self.index_ivf = faiss.extract_index_ivf(index)
        self.clustering_index = faiss.index_cpu_to_all_gpus(quantizer)
        self.index_ivf.clustering_index = self.clustering_index
        print(f'training_initialize: {time.time() - s:.3f} s')

    def training_finalize(self):
        assert self.ngpu > 0

        s = time.time()
        # Move the trained coarse quantizer back to the CPU.
        self.index_ivf.clustering_index = faiss.index_gpu_to_cpu(self.index_ivf.clustering_index)
        print(f'training_finalize: {time.time() - s:.3f} s')

    def adding_initialize(self, index):
        """
        The index should be owned by the caller.
        """
        assert self.ngpu > 0

        self.co = faiss.GpuMultipleClonerOptions()
        self.co.useFloat16 = True
        self.co.useFloat16CoarseQuantizer = False
        self.co.usePrecomputed = False
        self.co.indicesOptions = faiss.INDICES_CPU
        self.co.verbose = True
        self.co.reserveVecs = self.max_add
        self.co.shard = True
        assert self.co.shard_type in (0, 1, 2)

        self.vres, self.vdev = self._make_vres_vdev()
        self.gpu_index = faiss.index_cpu_to_gpu_multiple(self.vres, self.vdev, index, self.co)

    def add(self, index, data, offset):
        assert self.ngpu > 0

        t0 = time.time()
        nb = data.shape[0]

        for i0 in range(0, nb, self.add_batch_size):
            i1 = min(i0 + self.add_batch_size, nb)
            xs = data[i0:i1]

            # FAISS expects 64-bit integer ids.
            self.gpu_index.add_with_ids(xs, np.arange(offset + i0, offset + i1, dtype=np.int64))

            if self.max_add > 0 and self.gpu_index.ntotal > self.max_add:
                self._flush_to_cpu(index, nb, offset)

            print('\r%d/%d (%.3f s) ' % (i0, nb, time.time() - t0), end=' ')
            sys.stdout.flush()

        if self.gpu_index.ntotal > 0:
            self._flush_to_cpu(index, nb, offset)

        assert index.ntotal == offset + nb, (index.ntotal, offset + nb, offset, nb)
        print(f"add(.) time: {time.time() - t0:.3f} s \t\t--\t\t index.ntotal = {index.ntotal}")

    def _flush_to_cpu(self, index, nb, offset):
        print("Flush indexes to CPU")

        for i in range(self.ngpu):
            index_src_gpu = faiss.downcast_index(self.gpu_index if self.ngpu == 1 else self.gpu_index.at(i))
            index_src = faiss.index_gpu_to_cpu(index_src_gpu)

            # Copy the vectors accumulated on this GPU into the CPU index, then
            # clear the GPU shard and re-reserve memory for the next round.
            index_src.copy_subset_to(index, 0, offset, offset + nb)
            index_src_gpu.reset()
            index_src_gpu.reserveMemory(self.max_add)

        if self.ngpu > 1:
            # The shard-sync method was renamed across FAISS versions.
            try:
                self.gpu_index.sync_with_shard_indexes()
            except AttributeError:
                self.gpu_index.syncWithSubIndexes()
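

# The sketch below is a hypothetical usage example, not part of the original
# module: it trains and populates a small IVFFlat index over random vectors,
# falling back to CPU-only FAISS when no GPUs are available. All sizes
# (dim, nlist, n_train, n_add) are illustrative assumptions.
if __name__ == '__main__':
    dim, nlist, n_train, n_add = 128, 256, 10_000, 50_000

    quantizer = faiss.IndexFlatL2(dim)
    index = faiss.IndexIVFFlat(quantizer, dim, nlist)

    gpu = FaissIndexGPU()

    # Training: clustering runs on GPU(s) if present.
    train_data = np.random.rand(n_train, dim).astype(np.float32)
    if gpu.ngpu > 0:
        gpu.training_initialize(index, quantizer)
    index.train(train_data)
    if gpu.ngpu > 0:
        gpu.training_finalize()

    # Adding: vectors are batched onto the GPU(s), then flushed into `index`.
    add_data = np.random.rand(n_add, dim).astype(np.float32)
    if gpu.ngpu > 0:
        gpu.adding_initialize(index)
        gpu.add(index, add_data, offset=0)
    else:
        index.add(add_data)

    print(index.ntotal)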