feat: 克隆音频完成能跑通

main
guofei 2024-12-03 14:58:40 +08:00
parent cfb38ccf8b
commit 1376f9375e
6 changed files with 501 additions and 150 deletions

View File

@ -27,3 +27,12 @@ pm2 start --name admin-banban-new-nest npm -- run start:prod
StatusCode: 1114,
StatusMessage: 'snr check failed, snr: 0.33, threshold: 5.00'
}, -->
const result = [
{
roleId: '6704bd0ef48326fe51ddb751',
roleName: '甘宁',
url: 'https://banban-systemcharter-speak.oss-cn-beijing.aliyuncs.com/test/WeChat_20241119150807_1.mp3',
},
];

View File

@ -32,6 +32,9 @@ model SystemCharter {
// 音色名称
voiceName String?
// 剩余可克隆次数
remainingCloneCount Int @default(10)
// 原始音频
originAudioUrl String?
// 克隆音频
@ -45,6 +48,10 @@ model TaskQueue {
id String @id @default(auto()) @map("_id") @db.ObjectId
// 任务类型
type String
// 任务名称
roleName String?
// 角色id
roleId String?
// 任务数据
data String
// 任务状态: pending/processing/completed/failed
@ -53,8 +60,21 @@ model TaskQueue {
error String?
// 重试次数
attempts Int @default(0)
// 最大重试次数
// 最大重试次数 (不需要这个逻辑了)
maxAttempts Int @default(3)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
}
model CloneHistory {
id String @id @default(auto()) @map("_id") @db.ObjectId
roleId String // 角色ID
roleName String // 角色名称
cloneUrl String // 克隆URL
speakerId String? // 克隆后的speakerId
status String // 任务状态pending, processing, success, failed
error String? // 错误信息
taskId String? // 关联的任务ID
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
}

View File

@ -12,10 +12,11 @@ export class RedisTaskService {
// 添加任务
async addTask(data: any) {
// 先记录到数据库
const taskRecord = await this.dbService.taskQueue.create({
data: {
type: 'BATCH_CLONE_AUDIO',
roleId: data.roleId,
roleName: data.roleName,
data: JSON.stringify(data),
status: 'pending',
},
@ -46,49 +47,82 @@ export class RedisTaskService {
});
}
// 更新任务状态
async updateTaskStatus(taskId: string, status: string, error?: string) {
return await this.dbService.taskQueue.update({
where: { id: taskId },
data: {
status,
error,
attempts: { increment: 1 },
},
});
// 更新任务状态 任务状态pending, processing, success, failed
async updateTaskStatus(taskId: string, status: string, error?: string, speakerId?: string) {
try {
// 更新任务状态
const task = await this.dbService.taskQueue.update({
where: { id: taskId },
data: {
status,
error,
// 只有重新添加的任务才需要增加重试次数
attempts: { increment: status === 'processing' ? 1 : 0 },
},
});
const taskData = JSON.parse(task.data);
// 如果任务成功,减少剩余克隆次数
if (status === 'completed' && speakerId) {
// 减少剩余克隆次数
await this.dbService.systemCharter.update({
where: { id: taskData.roleId },
data: {
remainingCloneCount: {
decrement: 1,
},
},
});
}
return task;
} catch (error) {
console.error('更新任务状态失败:', error);
throw error;
}
}
// 重试失败的任务
async retryTask(taskId: string) {
const task = await this.dbService.taskQueue.findUnique({
where: { id: taskId },
});
if (task && task.status === 'failed') {
// 重置任务状态
await this.dbService.taskQueue.update({
try {
const task = await this.dbService.taskQueue.findUnique({
where: { id: taskId },
data: {
status: 'pending',
attempts: 0,
error: null,
},
});
// 重新加入队列
await this.taskQueue.add(
'BATCH_CLONE_AUDIO',
{
...JSON.parse(task.data),
taskId: task.id,
},
{
removeOnComplete: true,
attempts: 3,
},
);
if (!task || task.status !== 'failed') {
throw new Error('任务不存在或状态不是失败状态');
}
await this.dbService.$transaction(async (prisma) => {
// 更新任务状态为待处理
await prisma.taskQueue.update({
where: { id: taskId },
data: {
status: 'pending',
error: null,
},
});
// 重新加入队列处理
const taskData = JSON.parse(task.data);
await this.taskQueue.add(
'BATCH_CLONE_AUDIO',
{
...taskData,
taskId: task.id,
},
{
removeOnComplete: true,
attempts: 3,
},
);
});
return true;
} catch (error) {
console.error('重试任务失败:', error);
throw error;
}
return false;
}
}

View File

@ -27,38 +27,43 @@ export class RedisTaskProcessor {
const volcenAudioSpeakService = new VolcenAudioSpeakService();
// voiceId = await volcenAudioSpeakService.getVoiceId();
// const voiceId = 'S_FC60x0Gb1';
const voiceId = 'S_VK2Yw0Gb1';
// 剩余1次
// const voiceId = 'S_VK2Yw0Gb1';
voiceId = 'S_5QKWw0Gb1';
// 请求ossurl获取base64
const base64 = await volcenAudioSpeakService.getAudioBase64(ossUrl);
// 克隆音频
volcenAudioSpeakService.speakClone(voiceId, base64);
// // 5秒后激活
setTimeout(async () => {
if (taskId) {
await this.redisTaskService.updateTaskStatus(taskId, 'completed');
// 激活音频 (激活后就不能克隆了,所以要克隆后确定后再激活)
// await volcenAudioSpeakService.speakActivate(voiceId);
const textToSpeechResult = await volcenAudioSpeakService.textToSpeech(
'今天有什么新鲜事吗?快给我讲讲',
voiceId,
);
// 修改系统角色的 voiceId
await this.dbService.systemCharter.update({
where: { id: charterInfo.roleId },
data: {
voiceId,
voiceName: charterInfo.roleName,
// 是否激活
activate: true,
originAudioUrl: ossUrl,
cloneAfterAudioUrl: textToSpeechResult.url,
},
});
}
console.log(`任务处理完成: ${job.data.roleName}`);
}, 1000 * 5);
await volcenAudioSpeakService.speakClone(voiceId, base64);
await this.redisTaskService.updateTaskStatus(taskId, 'completed');
const textToSpeechResult = await volcenAudioSpeakService.textToSpeech('今天有什么新鲜事吗?快给我讲讲', voiceId);
// 修改系统角色的 voiceId
await this.dbService.systemCharter.update({
where: { id: charterInfo.roleId },
data: {
voiceId,
voiceName: charterInfo.roleName,
originAudioUrl: ossUrl,
cloneAfterAudioUrl: textToSpeechResult.url,
remainingCloneCount: {
// 剩余可克隆次数减1
decrement: 1,
},
},
});
// 增加克隆人物音色记录
await this.dbService.cloneHistory.create({
data: {
roleId: charterInfo.roleId,
roleName: charterInfo.roleName,
cloneUrl: textToSpeechResult.url,
speakerId: voiceId,
status: 'success',
taskId,
},
});
console.log(`任务处理完成: ${job.data.roleName}`);
} catch (error) {
console.log(error.data);
console.log('error', error);
console.log('voiceId=--------------', voiceId);
console.error(`任务处理失败 [第${job.attemptsMade + 1}次尝试]: ${error.message}`);
if (job.data.taskId) {

View File

@ -1,11 +1,22 @@
import { Body, Controller, Get, Post, Query } from '@nestjs/common';
import { Body, Controller, Get, Param, Post, Query, HttpException, HttpStatus, Delete } from '@nestjs/common';
import { SystemCharterlDto } from './dto/SystemCharter.dto';
import { Pagination } from 'src/common/pagination';
import { DBService } from 'src/utils/db/DB.service';
import { ApiResponse } from 'src/utils/response/response';
import { RedisTaskService } from 'src/common/RedisTask/RedisTask.service';
import { CloneSpeakDto } from './dto/CloneSpeakDto.dto';
import { VolcenAudioSpeakService } from 'src/services/VolcenAudioSpeakService';
import axios from 'axios';
import { CloneSpeakDto } from './dto/CloneSpeakDto.dto';
// 添加任务查询DTO
export class TaskQueryDto {
current?: number;
pageSize?: number;
roleId?: string;
roleName?: string;
startTime?: string;
endTime?: string;
}
@Controller('/system/charter')
export class SystemCharterController {
@ -28,6 +39,18 @@ export class SystemCharterController {
orderBy: {
createdAt: 'desc',
},
select: {
id: true,
roleName: true,
remainingCloneCount: true,
bg: true,
voiceId: true,
voiceName: true,
roleSetting: true,
originAudioUrl: true,
cloneAfterAudioUrl: true,
activate: true,
},
});
return ApiResponse.success({
data: record,
@ -39,71 +62,331 @@ export class SystemCharterController {
@Post('/add')
async addSystemCharter(@Body() object: SystemCharterlDto) {
delete object.id;
const newRecord = await this.dbService.systemCharter.create({
data: object,
});
return newRecord;
try {
delete object.id;
// 添加默认的剩余可克隆次数
const newRecord = await this.dbService.systemCharter.create({
data: {
...object,
remainingCloneCount: 10, // 默认10次
},
});
return ApiResponse.successToMessage('添加成功', newRecord);
} catch (error) {
console.error('添加角色失败:', error);
return ApiResponse.failToMessage('添加失败:' + error.message);
}
}
@Post('/update')
async updateSystemCharter(@Body() object: SystemCharterlDto) {
const { id, ...other } = object;
const newRecord = await this.dbService.systemCharter.update({
where: {
id: id,
},
data: other,
});
return newRecord;
try {
const { id, ...other } = object;
const newRecord = await this.dbService.systemCharter.update({
where: {
id: id,
},
data: other,
});
return ApiResponse.successToMessage('更新成功', newRecord);
} catch (error) {
console.error('更新角色失败:', error);
return ApiResponse.failToMessage('更新失败:' + error.message);
}
}
@Get('cloneSpeaker')
async batchCloneAudio(@Body() object: CloneSpeakDto) {
// try {
// if (!object.roleId || !object.roleName || !object.url) {
// return ApiResponse.failToMessage('参数错误');
// }
// // 验证音频URL是否可访问
// const response = await axios.head(object.url);
// if (response.status !== 200) {
// return ApiResponse.failToMessage('音频文件无法访问');
// }
// // 验证文件类型
// const contentType = response.headers['content-type'];
// if (!contentType.includes('audio')) {
// return ApiResponse.failToMessage('文件类型必须是音频');
// }
// this.redisTaskService.addTask(object);
// return ApiResponse.success(null, '任务添加成功');
// } catch (error) {
// console.error('添加克隆任务失败:', error);
// return ApiResponse.failToMessage('添加任务失败:' + error.message);
// }
@Get('taskList')
async getTaskList(@Query() query: TaskQueryDto) {
try {
const { current, pageSize, roleId, roleName } = query;
// 构建查询条件
const where: any = {
// 只查询非成功状态的任务
status: {
not: 'completed',
},
};
if (roleId) {
where.roleId = roleId;
}
if (roleName) {
where.roleName = {
contains: roleName,
};
}
// 查询数据
const [tasks, total] = await Promise.all([
this.dbService.taskQueue.findMany({
where,
...Pagination.getPage(current, pageSize),
orderBy: {
createdAt: 'desc',
},
}),
this.dbService.taskQueue.count({ where }),
]);
return ApiResponse.success({
data: tasks,
meta: {
total,
current: current || 1,
pageSize: pageSize || 10,
},
});
} catch (error) {
console.error('获取任务列表失败:', error);
return ApiResponse.failToMessage('获取任务列表失败:' + error.message);
}
}
@Get('test')
async batchCloneAudio1() {
const result = [
{
roleId: '6704bd0ef48326fe51ddb751',
roleName: '甘宁',
// url: 'https://banban-systemcharter-speak.oss-cn-beijing.aliyuncs.com/test/%E4%BB%A3%E5%8F%B7%E9%B8%A2%E5%AF%86%E6%8E%A2%E5%91%A8%E7%91%9C.mp3',
// url: 'https://banban-systemcharter-speak.oss-cn-beijing.aliyuncs.com/test/%E4%BB%A3%E5%8F%B7%E9%B8%A2%E5%AF%86%E6%8E%A2%E7%94%98%E5%AE%81.mp3',
// url: 'https://banban-systemcharter-speak.oss-cn-beijing.aliyuncs.com/test/%E4%BB%A3%E5%8F%B7%E9%B8%A2%E5%AF%86%E6%8E%A2%E7%94%98%E5%AE%81%20-%20%E5%89%AF%E6%9C%AC.wav',
// 郭德纲
// url: 'https://banban-systemcharter-speak.oss-cn-beijing.aliyuncs.com/test/%E9%83%AD%E5%BE%B7%E7%BA%B2-%E5%A3%B0%E9%9F%B3%E5%85%8B%E9%9A%86.mp3',
// 增以后的
url: 'https://banban-systemcharter-speak.oss-cn-beijing.aliyuncs.com/test/WeChat_20241119150807_1.mp3',
@Get('activateVoice')
async activateVoice(@Query('speakerId') speakerId: string) {
try {
if (!speakerId) {
return ApiResponse.failToMessage('参数错误缺少speakerId');
}
// 周瑜
// url: 'https://banban-systemcharter-speak.os-cn-beijing.aliyuncs.com/test/%E4%BB%A3%E5%8F%B7%E9%B8%A2%E5%AF%86%E6%8E%A2%E5%91%A8%E7%91%9C.mp3',
// 周瑜 提高分贝后
// url: 'https://banban-systemcharter-speak.oss-cn-beijing.aliyuncs.com/test/%E5%91%A8%E7%91%9C.mp3',
},
];
result.forEach((item) => {
this.redisTaskService.addTask(item);
});
const volcenAudioSpeakService = new VolcenAudioSpeakService();
const result = await volcenAudioSpeakService.speakActivate(speakerId);
return ApiResponse.successToMessage('音频激活成功', result);
} catch (error) {
console.error('激活音频失败:', error);
return ApiResponse.failToMessage('激活音频失败:' + error.message);
}
}
// 添加获取克隆历史记录的接口
@Get('cloneHistory/:id')
async getCloneHistory(@Param('id') id: string) {
try {
// 查询数据
const [histories, total] = await Promise.all([
this.dbService.cloneHistory.findMany({
where: {
roleId: id,
},
orderBy: {
createdAt: 'desc',
},
}),
this.dbService.cloneHistory.count({ where: { roleId: id } }),
]);
return ApiResponse.successToMessage('获取成功', {
data: histories,
meta: {
total,
},
});
} catch (error) {
console.error('获取克隆历史失败:', error);
return ApiResponse.failToMessage('获取克隆历史失败:' + error.message);
}
}
@Get('/detail/:id')
async getDetail(@Param('id') id: string) {
try {
if (!id) {
return ApiResponse.failToMessage('参数错误缺少id');
}
const record = await this.dbService.systemCharter.findUnique({
where: {
id: id,
},
});
if (!record) {
return ApiResponse.failToMessage('人设不存在');
}
return ApiResponse.successToMessage('获取成功', record);
} catch (error) {
console.error('获取人设详情失败:', error);
return ApiResponse.failToMessage('获取人设详情失败:' + error.message);
}
}
/**
*
* @param cloneSpeakDto
* @returns
*/
@Post('cloneSpeaker')
async cloneSpeaker(@Body() cloneSpeakDto: CloneSpeakDto) {
try {
const { roleId, roleName } = cloneSpeakDto;
// 检查是否存在未完成的相同角色任务
const existingTask = await this.dbService.taskQueue.findFirst({
where: {
roleId,
status: {
// pending / processing /completed/failed
in: ['pending', 'processing'],
},
},
});
if (existingTask) {
throw new HttpException(
{
message: '该角色已有正在进行的克隆任务,请等待任务完成后再试',
taskId: existingTask.id,
},
HttpStatus.BAD_REQUEST,
);
}
// 检查剩余克隆次数
const charter = await this.dbService.systemCharter.findUnique({
where: { id: roleId },
});
if (!charter) {
throw new HttpException('角色不存在', HttpStatus.BAD_REQUEST);
}
if (charter.remainingCloneCount <= 0) {
throw new HttpException('剩余克隆次数不足', HttpStatus.BAD_REQUEST);
}
// 添加任务到队列
await this.redisTaskService.addTask({
...cloneSpeakDto,
roleId,
roleName,
});
return {
message: '克隆任务已添加到队列',
};
} catch (error) {
throw error;
}
}
// 重试失败的任务
@Post('retryTask/:taskId')
async retryTask(@Param('taskId') taskId: string) {
try {
if (!taskId) {
return ApiResponse.failToMessage('参数错误缺少taskId');
}
await this.redisTaskService.retryTask(taskId);
return ApiResponse.successToMessage('重试任务添加成功');
} catch (error) {
console.error('重试任务失败:', error);
return ApiResponse.failToMessage('重试任务失败:' + error.message);
}
}
/**
*
* @param roleId
*/
@Get('checkCloneTask/:roleId')
async checkCloneTask(@Param('roleId') roleId: string) {
try {
const charter = await this.dbService.systemCharter.findUnique({
where: { id: roleId },
});
if (!charter) {
throw new HttpException('角色不存在', HttpStatus.BAD_REQUEST);
}
// 检查是否存在未完成的相同角色任务
const existingTask = await this.dbService.taskQueue.findFirst({
where: {
roleId,
status: {
in: ['pending', 'processing'],
},
},
});
if (existingTask) {
throw new HttpException('该角色已有正在进行的克隆任务,请等待任务完成后再试', HttpStatus.BAD_REQUEST);
}
// 检查是否存在失败的任务
const failedTask = await this.dbService.taskQueue.findFirst({
where: {
roleId,
status: 'failed',
},
});
// 处理失败任务的错误信息
if (failedTask) {
throw new HttpException('该角色存在失败的克隆任务,请先删除失败任务后再试', HttpStatus.BAD_REQUEST);
}
return ApiResponse.success();
} catch (error) {
throw error;
}
}
// 删除任务
@Delete('deleteTask/:taskId')
async deleteTask(@Param('taskId') taskId: string) {
try {
const task = await this.dbService.taskQueue.findUnique({
where: { id: taskId },
select: {
id: true,
status: true,
roleId: true,
roleName: true,
},
});
if (!task) {
throw new HttpException(
{
message: '任务不存在',
code: 'TASK_NOT_FOUND',
},
HttpStatus.NOT_FOUND,
);
}
if (task.status !== 'failed') {
throw new HttpException(
{
message: '只能删除失败状态的任务',
code: 'INVALID_TASK_STATUS',
currentStatus: task.status,
},
HttpStatus.BAD_REQUEST,
);
}
// 删除任务队列中的记录
await this.dbService.taskQueue.delete({
where: { id: taskId },
});
return ApiResponse.successToMessage('任务删除成功');
} catch (error) {
if (error instanceof HttpException) {
throw error;
}
throw new HttpException(
{
message: '删除任务失败',
error: error.message,
},
HttpStatus.INTERNAL_SERVER_ERROR,
);
}
}
}

View File

@ -89,7 +89,7 @@ export class VolcenAudioSpeakService {
speakerId: string,
base64: string | Buffer,
): Promise<T> {
return new Promise(async (resolve, reject) => {
try {
const tiktokBody = {
speaker_id: speakerId,
appid: VolcenAudioSpeakService.TiktokAppId,
@ -97,41 +97,41 @@ export class VolcenAudioSpeakService {
{
audio_bytes: base64,
audio_format: 'wav',
// 添加音频参数要求
// sample_rate: 16000, // 采样率必须是16kHz
// channels: 1, // 单声道
// bits: 16, // 16位深度
sample_rate: 16000,
channels: 1,
bits: 16,
},
],
source: 2,
};
// 开始克隆声音,参数: { speaker_id: 'S_FC60x0Gb1', appid: '8167092294', audio_format: 'wav' }
console.log('开始克隆声音,参数:', {
speaker_id: speakerId,
appid: VolcenAudioSpeakService.TiktokAppId,
audio_format: 'wav',
});
axios
.post(VolcenAudioSpeakService.SPEAK_CLONE_API, tiktokBody, {
headers: {
'Resource-Id': 'volc.megatts.voiceclone',
Authorization: `Bearer;${VolcenAudioSpeakService.TiktokAccessToken}`,
'Content-Type': 'application/json',
},
})
.then((result) => {
// 检查业务状态码
if (result.data?.BaseResp?.StatusCode !== 0) {
console.error('克隆声音业务错误:', result.data);
throw new Error(`训练声音业务错误: ${result.data?.BaseResp?.StatusMessage || '未知错误'}`);
}
resolve(result.data as T);
})
.catch((error) => {
reject(error);
});
});
const result = await axios.post(VolcenAudioSpeakService.SPEAK_CLONE_API, tiktokBody, {
headers: {
'Resource-Id': 'volc.megatts.voiceclone',
Authorization: `Bearer;${VolcenAudioSpeakService.TiktokAccessToken}`,
'Content-Type': 'application/json',
},
});
console.log('result.status', result.status);
// 检查业务状态码
if (result.data?.BaseResp?.StatusCode !== 0) {
console.error('克隆声音业务错误:', result.data);
throw new Error(`训练声音业务错误: ${result.data?.BaseResp?.StatusMessage || '未知错误'}`);
}
return result.data as T;
} catch (error) {
const responseResult = error.response.data;
throw new Error(JSON.stringify(responseResult));
}
}
/**