| import os |
| import torch |
| import warnings |
| warnings.filterwarnings('ignore') |
| import requests |
| from io import BytesIO |
| from transformers.pipelines.audio_utils import ffmpeg_read |
| import mutagen |
| from torchaudio import functional as taF |
| import numpy as np |
|
|
# Sampling rate (Hz) that decoded audio is resampled to and that is passed
# to the feature extractor.
feature_extractor_sampling_rate = 16000
# Maximum number of samples in one clip: 30 seconds at the target rate.
clip_length = feature_extractor_sampling_rate * 30
# Chunks with at most this many samples (half a second) are dropped when a
# long recording is split into clips.
clip_drop = feature_extractor_sampling_rate // 2
# File suffixes that are decoded as encoded audio.
AUDIO_EXTENSIONS = ('.wav', '.mp3', '.flac', '.opus', '.ogg')
|
|
|
|
def load_audio_single(audio_file, seg=None):
    """Load one audio source and split it into clips of at most ``clip_length`` samples.

    Args:
        audio_file: A string. Either a local path or an ``http(s)://`` URL
            ending in one of ``AUDIO_EXTENSIONS``, or a local path ending in
            ``.npy``.
        seg: Optional ``(start, end)`` pair in seconds. When given, only that
            span of the decoded audio is kept (applied before resampling).

    Returns:
        * For encoded audio: a list of arrays at
          ``feature_extractor_sampling_rate``. Audio no longer than
          ``clip_length`` samples is returned as a single-element list;
          longer audio is cut into consecutive ``clip_length`` chunks and
          chunks of ``clip_drop`` samples or fewer are discarded.
        * For ``.npy``: a single-element list holding the loaded array as-is.
        * ``None`` when the extension is not recognised.

    Raises:
        AssertionError: If ``audio_file`` is not a string.
        requests.HTTPError: If a URL download returns an error status.
    """
    assert isinstance(audio_file, str), "audio_file should be a string"

    if audio_file.endswith(AUDIO_EXTENSIONS):
        if audio_file.startswith(("http://", "https://")):
            # Download first: the metadata probe below cannot open a URL as
            # if it were a local path.
            response = requests.get(audio_file, timeout=60)
            response.raise_for_status()
            payload = response.content
            metadata = mutagen.File(BytesIO(payload))
        else:
            with open(audio_file, "rb") as f:
                payload = f.read()
            metadata = mutagen.File(audio_file)

        # Not every container reports a sample rate, and mutagen.File may
        # return None for an unrecognised file. In that case ask ffmpeg_read
        # to decode straight at the target rate so no extra resample is needed.
        # NOTE(review): relies on ffmpeg_read resampling to the requested
        # rate while decoding -- confirm against the transformers docs.
        in_sampling_rate = getattr(
            getattr(metadata, "info", None), "sample_rate", None
        )
        if not in_sampling_rate:
            in_sampling_rate = feature_extractor_sampling_rate

        samples = ffmpeg_read(payload, in_sampling_rate)

        if seg is not None:
            # Segment bounds are in seconds; convert to sample indices at the
            # decode rate.
            start = int(seg[0] * in_sampling_rate)
            stop = int(seg[1] * in_sampling_rate)
            samples = samples[start:stop]

        if in_sampling_rate != feature_extractor_sampling_rate:
            # .copy() hands torch a fresh, contiguous, writable buffer.
            samples = taF.resample(
                torch.from_numpy(samples.copy()),
                in_sampling_rate,
                feature_extractor_sampling_rate,
            ).numpy()

        if len(samples) <= clip_length:
            return [samples]

        chunks = (
            samples[offset:offset + clip_length]
            for offset in range(0, len(samples), clip_length)
        )
        # Drop chunks that are too short to be useful (only the tail can be).
        return [chunk for chunk in chunks if len(chunk) > clip_drop]

    if audio_file.endswith('.npy'):
        return [np.load(audio_file)]

    # Unsupported extension: callers receive None, as before.
    return None
| |
|
|
def load_audios(audio_preprocess, audio_files, segs=None, audio_folder=None):
    """Load and preprocess one or more audio files into a flat list of clips.

    Args:
        audio_preprocess: Callable applied to every clip returned by
            ``load_audio_single``.
        audio_files: A single path/URL string, a sequence of them, or
            ``None``.
        segs: Optional segment(s) in seconds. Either one ``(start, end)``
            pair of numbers (applied to the first file) or a sequence with
            one entry per file, each a ``(start, end)`` pair or a falsy value
            for "whole file".
        audio_folder: Optional directory joined in front of every entry of
            ``audio_files``.

    Returns:
        ``(audio_list, audio_size)`` where ``audio_list`` is the flat list of
        preprocessed clips for all files and ``audio_size[i]`` is the number
        of clips contributed by ``audio_files[i]``. A file that fails to load
        is reported on stdout and contributes ``0`` clips. Returns
        ``(None, None)`` when ``audio_files`` is ``None``.
    """
    if audio_files is None:
        return None, None
    if isinstance(audio_files, str):
        audio_files = [audio_files]

    if segs:
        # A bare (start, end) pair is wrapped so it lines up with the files;
        # ints are accepted as well as floats (e.g. segs=[0, 10]).
        if isinstance(segs[0], (int, float)):
            segs = [segs]
    else:
        segs = [None] * len(audio_files)

    if audio_folder:
        audio_files = [os.path.join(audio_folder, afile) for afile in audio_files]

    def get_single_audio(audio_file, seg):
        """Return the preprocessed clips for one file, or [] on failure."""
        try:
            clips = load_audio_single(audio_file, seg if seg else None)
            if clips is None:
                raise ValueError("unsupported audio file type")
            return [audio_preprocess(clip) for clip in clips]
        except Exception as e:
            # Best effort: report and carry on with the remaining files.
            print(f"Error loading {audio_file} seg {seg}: {e}")
            return []

    audio_size = []
    audio_list = []
    for idx, audio_file in enumerate(audio_files):
        single_audio_list = get_single_audio(audio_file, segs[idx])
        audio_size.append(len(single_audio_list))
        audio_list.extend(single_audio_list)

    return audio_list, audio_size
|
|
class AudioPreprocess:
    """Callable that runs a feature extractor over a raw audio clip.

    The parameter and attribute names keep the ``image_*`` prefix of the
    original interface even though the input is audio.
    """

    def __init__(self, image_processor, data_args=None):
        """Store the processor and read ``image_aspect_ratio`` from ``data_args``.

        Args:
            image_processor: Callable invoked as
                ``image_processor(clip, sampling_rate=..., return_tensors="pt")``
                whose result exposes ``.input_features``.
            data_args: Optional configuration, either an object with an
                ``image_aspect_ratio`` attribute or a dict with that key.
                When omitted, ``image_aspect_ratio`` is ``None``.
        """
        if isinstance(data_args, dict):
            # getattr() never sees dict keys, so look the key up explicitly.
            self.image_aspect_ratio = data_args.get('image_aspect_ratio')
        else:
            self.image_aspect_ratio = getattr(data_args, 'image_aspect_ratio', None)
        self.image_processor = image_processor

    def __call__(self, image):
        """Return the processor's ``input_features`` for one audio clip.

        Raises:
            AssertionError: If ``image_aspect_ratio`` is not ``"audio"``.
        """
        assert self.image_aspect_ratio == "audio", "image_aspect_ratio should be 'audio' for audio preprocessing"
        features = self.image_processor(
            image,
            sampling_rate=feature_extractor_sampling_rate,
            return_tensors="pt",
        )
        return features.input_features
| |