feat: add whisper api (#252)

* feat: add whisper api
* docs: update docs
John Howe
2025-04-03 15:30:54 +08:00
committed by GitHub
parent 153446f93e
commit cb689b010b
11 changed files with 190 additions and 48 deletions

View File

@@ -39,7 +39,7 @@
- `GLM-4V-PLUS`
- `Gemini-2.0-flash`
- `Qwen-2.5-72B-Instruct`
- **( :tada: NEW) Persistent login / download / upload videos (multi-part submissions supported)**: [bilitool](https://github.com/timerring/bilitool) is now open source. It implements persistent login, downloading videos and danmaku (including multi-part videos), uploading videos (with multi-part submission), querying submission status, querying detailed information, and more. Install it with a single pip command, then use it from the command line (CLI) or call it as an API.
- **( :tada: NEW) Automatic multi-platform loop streaming**: the open-source tool [looplive](https://github.com/timerring/looplive) is a 7 x 24 fully automatic tool that **loops a stream to multiple platforms simultaneously**.
The project architecture and workflow are as follows:
@@ -140,7 +140,18 @@ pip install -r requirements.txt
#### 3. Configure the whisper model and the MLLM model
##### 3.1 whisper speech recognition
`ASR_METHOD` defaults to none, i.e. no speech recognition is performed for subtitles.
##### 3.1.1 Using the api method
Set the `ASR_METHOD` parameter in `src/config.py` to `api`, then fill in the `WHISPER_API_KEY` parameter with your [API Key](https://console.groq.com/keys). This project uses the free-tier `whisper-large-v3-turbo` model provided by groq, whose upload limit is 40 MB (roughly half an hour of audio), so if you use api-based recognition, set the video recording segment length to 30 minutes. Also note the free-tier rate limits: 7200 seconds / 20 requests per hour and 28800 seconds / 2000 requests per day. If you need more, you are welcome to upgrade to the dev tier; see the [groq docs](https://console.groq.com/docs/rate-limits) for details.
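For reference, this is what the relevant `src/config.py` entries look like for the api method (the key is left blank as a placeholder):

```python
# src/config.py (excerpt)
ASR_METHOD = "api"    # can be "deploy", "api" or "none"
WHISPER_API_KEY = ""  # paste your key from https://console.groq.com/keys
```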
##### 3.1.2 Using local deployment
Set the `ASR_METHOD` parameter in `src/config.py` to `deploy`, then download the required model files and place them in the `src/subtitle/models` folder.
The project uses the [`small`](https://openaipublic.azureedge.net/main/whisper/models/9ecf779972d90ba49c06d968637d720dd632c55bbf19d441fb42bf17a411e794/small.pt) model by default; click the link to download it and place the file in the same folder.
> [!TIP]
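If you are curious what the `deploy` path runs under the hood, here is a minimal standalone sketch using the open-source `whisper` package (an assumption for illustration; the repo's actual entry point is `src/subtitle/generate.py`, and the video path below is made up):

```python
import whisper

# load the small model from the repo's model folder
model = whisper.load_model("small", download_root="src/subtitle/models")
result = model.transcribe("recording.mp4")
for seg in result["segments"]:
    print(f"{seg['start']:.1f}s-{seg['end']:.1f}s: {seg['text']}")
```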

View File

@@ -14,8 +14,9 @@ def render_command(in_video_path, out_video_path, in_subtitle_font_size, in_subt
        in_subtitle_margin_v: str, the bottom margin of subtitles
    """
    in_ass_path = in_video_path[:-4] + '.ass'
    in_srt_path = in_video_path[:-4] + '.srt'
    # only take the GPU subtitle branch when whisper actually produced an srt file
    if GPU_EXIST and os.path.isfile(in_srt_path):
        if os.path.isfile(in_ass_path):
            scan_log.info("Current Mode: GPU with danmaku")
            command = [

View File

@@ -2,7 +2,7 @@
import queue
import time
from src.subtitle.subtitle_generator import generate_subtitle
from src.burn.render_video import render_video
from src.log.logger import scan_log
@@ -11,7 +11,7 @@ class VideoRenderQueue:
        self.render_queue = queue.Queue()

    def pipeline_render(self, video_path):
        generate_subtitle(video_path)
        self.render_queue.put(video_path)

    def monitor_queue(self):

View File

@@ -5,7 +5,7 @@ import os
import subprocess
from src.config import GPU_EXIST, SRC_DIR, VIDEOS_DIR
from src.danmaku.generate_danmakus import get_resolution, process_danmakus
from src.subtitle.subtitle_generator import generate_subtitle
from src.burn.render_command import render_command
from src.upload.extract_video_info import get_video_info
from src.log.logger import scan_log
@@ -70,8 +70,7 @@ def render_then_merge(video_path_list):
    # Process the danmakus to ass and remove emojis
    subtitle_font_size, subtitle_margin_v = process_danmakus(xml_path, video_resolution)
    # Generate the srt file via whisper model (the ASR_METHOD check now lives in the decorator)
    generate_subtitle(original_video_path)
    # Burn danmaku or subtitles into the videos
    render_command(original_video_path, video_to_be_merged, subtitle_font_size, subtitle_margin_v)
    if not os.path.exists(merge_list):

View File

@@ -5,7 +5,7 @@ import os
import subprocess
from src.config import GPU_EXIST, SRC_DIR, MODEL_TYPE, AUTO_SLICE, SLICE_DURATION, MIN_VIDEO_SIZE, VIDEOS_DIR, SLICE_NUM, SLICE_OVERLAP, SLICE_STEP
from src.danmaku.generate_danmakus import get_resolution, process_danmakus
from src.subtitle.subtitle_generator import generate_subtitle
from src.burn.render_command import render_command
from autoslice import slice_video_by_danmaku
from src.autoslice.inject_metadata import inject_metadata
@@ -52,9 +52,8 @@ def render_video(video_path):
        scan_log.error(f"FileNotFoundError: {e} - Check if the file exists")
    # Generate the srt file via whisper model
    if MODEL_TYPE != "pipeline":
        generate_subtitle(original_video_path)
    # Burn danmaku or subtitles into the videos
    render_command(original_video_path, format_video_path, subtitle_font_size, subtitle_margin_v)

View File

@@ -10,7 +10,13 @@ from db.conn import create_table
GPU_EXIST = True
# Can be pipeline, append, merge
MODEL_TYPE = "append"
# =============== The auto speech recognition configuration ============================
ASR_METHOD = "api"  # can be "deploy", "api" or "none"
# If you choose "api", the free tier upload limit means each recorded segment should stay under roughly 30 minutes
# Apply for your own API key at https://console.groq.com/keys
WHISPER_API_KEY = ""
Inference_Model = "small"  # the model used by local deployment
# =============== The video configuration ============================
TITLE = "{artist}直播回放-{date}-{title}"
# You can change the title as you like, eg.
# f"{artist}直播回放-{date}-{title}" - Streamer直播回放-20250328-Live title

View File

@@ -0,0 +1,89 @@
import os
import json
import re
import subprocess
from groq import Groq
from src.config import WHISPER_API_KEY

def seconds_to_srt_time(seconds):
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)
    millis = int((seconds - int(seconds)) * 1000)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"

def write_to_srt(segments, output_file):
    with open(output_file, 'w', encoding='utf-8') as f:
        for i, segment in enumerate(segments, start=1):
            start_time = seconds_to_srt_time(segment['start'])
            end_time = seconds_to_srt_time(segment['end'])
            text = segment['text']
            # filter out a common whisper hallucination line
            if "请不吝" in text:
                text = ""
            f.write(f"{i}\n")
            f.write(f"{start_time} --> {end_time}\n")
            f.write(f"{text}\n\n")

def print_segment_info(segments):
    if segments:
        for segment in segments:
            start_time = segment.get('start')
            end_time = segment.get('end')
            text = segment.get('text')
            print(f"Start time: {start_time} seconds, End time: {end_time} seconds, Text: {text}")
    else:
        print("No valid segments data found.")

def check_file_format(filename):
    # transcode anything that is not already mp3 to keep the upload small
    if filename[-4:] != ".mp3":
        mp3filename = filename[:-4] + ".mp3"
        command = [
            'ffmpeg', '-i', filename, '-vn', '-acodec', 'libmp3lame', mp3filename
        ]
        subprocess.run(command, check=True, capture_output=True, text=True)
        return mp3filename
    else:
        return filename

# Groq API SDK: https://console.groq.com/docs/speech-to-text
# API limits: 40 MB per file (free tier), 100 MB (dev tier)
# Requests per minute: 20, per day: 2000. And 7200 seconds / hour, 28800 seconds / day.
# more info: https://console.groq.com/docs/rate-limits
def generate_srt(filename, output_file=None):
    client = Groq(
        api_key=WHISPER_API_KEY
    )
    filename = check_file_format(filename)
    if output_file is None:
        output_file = filename[:-4] + ".srt"
    try:
        with open(filename, "rb") as file:
            transcription = client.audio.transcriptions.create(
                file=file,  # Required audio file
                model="whisper-large-v3-turbo",  # Required model to use for transcription
                prompt="以下是普通话的句子",  # Optional
                response_format="verbose_json",  # Optional
                timestamp_granularities=["segment"],  # Optional ("word", "segment" (default), or both; requires response_format="verbose_json")
                # language="zh",  # Optional
                temperature=0.0  # Optional
            )
        # serialize the SDK response object, then slice the segments list out of
        # its repr; note the quote replacement below is fragile if a segment
        # text itself contains quote characters
        input_str = json.dumps(transcription, indent=2, default=str)
        start_index = input_str.find('segments=') + len('segments=')
        end_index = input_str.rfind(']') + 1
        segments_str = input_str[start_index:end_index]
        segments = json.loads(segments_str.replace("'", "\""))
        # print_segment_info(segments)
        write_to_srt(segments, output_file)
        # remove the intermediate audio file
        os.remove(filename)
        return output_file
    except Exception as e:
        print(f"Error: {e}")
        return None

if __name__ == "__main__":
    filename = ""
    generate_srt(filename)
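# Usage sketch (hypothetical path, for illustration): generate_srt("/rec/live.flv")
# transcodes the recording to /rec/live.mp3 via ffmpeg, uploads the audio to groq,
# writes /rec/live.srt next to it, removes the intermediate mp3, and returns the
# srt path (or None on any failure).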

View File

@@ -1,20 +0,0 @@
# Copyright (c) 2024 bilive.
import os
import subprocess
from config import SRC_DIR
from log.logger import scan_log

# Generate the srt file via whisper model
def generate_subtitles(in_video_path):
    """Generate subtitles via whisper model

    Args:
        in_video_path: str, the path of video
    """
    try:
        subprocess.run(
            ['python', os.path.join(SRC_DIR, 'subtitle', 'generate.py'), in_video_path],
            stdout=subprocess.DEVNULL
        )
    except subprocess.CalledProcessError as e:
        scan_log.error(f"Generate subtitles failed: {e.stderr}")

View File

@@ -0,0 +1,49 @@
# Copyright (c) 2024 bilive.
import os
import subprocess
from config import SRC_DIR, ASR_METHOD, WHISPER_API_KEY
from log.logger import scan_log
from functools import wraps

def subtitle_generator(asr_method):
    """Decorator to select the subtitle generation backend

    Args:
        asr_method: str, the ASR backend to use ("api", "deploy" or "none")
    Returns:
        function: wrapped subtitle generation function
    """
    def decorator(func):
        @wraps(func)
        def wrapper(video_path):
            if asr_method == "api":
                from .api.whisper_sdk import generate_srt
                return generate_srt(video_path)
            elif asr_method == "deploy":
                try:
                    # check=True so that a non-zero exit code raises CalledProcessError
                    subprocess.run(
                        ['python', os.path.join(SRC_DIR, 'subtitle', 'generate.py'), video_path],
                        stdout=subprocess.DEVNULL,
                        check=True
                    )
                    return video_path[:-4] + ".srt"
                except subprocess.CalledProcessError as e:
                    scan_log.error(f"Generate subtitles failed: {e.stderr}")
                    return None
            elif asr_method == "none":
                return None
            else:
                scan_log.error(f"Unsupported asr method: {asr_method}")
                return None
        return wrapper
    return decorator

# Generate the srt file via whisper model
@subtitle_generator(ASR_METHOD)
def generate_subtitle(in_video_path):
    """Generate subtitles via whisper model

    Args:
        in_video_path: str, the path of video
    """
    pass
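# Usage sketch (hypothetical path): with ASR_METHOD set to "api" or "deploy",
# generate_subtitle("/videos/room_123.flv") returns "/videos/room_123.srt" on
# success; with "none" (or on failure) it returns None, and render_command
# later skips subtitle burning when no srt file exists.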

View File

@@ -20,20 +20,24 @@ def generate_video_data(video_path):
    return copyright, title, desc, tid, tag, source, cover, dynamic

def generate_slice_data(video_path):
    try:
        command = [
            "ffprobe",
            "-v", "quiet",
            "-print_format", "json",
            "-show_format",
            video_path
        ]
        output = subprocess.check_output(command, stderr=subprocess.STDOUT).decode('utf-8')
        parsed_output = json.loads(output)
        # the slice title is read from the custom "generate" metadata tag
        title = parsed_output["format"]["tags"]["generate"]
        copyright = 1
        tid = 138
        tag = "直播切片"
        return copyright, title, tid, tag
    except Exception as e:
        scan_log.error(f"Error in generate_slice_data: {e}")
        return None, None, None, None

if __name__ == "__main__":
    pass

View File

@@ -21,6 +21,10 @@ def upload_video(upload_path):
    try:
        if upload_path.endswith('.flv'):
            copyright, title, tid, tag = generate_slice_data(upload_path)
            # generate_slice_data returns None fields when ffprobe fails
            if title is None:
                upload_log.error("Failed to upload the slice video; the files will be kept.")
                update_upload_queue_lock(upload_path, 0)
                return False
        else:
            copyright, title, desc, tid, tag, source, cover, dynamic = generate_video_data(upload_path)
        yaml = ""