From 1376f9375efef9e047f73fed0003462713f8c1fa Mon Sep 17 00:00:00 2001 From: guofei Date: Tue, 3 Dec 2024 14:58:40 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=85=8B=E9=9A=86=E9=9F=B3=E9=A2=91?= =?UTF-8?q?=E5=AE=8C=E6=88=90=E8=83=BD=E8=B7=91=E9=80=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 9 + prisma/schema.prisma | 22 +- src/common/RedisTask/RedisTask.service.ts | 106 +++-- .../RedisTask/RedisTaskProcessor.service.ts | 61 +-- .../SystemCharter/SystemCharter.controller.ts | 401 +++++++++++++++--- src/services/VolcenAudioSpeakService.ts | 52 +-- 6 files changed, 501 insertions(+), 150 deletions(-) diff --git a/README.md b/README.md index a6c4949..c11e2a3 100644 --- a/README.md +++ b/README.md @@ -27,3 +27,12 @@ pm2 start --name admin-banban-new-nest npm -- run start:prod StatusCode: 1114, StatusMessage: 'snr check failed, snr: 0.33, threshold: 5.00' }, --> + + + const result = [ + { + roleId: '6704bd0ef48326fe51ddb751', + roleName: '甘宁', + url: 'https://banban-systemcharter-speak.oss-cn-beijing.aliyuncs.com/test/WeChat_20241119150807_1.mp3', + }, + ]; \ No newline at end of file diff --git a/prisma/schema.prisma b/prisma/schema.prisma index ad7bfca..d76d01a 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -32,6 +32,9 @@ model SystemCharter { // 音色名称 voiceName String? + // 剩余可克隆次数 + remainingCloneCount Int @default(10) + // 原始音频 originAudioUrl String? // 克隆音频 @@ -45,6 +48,10 @@ model TaskQueue { id String @id @default(auto()) @map("_id") @db.ObjectId // 任务类型 type String + // 任务名称 + roleName String? + // 角色id + roleId String? // 任务数据 data String // 任务状态: pending/processing/completed/failed @@ -53,8 +60,21 @@ model TaskQueue { error String? // 重试次数 attempts Int @default(0) - // 最大重试次数 + // 最大重试次数 (不需要这个逻辑了) maxAttempts Int @default(3) createdAt DateTime @default(now()) updatedAt DateTime @updatedAt } + +model CloneHistory { + id String @id @default(auto()) @map("_id") @db.ObjectId + roleId String // 角色ID + roleName String // 角色名称 + cloneUrl String // 克隆URL + speakerId String? // 克隆后的speakerId + status String // 任务状态:pending, processing, success, failed + error String? // 错误信息 + taskId String? // 关联的任务ID + createdAt DateTime @default(now()) + updatedAt DateTime @updatedAt +} diff --git a/src/common/RedisTask/RedisTask.service.ts b/src/common/RedisTask/RedisTask.service.ts index 8ba388f..920fec6 100644 --- a/src/common/RedisTask/RedisTask.service.ts +++ b/src/common/RedisTask/RedisTask.service.ts @@ -12,10 +12,11 @@ export class RedisTaskService { // 添加任务 async addTask(data: any) { - // 先记录到数据库 const taskRecord = await this.dbService.taskQueue.create({ data: { type: 'BATCH_CLONE_AUDIO', + roleId: data.roleId, + roleName: data.roleName, data: JSON.stringify(data), status: 'pending', }, @@ -46,49 +47,82 @@ export class RedisTaskService { }); } - // 更新任务状态 - async updateTaskStatus(taskId: string, status: string, error?: string) { - return await this.dbService.taskQueue.update({ - where: { id: taskId }, - data: { - status, - error, - attempts: { increment: 1 }, - }, - }); + // 更新任务状态 任务状态:pending, processing, success, failed + async updateTaskStatus(taskId: string, status: string, error?: string, speakerId?: string) { + try { + // 更新任务状态 + const task = await this.dbService.taskQueue.update({ + where: { id: taskId }, + data: { + status, + error, + // 只有重新添加的任务才需要增加重试次数 + attempts: { increment: status === 'processing' ? 1 : 0 }, + }, + }); + + const taskData = JSON.parse(task.data); + + // 如果任务成功,减少剩余克隆次数 + if (status === 'completed' && speakerId) { + // 减少剩余克隆次数 + await this.dbService.systemCharter.update({ + where: { id: taskData.roleId }, + data: { + remainingCloneCount: { + decrement: 1, + }, + }, + }); + } + + return task; + } catch (error) { + console.error('更新任务状态失败:', error); + throw error; + } } // 重试失败的任务 async retryTask(taskId: string) { - const task = await this.dbService.taskQueue.findUnique({ - where: { id: taskId }, - }); - - if (task && task.status === 'failed') { - // 重置任务状态 - await this.dbService.taskQueue.update({ + try { + const task = await this.dbService.taskQueue.findUnique({ where: { id: taskId }, - data: { - status: 'pending', - attempts: 0, - error: null, - }, }); - // 重新加入队列 - await this.taskQueue.add( - 'BATCH_CLONE_AUDIO', - { - ...JSON.parse(task.data), - taskId: task.id, - }, - { - removeOnComplete: true, - attempts: 3, - }, - ); + if (!task || task.status !== 'failed') { + throw new Error('任务不存在或状态不是失败状态'); + } + + await this.dbService.$transaction(async (prisma) => { + // 更新任务状态为待处理 + await prisma.taskQueue.update({ + where: { id: taskId }, + data: { + status: 'pending', + error: null, + }, + }); + + // 重新加入队列处理 + const taskData = JSON.parse(task.data); + await this.taskQueue.add( + 'BATCH_CLONE_AUDIO', + { + ...taskData, + taskId: task.id, + }, + { + removeOnComplete: true, + attempts: 3, + }, + ); + }); + return true; + } catch (error) { + console.error('重试任务失败:', error); + throw error; } - return false; } } diff --git a/src/common/RedisTask/RedisTaskProcessor.service.ts b/src/common/RedisTask/RedisTaskProcessor.service.ts index 034a171..65fb271 100644 --- a/src/common/RedisTask/RedisTaskProcessor.service.ts +++ b/src/common/RedisTask/RedisTaskProcessor.service.ts @@ -27,38 +27,43 @@ export class RedisTaskProcessor { const volcenAudioSpeakService = new VolcenAudioSpeakService(); // voiceId = await volcenAudioSpeakService.getVoiceId(); // const voiceId = 'S_FC60x0Gb1'; - const voiceId = 'S_VK2Yw0Gb1'; + // 剩余1次 + // const voiceId = 'S_VK2Yw0Gb1'; + voiceId = 'S_5QKWw0Gb1'; // 请求ossurl获取base64 const base64 = await volcenAudioSpeakService.getAudioBase64(ossUrl); // 克隆音频 - volcenAudioSpeakService.speakClone(voiceId, base64); - // // 5秒后激活 - setTimeout(async () => { - if (taskId) { - await this.redisTaskService.updateTaskStatus(taskId, 'completed'); - // 激活音频 (激活后就不能克隆了,所以要克隆后确定后再激活) - // await volcenAudioSpeakService.speakActivate(voiceId); - const textToSpeechResult = await volcenAudioSpeakService.textToSpeech( - '今天有什么新鲜事吗?快给我讲讲', - voiceId, - ); - // 修改系统角色的 voiceId - await this.dbService.systemCharter.update({ - where: { id: charterInfo.roleId }, - data: { - voiceId, - voiceName: charterInfo.roleName, - // 是否激活 - activate: true, - originAudioUrl: ossUrl, - cloneAfterAudioUrl: textToSpeechResult.url, - }, - }); - } - console.log(`任务处理完成: ${job.data.roleName}`); - }, 1000 * 5); + await volcenAudioSpeakService.speakClone(voiceId, base64); + await this.redisTaskService.updateTaskStatus(taskId, 'completed'); + const textToSpeechResult = await volcenAudioSpeakService.textToSpeech('今天有什么新鲜事吗?快给我讲讲', voiceId); + // 修改系统角色的 voiceId + await this.dbService.systemCharter.update({ + where: { id: charterInfo.roleId }, + data: { + voiceId, + voiceName: charterInfo.roleName, + originAudioUrl: ossUrl, + cloneAfterAudioUrl: textToSpeechResult.url, + remainingCloneCount: { + // 剩余可克隆次数减1 + decrement: 1, + }, + }, + }); + // 增加克隆人物音色记录 + await this.dbService.cloneHistory.create({ + data: { + roleId: charterInfo.roleId, + roleName: charterInfo.roleName, + cloneUrl: textToSpeechResult.url, + speakerId: voiceId, + status: 'success', + taskId, + }, + }); + console.log(`任务处理完成: ${job.data.roleName}`); } catch (error) { - console.log(error.data); + console.log('error', error); console.log('voiceId=--------------', voiceId); console.error(`任务处理失败 [第${job.attemptsMade + 1}次尝试]: ${error.message}`); if (job.data.taskId) { diff --git a/src/modules/SystemCharter/SystemCharter.controller.ts b/src/modules/SystemCharter/SystemCharter.controller.ts index d977968..439285c 100644 --- a/src/modules/SystemCharter/SystemCharter.controller.ts +++ b/src/modules/SystemCharter/SystemCharter.controller.ts @@ -1,11 +1,22 @@ -import { Body, Controller, Get, Post, Query } from '@nestjs/common'; +import { Body, Controller, Get, Param, Post, Query, HttpException, HttpStatus, Delete } from '@nestjs/common'; import { SystemCharterlDto } from './dto/SystemCharter.dto'; import { Pagination } from 'src/common/pagination'; import { DBService } from 'src/utils/db/DB.service'; import { ApiResponse } from 'src/utils/response/response'; import { RedisTaskService } from 'src/common/RedisTask/RedisTask.service'; -import { CloneSpeakDto } from './dto/CloneSpeakDto.dto'; +import { VolcenAudioSpeakService } from 'src/services/VolcenAudioSpeakService'; import axios from 'axios'; +import { CloneSpeakDto } from './dto/CloneSpeakDto.dto'; + +// 添加任务查询DTO +export class TaskQueryDto { + current?: number; + pageSize?: number; + roleId?: string; + roleName?: string; + startTime?: string; + endTime?: string; +} @Controller('/system/charter') export class SystemCharterController { @@ -28,6 +39,18 @@ export class SystemCharterController { orderBy: { createdAt: 'desc', }, + select: { + id: true, + roleName: true, + remainingCloneCount: true, + bg: true, + voiceId: true, + voiceName: true, + roleSetting: true, + originAudioUrl: true, + cloneAfterAudioUrl: true, + activate: true, + }, }); return ApiResponse.success({ data: record, @@ -39,71 +62,331 @@ export class SystemCharterController { @Post('/add') async addSystemCharter(@Body() object: SystemCharterlDto) { - delete object.id; - const newRecord = await this.dbService.systemCharter.create({ - data: object, - }); - return newRecord; + try { + delete object.id; + // 添加默认的剩余可克隆次数 + const newRecord = await this.dbService.systemCharter.create({ + data: { + ...object, + remainingCloneCount: 10, // 默认10次 + }, + }); + return ApiResponse.successToMessage('添加成功', newRecord); + } catch (error) { + console.error('添加角色失败:', error); + return ApiResponse.failToMessage('添加失败:' + error.message); + } } @Post('/update') async updateSystemCharter(@Body() object: SystemCharterlDto) { - const { id, ...other } = object; - const newRecord = await this.dbService.systemCharter.update({ - where: { - id: id, - }, - data: other, - }); - return newRecord; + try { + const { id, ...other } = object; + const newRecord = await this.dbService.systemCharter.update({ + where: { + id: id, + }, + data: other, + }); + return ApiResponse.successToMessage('更新成功', newRecord); + } catch (error) { + console.error('更新角色失败:', error); + return ApiResponse.failToMessage('更新失败:' + error.message); + } } - @Get('cloneSpeaker') - async batchCloneAudio(@Body() object: CloneSpeakDto) { - // try { - // if (!object.roleId || !object.roleName || !object.url) { - // return ApiResponse.failToMessage('参数错误'); - // } - // // 验证音频URL是否可访问 - // const response = await axios.head(object.url); - // if (response.status !== 200) { - // return ApiResponse.failToMessage('音频文件无法访问'); - // } - // // 验证文件类型 - // const contentType = response.headers['content-type']; - // if (!contentType.includes('audio')) { - // return ApiResponse.failToMessage('文件类型必须是音频'); - // } - // this.redisTaskService.addTask(object); - // return ApiResponse.success(null, '任务添加成功'); - // } catch (error) { - // console.error('添加克隆任务失败:', error); - // return ApiResponse.failToMessage('添加任务失败:' + error.message); - // } + @Get('taskList') + async getTaskList(@Query() query: TaskQueryDto) { + try { + const { current, pageSize, roleId, roleName } = query; + + // 构建查询条件 + const where: any = { + // 只查询非成功状态的任务 + status: { + not: 'completed', + }, + }; + + if (roleId) { + where.roleId = roleId; + } + + if (roleName) { + where.roleName = { + contains: roleName, + }; + } + + // 查询数据 + const [tasks, total] = await Promise.all([ + this.dbService.taskQueue.findMany({ + where, + ...Pagination.getPage(current, pageSize), + orderBy: { + createdAt: 'desc', + }, + }), + this.dbService.taskQueue.count({ where }), + ]); + + return ApiResponse.success({ + data: tasks, + meta: { + total, + current: current || 1, + pageSize: pageSize || 10, + }, + }); + } catch (error) { + console.error('获取任务列表失败:', error); + return ApiResponse.failToMessage('获取任务列表失败:' + error.message); + } } - @Get('test') - async batchCloneAudio1() { - const result = [ - { - roleId: '6704bd0ef48326fe51ddb751', - roleName: '甘宁', - // url: 'https://banban-systemcharter-speak.oss-cn-beijing.aliyuncs.com/test/%E4%BB%A3%E5%8F%B7%E9%B8%A2%E5%AF%86%E6%8E%A2%E5%91%A8%E7%91%9C.mp3', - // url: 'https://banban-systemcharter-speak.oss-cn-beijing.aliyuncs.com/test/%E4%BB%A3%E5%8F%B7%E9%B8%A2%E5%AF%86%E6%8E%A2%E7%94%98%E5%AE%81.mp3', - // url: 'https://banban-systemcharter-speak.oss-cn-beijing.aliyuncs.com/test/%E4%BB%A3%E5%8F%B7%E9%B8%A2%E5%AF%86%E6%8E%A2%E7%94%98%E5%AE%81%20-%20%E5%89%AF%E6%9C%AC.wav', - // 郭德纲 - // url: 'https://banban-systemcharter-speak.oss-cn-beijing.aliyuncs.com/test/%E9%83%AD%E5%BE%B7%E7%BA%B2-%E5%A3%B0%E9%9F%B3%E5%85%8B%E9%9A%86.mp3', - // 增以后的 - url: 'https://banban-systemcharter-speak.oss-cn-beijing.aliyuncs.com/test/WeChat_20241119150807_1.mp3', + @Get('activateVoice') + async activateVoice(@Query('speakerId') speakerId: string) { + try { + if (!speakerId) { + return ApiResponse.failToMessage('参数错误:缺少speakerId'); + } - // 周瑜 - // url: 'https://banban-systemcharter-speak.os-cn-beijing.aliyuncs.com/test/%E4%BB%A3%E5%8F%B7%E9%B8%A2%E5%AF%86%E6%8E%A2%E5%91%A8%E7%91%9C.mp3', - // 周瑜 提高分贝后 - // url: 'https://banban-systemcharter-speak.oss-cn-beijing.aliyuncs.com/test/%E5%91%A8%E7%91%9C.mp3', - }, - ]; - result.forEach((item) => { - this.redisTaskService.addTask(item); - }); + const volcenAudioSpeakService = new VolcenAudioSpeakService(); + const result = await volcenAudioSpeakService.speakActivate(speakerId); + + return ApiResponse.successToMessage('音频激活成功', result); + } catch (error) { + console.error('激活音频失败:', error); + return ApiResponse.failToMessage('激活音频失败:' + error.message); + } + } + + // 添加获取克隆历史记录的接口 + @Get('cloneHistory/:id') + async getCloneHistory(@Param('id') id: string) { + try { + // 查询数据 + const [histories, total] = await Promise.all([ + this.dbService.cloneHistory.findMany({ + where: { + roleId: id, + }, + orderBy: { + createdAt: 'desc', + }, + }), + this.dbService.cloneHistory.count({ where: { roleId: id } }), + ]); + + return ApiResponse.successToMessage('获取成功', { + data: histories, + meta: { + total, + }, + }); + } catch (error) { + console.error('获取克隆历史失败:', error); + return ApiResponse.failToMessage('获取克隆历史失败:' + error.message); + } + } + + @Get('/detail/:id') + async getDetail(@Param('id') id: string) { + try { + if (!id) { + return ApiResponse.failToMessage('参数错误:缺少id'); + } + + const record = await this.dbService.systemCharter.findUnique({ + where: { + id: id, + }, + }); + + if (!record) { + return ApiResponse.failToMessage('人设不存在'); + } + + return ApiResponse.successToMessage('获取成功', record); + } catch (error) { + console.error('获取人设详情失败:', error); + return ApiResponse.failToMessage('获取人设详情失败:' + error.message); + } + } + + /** + * 克隆声音 + * @param cloneSpeakDto + * @returns + */ + @Post('cloneSpeaker') + async cloneSpeaker(@Body() cloneSpeakDto: CloneSpeakDto) { + try { + const { roleId, roleName } = cloneSpeakDto; + + // 检查是否存在未完成的相同角色任务 + const existingTask = await this.dbService.taskQueue.findFirst({ + where: { + roleId, + status: { + // pending / processing /completed/failed + in: ['pending', 'processing'], + }, + }, + }); + + if (existingTask) { + throw new HttpException( + { + message: '该角色已有正在进行的克隆任务,请等待任务完成后再试', + taskId: existingTask.id, + }, + HttpStatus.BAD_REQUEST, + ); + } + + // 检查剩余克隆次数 + const charter = await this.dbService.systemCharter.findUnique({ + where: { id: roleId }, + }); + + if (!charter) { + throw new HttpException('角色不存在', HttpStatus.BAD_REQUEST); + } + + if (charter.remainingCloneCount <= 0) { + throw new HttpException('剩余克隆次数不足', HttpStatus.BAD_REQUEST); + } + + // 添加任务到队列 + await this.redisTaskService.addTask({ + ...cloneSpeakDto, + roleId, + roleName, + }); + + return { + message: '克隆任务已添加到队列', + }; + } catch (error) { + throw error; + } + } + + // 重试失败的任务 + @Post('retryTask/:taskId') + async retryTask(@Param('taskId') taskId: string) { + try { + if (!taskId) { + return ApiResponse.failToMessage('参数错误:缺少taskId'); + } + + await this.redisTaskService.retryTask(taskId); + return ApiResponse.successToMessage('重试任务添加成功'); + } catch (error) { + console.error('重试任务失败:', error); + return ApiResponse.failToMessage('重试任务失败:' + error.message); + } + } + + /** + * 检查克隆任务状态 + * @param roleId + */ + @Get('checkCloneTask/:roleId') + async checkCloneTask(@Param('roleId') roleId: string) { + try { + const charter = await this.dbService.systemCharter.findUnique({ + where: { id: roleId }, + }); + + if (!charter) { + throw new HttpException('角色不存在', HttpStatus.BAD_REQUEST); + } + + // 检查是否存在未完成的相同角色任务 + const existingTask = await this.dbService.taskQueue.findFirst({ + where: { + roleId, + status: { + in: ['pending', 'processing'], + }, + }, + }); + if (existingTask) { + throw new HttpException('该角色已有正在进行的克隆任务,请等待任务完成后再试', HttpStatus.BAD_REQUEST); + } + + // 检查是否存在失败的任务 + const failedTask = await this.dbService.taskQueue.findFirst({ + where: { + roleId, + status: 'failed', + }, + }); + + // 处理失败任务的错误信息 + if (failedTask) { + throw new HttpException('该角色存在失败的克隆任务,请先删除失败任务后再试', HttpStatus.BAD_REQUEST); + } + return ApiResponse.success(); + } catch (error) { + throw error; + } + } + + // 删除任务 + @Delete('deleteTask/:taskId') + async deleteTask(@Param('taskId') taskId: string) { + try { + const task = await this.dbService.taskQueue.findUnique({ + where: { id: taskId }, + select: { + id: true, + status: true, + roleId: true, + roleName: true, + }, + }); + + if (!task) { + throw new HttpException( + { + message: '任务不存在', + code: 'TASK_NOT_FOUND', + }, + HttpStatus.NOT_FOUND, + ); + } + + if (task.status !== 'failed') { + throw new HttpException( + { + message: '只能删除失败状态的任务', + code: 'INVALID_TASK_STATUS', + currentStatus: task.status, + }, + HttpStatus.BAD_REQUEST, + ); + } + + // 删除任务队列中的记录 + await this.dbService.taskQueue.delete({ + where: { id: taskId }, + }); + + return ApiResponse.successToMessage('任务删除成功'); + } catch (error) { + if (error instanceof HttpException) { + throw error; + } + throw new HttpException( + { + message: '删除任务失败', + error: error.message, + }, + HttpStatus.INTERNAL_SERVER_ERROR, + ); + } } } diff --git a/src/services/VolcenAudioSpeakService.ts b/src/services/VolcenAudioSpeakService.ts index 0a0c22b..706ea3c 100644 --- a/src/services/VolcenAudioSpeakService.ts +++ b/src/services/VolcenAudioSpeakService.ts @@ -89,7 +89,7 @@ export class VolcenAudioSpeakService { speakerId: string, base64: string | Buffer, ): Promise { - return new Promise(async (resolve, reject) => { + try { const tiktokBody = { speaker_id: speakerId, appid: VolcenAudioSpeakService.TiktokAppId, @@ -97,41 +97,41 @@ export class VolcenAudioSpeakService { { audio_bytes: base64, audio_format: 'wav', - // 添加音频参数要求 - // sample_rate: 16000, // 采样率必须是16kHz - // channels: 1, // 单声道 - // bits: 16, // 16位深度 + sample_rate: 16000, + channels: 1, + bits: 16, }, ], source: 2, }; - // 开始克隆声音,参数: { speaker_id: 'S_FC60x0Gb1', appid: '8167092294', audio_format: 'wav' } console.log('开始克隆声音,参数:', { speaker_id: speakerId, appid: VolcenAudioSpeakService.TiktokAppId, audio_format: 'wav', }); - axios - .post(VolcenAudioSpeakService.SPEAK_CLONE_API, tiktokBody, { - headers: { - 'Resource-Id': 'volc.megatts.voiceclone', - Authorization: `Bearer;${VolcenAudioSpeakService.TiktokAccessToken}`, - 'Content-Type': 'application/json', - }, - }) - .then((result) => { - // 检查业务状态码 - if (result.data?.BaseResp?.StatusCode !== 0) { - console.error('克隆声音业务错误:', result.data); - throw new Error(`训练声音业务错误: ${result.data?.BaseResp?.StatusMessage || '未知错误'}`); - } - resolve(result.data as T); - }) - .catch((error) => { - reject(error); - }); - }); + + const result = await axios.post(VolcenAudioSpeakService.SPEAK_CLONE_API, tiktokBody, { + headers: { + 'Resource-Id': 'volc.megatts.voiceclone', + Authorization: `Bearer;${VolcenAudioSpeakService.TiktokAccessToken}`, + 'Content-Type': 'application/json', + }, + }); + + console.log('result.status', result.status); + + // 检查业务状态码 + if (result.data?.BaseResp?.StatusCode !== 0) { + console.error('克隆声音业务错误:', result.data); + throw new Error(`训练声音业务错误: ${result.data?.BaseResp?.StatusMessage || '未知错误'}`); + } + + return result.data as T; + } catch (error) { + const responseResult = error.response.data; + throw new Error(JSON.stringify(responseResult)); + } } /**