实现方式
使用 ethers.js 监听事件
在区块链开发中,事件监听是一个至关重要的功能。虽然轮询也是一种可行的方案,但存在两个主要问题:
- 实时性不足,无法及时响应链上事件
- API 资源消耗较大,频繁的轮询会显著增加系统负载
注意1
区块链网络的稳定性存在一定挑战,事件监听的一个主要问题是难以保持长时间的持续连接。有时会出现无法获取最新监听事件的情况,比如七八个小时之后,这个问题在 GitHub 上也有其他开发者遇到。单纯发送 heartbeat 并不能完全解决这个问题,而且 ethers.js 的 WebSocket 实现由于封装的原因,缺少了完整的重连机制。
针对区块链监听不稳定后的重连,需要注意两个关键点:
- 妥善处理交易哈希,避免重复处理
- 每次重新启动监听时,主动查询之前的区块数据(例如往前1000个块),以防止漏掉重要事件。在我的实现中,使用 Redis 存储哈希值来防止重复消费。
- 所有消息的消费,都必须要进行校验,防止重复消费,比如实际操作前,先修改数据库标志位啊,数据库或者redis中都做提前处理,宁愿之后通过脚本重新消费,也不能贸然重试,区块链服务因为涉及第三方上链操作,需要很谨慎,毕竟去中心化。
注意2
如果业务场景确实需要使用轮询方式,可以通过 NestJS 的 cron 定时任务来实现 queryFilter。但根据实践经验,事件监听通常是更优的技术选择。
Provider 的选择与最佳实践
在实现事件监听时,Provider 的选择对系统的稳定性有重要影响。以下是两种主要的实现方式:
1. JsonRpcProvider 实现
1
| const provider = new ethers.JsonRpcProvider(rpcAddress)
|
这种方式实现简单直接,但在需要长期保持连接的场景下可能会遇到稳定性问题。在轮询模式下,由于会定期重新创建 provider 实例,这个问题的影响相对较小。
2. WebSocket Provider 实现
心好累,没想到 ethers.js 这个周下载量百万的包,不支持 ws 的错误监听,链又是如此的脆弱,导致断开连接后,难以重启监听,实在是尴尬。
为了获得更好的连接稳定性和实时性,最佳实践 WebSocket Provider(ethers.js-V6版本),WebSocket 方式:
PS: 原版方法可以去 GitHub 中 ethers.js 的官方 issue 中查看。这个问题从 v5 版本就存在,GitHub Issue #1053 中详细描述了前因后果。让人意外的是,这个问题从 v5 到现在的 v6 依然没有得到改善。虽然这本身不是大问题,但作者至少应该在 WebSocket 相关文档中用醒目的方式标注出来,毕竟现在这个websocket的provider属于能用,但在长期监听等场景下可能会遇到问题,这让人很困惑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| async getWebSocketProvider() { try { if (PushService.wsProvider) { return PushService.wsProvider; }
const rpcAddress = this.configService.get('CONTRACT_ADDRESS_RPC'); if (!rpcAddress) { throw new Error('CONTRACT_ADDRESS_RPC not configured'); }
let wsAddress = rpcAddress; if (rpcAddress.startsWith('http://')) { wsAddress = rpcAddress.replace('http://', 'ws://'); } else if (rpcAddress.startsWith('https://')) { wsAddress = rpcAddress.replace('https://', 'wss://'); } let pingTimeout: NodeJS.Timeout | null = null; let keepAliveInterval: NodeJS.Timeout | null = null; let ws: WebSocket | null; let EXPECTED_PONG_BACK = 15000; let KEEP_ALIVE_CHECK_INTERVAL = 60 * 1000; ws = new WebSocket(wsAddress); await new Promise((resolve, reject) => { ws.on("open", async () => { keepAliveInterval = setInterval(() => { if (!ws) { this.logger.warn('No websocket, exiting keep alive interval'); return; } ws.ping(); pingTimeout = setTimeout(() => { if (ws) ws.close(); }, EXPECTED_PONG_BACK); }, KEEP_ALIVE_CHECK_INTERVAL);
const wsp = new WebSocketProvider(() => ws!, xxxxx); wsp._start(); while (!wsp.ready) { await this.sleep(1000); } PushService.wsProvider = wsp PushService.wsProvider.websocket.onerror = (error) => { this.logger.warn('WebSocket connection error, attempting to reconnect...', error); ws.close(); };
resolve(true); }) ws.on("pong", () => { if (pingTimeout) clearTimeout(pingTimeout); }); ws.on("close", () => { this.logger.error(new Error("The websocket connection was closed")); if (keepAliveInterval) clearInterval(keepAliveInterval); if (pingTimeout) clearTimeout(pingTimeout); ws.terminate(); setTimeout(() => { this._restartWebSocketProvider(); }, 5000); reject(new Error("The websocket connection was closed")); });
ws.on("error", (error) => { this.logger.error("WebSocket error occurred:", error); if (keepAliveInterval) clearInterval(keepAliveInterval); if (pingTimeout) clearTimeout(pingTimeout); setTimeout(() => { this._restartWebSocketProvider(); }, 5000); reject(error); }); }); return PushService.wsProvider; } catch (error) { this.logger.error('Failed to create WebSocket provider:', error); this.logger.error('Retrying in 5 seconds...'); throw error; } }
|
下面这两个已经不需要了,原先作为重连的解决方案的,但是发现其实没啥用,首先,定期请求并不能一直保持连接,依然会出现close的问题,定期重连的方案依然有瑕疵,close触发的时机并不是固定的,有时候隔个十分钟就会出现,所以都 不推荐
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| private startConnectionCheck() { if (this.connectionCheckInterval) { clearInterval(this.connectionCheckInterval) } let blockNumber = 0 this.connectionCheckInterval = setInterval(async () => { try { if (!CrawlerNftService.wsProvider) return blockNumber = await CrawlerNftService.wsProvider.getBlockNumber() } catch (error) { this.logger.error('last block number check failed', blockNumber) this.logger.warn('Connection check failed, attempting to reconnect...') clearInterval(this.connectionCheckInterval) return } }, 30000) }
@Cron('0 0 * * * *') async restartWebSocketProvider() { if (CrawlerNftService.wsProvider) { this.logger.log('[CrawlerNftService]重启wsProvider监听,当前存在的链接数:', await CrawlerNftService.wsProvider.listenerCount()) CrawlerNftService.wsProvider.removeAllListeners() CrawlerNftService.wsProvider = null } try { await this.getWebSocketProvider() await this.setupEventListeners() } catch (reconnectError) { this.logger.error(`Error during WebSocket reconnection: ${reconnectError.message}`) } }
|
事件监听实现
下面是一个实际的事件监听实现示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| private async setupBoxPreOpenListeners() { const provider = await this.utilityService.getWebSocketProvider() let boxManagerAddress = this.configService.get('xxxxx') let boxManagerContract = new ethers.Contract(boxManagerAddress, [ { "anonymous": false, "inputs": [ { "indexed": false, "internalType": "address", "name": "user", "type": "address" }, { "indexed": false, "internalType": "address", "name": "user2", "type": "address" } ], "name": "OpenEvent", "type": "event" }, ], provider) boxManagerContract.on('OpenEvent', async (_account,_account2, event) => { let account = String(_account) let transactionHash = String(event.log.transactionHash) xxxxxx }) }
|
开发经验总结
在区块链服务开发过程中,有以下几个关键点需要特别注意:
稳定性考虑
- 区块链服务相比传统业务服务更容易受到外部因素影响
- GAS 费用波动可能导致交易失败
- 网络拥堵时可能影响交易执行
错误处理
- 必须做好完善的 try-catch 异常处理
- 实现合理的重试机制
- 通常建议在命令之间添加适当的延时(如10s)
代码组织
- 合约相关功能最好进行封装
- 封装有利于实现统一的重试机制
- 每次完整执行合约初始化,可以减少 nonce 重复问题
通过以上实践,可以构建更稳定可靠的区块链服务。