简介
BullMQ 是一个强大的基于 Redis 的队列管理库,它可以帮助我们在 NestJS 应用中处理后台任务、消息队列和作业调度。在实际项目开发中,消息队列是一个几乎不可或缺的组件,主要基于以下几个原因:
延时队列需求:在业务中经常需要处理延时执行的任务。
事件触发处理:比如玩家登录、升级等行为触发的后续操作,这些都需要可靠的消息处理机制。
消息可靠性:直接在消息监听器中处理业务逻辑风险较高,特别是对于一些关键操作(如支付)。消息队列提供了消息持久化能力,确保即使处理失败也可以重试。
系统解耦:通过消息队列,可以实现业务逻辑的解耦,提高系统的可维护性和扩展性。
性能优化:提供消息缓存机制,避免系统过载,实现更好的性能表现。
为什么选择 BullMQ?
BullMQ 具有以下优势:
- 高性能:基于 Redis 实现,具有出色的性能表现
- 可靠性:支持任务重试和错误处理机制
- 功能丰富:提供作业进度追踪能力
- 并发处理:支持多进程并发处理任务
- 灵活调度:支持延迟任务和重复任务处理
- 事件系统:完善的事件监听和处理机制
- 框架集成:与 NestJS 框架完美集成
配置 BullMQ
在 NestJS 应用中配置 BullMQ 主要包含两个步骤:
Redis 配置:设置 Redis 连接参数,包括:
队列注册:注册所需的消息队列。
注意:项目中通常会创建多个队列,这样做有两个主要好处:
- 实现不同类型消息的隔离,使逻辑更清晰
- 支持并发消费消息,提高处理效率
但需要注意的是,使用多个队列可能会影响消息的处理顺序。
具体配置示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| import { Module } from '@nestjs/common';
import { BullModule } from "@nestjs/bullmq"
import { ConfigModule, ConfigService } from '@nestjs/config'
@Module({ imports: [ BullModule.forRootAsync({ inject: [ConfigService], imports: [ConfigModule], useFactory: (configService: ConfigService) => ({ connection: { host: configService.get<string>('REDIS_HOST'), port: configService.get<number>('REDIS_PORT'), password: configService.get<string>('REDIS_PASSWORD'), db: configService.get<number>('REDIS_DB'), }, }), }), BullModule.registerQueue( { name: BULL_MQ_QUEUE, defaultJobOptions: { removeOnComplete: true, }, }, { name: BULL_MQ_PRE_OPEN_BOX, defaultJobOptions: { removeOnComplete: true, }, }, ), ], }) export class AppModule {}
|
创建生产者(Producer)
生产者的主要职责是接收消息并将其发送到相应的 BullMQ 队列中。以下是一个典型的生产者实现示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| import { Injectable } from '@nestjs/common'; import { InjectQueue } from '@nestjs/bull'; import { Queue } from 'bull';
@Injectable() export class TaskProducer { constructor( @InjectQueue(BULL_MQ_PRE_OPEN_BOX) private BullPreOpenBoxQueue: Queue, @InjectQueue(BULL_MQ_SHIP) private BullShipQueue: Queue, ) {}
async addTask(data: any) { await this.BullPreOpenBoxQueue.add('processTask', data); } }
|
创建消费者(Consumer)
消费者负责处理队列中的任务,通过继承 WorkerHost 类,我们可以获得更完善的任务管理机制。下面是一个实际的消费者示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| @Processor(BULL_MQ_SHIP, { concurrency: 1 }) export class ShipProcessor extends WorkerHost { private readonly logger = new Logger(ShipProcessor.name); constructor( @InjectModel('History') private historyModel: Model<HistoryDocument>, private configService: ConfigService, private utilityService: UtilityService, @InjectRedis() private readonly redis: Redis, ) { super() }
async process(job: Job<any, any, string>): Promise<any> { try { if (this.configService.get('IsDev') == 'true') { return }
if (job.name == 'shipClaimJob') { let { account } = job.data this.logger.log(`开始处理 ${job.name} ${transactionHash} 任务`) xxxxxx this.logger.log(`结束处理 ${job.name} ${transactionHash} 任务`) }
return {} } catch (error) { this.logger.error('[job]任务处理失败', error) throw error } } }
|