实现方式

使用 ethers.js 监听事件

在区块链开发中,事件监听是一个至关重要的功能。虽然轮询也是一种可行的方案,但存在两个主要问题:

  1. 实时性不足,无法及时响应链上事件
  2. API 资源消耗较大,频繁的轮询会显著增加系统负载

注意1
区块链网络的稳定性存在一定挑战,事件监听的一个主要问题是难以保持长时间的持续连接。有时会出现无法获取最新监听事件的情况,比如七八个小时之后,这个问题在 GitHub 上也有其他开发者遇到。单纯发送 heartbeat 并不能完全解决这个问题,而且 ethers.js 的 WebSocket 实现由于封装的原因,缺少了完整的重连机制。

针对区块链监听不稳定后的重连,需要注意两个关键点:

  1. 妥善处理交易哈希,避免重复处理
  2. 每次重新启动监听时,主动查询之前的区块数据(例如往前1000个块),以防止漏掉重要事件。在我的实现中,使用 Redis 存储哈希值来防止重复消费。
  3. 所有消息的消费,都必须要进行校验,防止重复消费,比如实际操作前,先修改数据库标志位啊,数据库或者redis中都做提前处理,宁愿之后通过脚本重新消费,也不能贸然重试,区块链服务因为涉及第三方上链操作,需要很谨慎,毕竟去中心化。

注意2
如果业务场景确实需要使用轮询方式,可以通过 NestJS 的 cron 定时任务来实现 queryFilter。但根据实践经验,事件监听通常是更优的技术选择。

Provider 的选择与最佳实践

在实现事件监听时,Provider 的选择对系统的稳定性有重要影响。以下是两种主要的实现方式:

1. JsonRpcProvider 实现

1
const provider = new ethers.JsonRpcProvider(rpcAddress)

这种方式实现简单直接,但在需要长期保持连接的场景下可能会遇到稳定性问题。在轮询模式下,由于会定期重新创建 provider 实例,这个问题的影响相对较小。

2. WebSocket Provider 实现

心好累,没想到 ethers.js 这个周下载量百万的包,不支持 ws 的错误监听,链又是如此的脆弱,导致断开连接后,难以重启监听,实在是尴尬。

为了获得更好的连接稳定性和实时性,最佳实践 WebSocket Provider(ethers.js-V6版本),WebSocket 方式:

PS: 原版方法可以去 GitHub 中 ethers.js 的官方 issue 中查看。这个问题从 v5 版本就存在,GitHub Issue #1053 中详细描述了前因后果。让人意外的是,这个问题从 v5 到现在的 v6 依然没有得到改善。虽然这本身不是大问题,但作者至少应该在 WebSocket 相关文档中用醒目的方式标注出来,毕竟现在这个websocket的provider属于能用,但在长期监听等场景下可能会遇到问题,这让人很困惑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
async getWebSocketProvider() {
try {
if (PushService.wsProvider) {
return PushService.wsProvider;
}

const rpcAddress = this.configService.get('CONTRACT_ADDRESS_RPC');
if (!rpcAddress) {
throw new Error('CONTRACT_ADDRESS_RPC not configured');
}

// Convert http(s) to ws(s)
let wsAddress = rpcAddress;
if (rpcAddress.startsWith('http://')) {
wsAddress = rpcAddress.replace('http://', 'ws://');
} else if (rpcAddress.startsWith('https://')) {
wsAddress = rpcAddress.replace('https://', 'wss://');
}
let pingTimeout: NodeJS.Timeout | null = null;
let keepAliveInterval: NodeJS.Timeout | null = null;
let ws: WebSocket | null;
let EXPECTED_PONG_BACK = 15000;
let KEEP_ALIVE_CHECK_INTERVAL = 60 * 1000; //7500;
ws = new WebSocket(wsAddress);
await new Promise((resolve, reject) => {
ws.on("open", async () => {
// 增加ping事件
keepAliveInterval = setInterval(() => {
if (!ws) {
this.logger.warn('No websocket, exiting keep alive interval');
return;
}
ws.ping();
pingTimeout = setTimeout(() => {
if (ws) ws.close();
}, EXPECTED_PONG_BACK);
}, KEEP_ALIVE_CHECK_INTERVAL);

const wsp = new WebSocketProvider(() => ws!, xxxxx);
wsp._start();
while (!wsp.ready) {
await this.sleep(1000);
}
PushService.wsProvider = wsp
PushService.wsProvider.websocket.onerror = (error) => {
this.logger.warn('WebSocket connection error, attempting to reconnect...', error);
ws.close();
};

resolve(true);
})
ws.on("pong", () => {
if (pingTimeout) clearTimeout(pingTimeout);
});
ws.on("close", () => {
this.logger.error(new Error("The websocket connection was closed"));
if (keepAliveInterval) clearInterval(keepAliveInterval);
if (pingTimeout) clearTimeout(pingTimeout);
ws.terminate();
setTimeout(() => {
this._restartWebSocketProvider();
}, 5000);
reject(new Error("The websocket connection was closed"));
});

ws.on("error", (error) => {
this.logger.error("WebSocket error occurred:", error);
if (keepAliveInterval) clearInterval(keepAliveInterval);
if (pingTimeout) clearTimeout(pingTimeout);
setTimeout(() => {
this._restartWebSocketProvider();
}, 5000);
reject(error);
});
});
return PushService.wsProvider;
} catch (error) {
this.logger.error('Failed to create WebSocket provider:', error);
this.logger.error('Retrying in 5 seconds...');
throw error;
}
}

下面这两个已经不需要了,原先作为重连的解决方案的,但是发现其实没啥用,首先,定期请求并不能一直保持连接,依然会出现close的问题,定期重连的方案依然有瑕疵,close触发的时机并不是固定的,有时候隔个十分钟就会出现,所以都 不推荐

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private startConnectionCheck() {
if (this.connectionCheckInterval) {
clearInterval(this.connectionCheckInterval)
}
let blockNumber = 0
this.connectionCheckInterval = setInterval(async () => {
try {
if (!CrawlerNftService.wsProvider) return
blockNumber = await CrawlerNftService.wsProvider.getBlockNumber()
} catch (error) {
this.logger.error('last block number check failed', blockNumber)
this.logger.warn('Connection check failed, attempting to reconnect...')
clearInterval(this.connectionCheckInterval)
return
}
}, 30000)
}



// 每小时执行socket链接重置
@Cron('0 0 * * * *')
async restartWebSocketProvider() {
// 确保wsProvider存在再清理
if (CrawlerNftService.wsProvider) {
this.logger.log('[CrawlerNftService]重启wsProvider监听,当前存在的链接数:', await CrawlerNftService.wsProvider.listenerCount())
CrawlerNftService.wsProvider.removeAllListeners()
CrawlerNftService.wsProvider = null // 清理WebSocket实例
}
try {
await this.getWebSocketProvider() // 重新初始化WebSocket provider
await this.setupEventListeners() // 重新设置事件监听器
} catch (reconnectError) {
this.logger.error(`Error during WebSocket reconnection: ${reconnectError.message}`)
}
}

事件监听实现

下面是一个实际的事件监听实现示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private async setupBoxPreOpenListeners() {
const provider = await this.utilityService.getWebSocketProvider()
let boxManagerAddress = this.configService.get('xxxxx')
let boxManagerContract = new ethers.Contract(boxManagerAddress, [
{
"anonymous": false,
"inputs": [
{
"indexed": false,
"internalType": "address",
"name": "user",
"type": "address"
},
{
"indexed": false,
"internalType": "address",
"name": "user2",
"type": "address"
}
],
"name": "OpenEvent",
"type": "event"
},
], provider)

// 监听开箱事件,这里在所有inputs参数都获取完成后,可以获取event对象
boxManagerContract.on('OpenEvent', async (_account,_account2, event) => {
let account = String(_account)
let transactionHash = String(event.log.transactionHash)
// 处理事件逻辑
xxxxxx
})
}

开发经验总结

在区块链服务开发过程中,有以下几个关键点需要特别注意:

  1. 稳定性考虑

    • 区块链服务相比传统业务服务更容易受到外部因素影响
    • GAS 费用波动可能导致交易失败
    • 网络拥堵时可能影响交易执行
  2. 错误处理

    • 必须做好完善的 try-catch 异常处理
    • 实现合理的重试机制
    • 通常建议在命令之间添加适当的延时(如10s)
  3. 代码组织

    • 合约相关功能最好进行封装
    • 封装有利于实现统一的重试机制
    • 每次完整执行合约初始化,可以减少 nonce 重复问题

通过以上实践,可以构建更稳定可靠的区块链服务。