1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262
| async updateUserResourceAction(ctx) { const lockKey = `timeSchedule:updateUserResourceAction:${ctx.natServer}`; const locked = await this.app.redis.setnx(lockKey, "1"); if (!locked) { return; } await this.app.redis.expire(lockKey, 60 * 60);
try { const updateStartTime = dayjs().valueOf(); this.app.logger.info('开始更新资源获取和消耗数据');
// 定义资源相关行为ID const resourceActionIds = [9910003, 9910004]; // 9910003: 资源获取, 9910004: 资源消耗
// 获取当前月份的索引 const currentMonth = dayjs().format('YYYY-MM'); const months = [currentMonth];
for (const month of months) { for (const actionId of resourceActionIds) { const indexName = `user_action_log_${actionId}_${month}`;
try { // 检查索引是否存在 const indexExists = await ctx.service.fs.es.client.indices.exists({ index: indexName });
if (!indexExists) { this.app.logger.info(`索引 ${indexName} 不存在,跳过处理`); continue; }
// 更新索引映射 await this.updateResourceActionMapping(ctx, indexName, actionId);
// 使用scroll API处理大量数据 const batchSize = 1000; let scrollId = null; let processedCount = 0; let updatedCount = 0;
// 初始查询 - 查找尚未拆分字段的记录 const searchQuery = { bool: { must_not: [] } };
// 根据不同的actionId添加不同的查询条件 if (actionId === 9910003) { searchQuery.bool.must_not.push({ exists: { field: "itemName" } }); } else if (actionId === 9910004) { searchQuery.bool.must_not.push({ exists: { field: "consumeType" } }); }
const searchResponse = await ctx.service.fs.es.client.search({ index: indexName, scroll: '1m', size: batchSize, body: { query: searchQuery, _source: ["_id", "ParamList"] } });
// 安全地获取scrollId和hits scrollId = searchResponse._scroll_id || searchResponse?._scroll_id; let hits = searchResponse.hits?.hits || [];
if (!scrollId) { this.app.logger.error(`无法获取索引 ${indexName} 的 _scroll_id,跳过处理`); continue; }
while (hits && hits.length > 0) { const maxRecordsPerRun = 500000; if (processedCount >= maxRecordsPerRun) { this.app.logger.info(`达到单次处理上限 ${maxRecordsPerRun},下次继续处理`); break; } const bulkOperations = [];
for (const hit of hits) { processedCount++;
if (hit._source.ParamList && Array.isArray(hit._source.ParamList)) { // 根据不同的ActionId处理不同的字段 if (actionId === 9910003) { // 资源获取行为 - 只处理第一个物品,方便后续统计 if (hit._source.ParamList.length >= 2) { const paramFirst = hit._source.ParamList[0];
const rewardType = hit._source.ParamList[1];
// 分割为单个物品和数量 const itemParts = paramFirst.split(';').filter(Boolean); const itemNameParts = itemParts[0]?.split(',') || [];
// 只取第一个物品 const itemName = itemNameParts[0] || ''; const itemValue = Number(itemNameParts[1] || 0);
bulkOperations.push({ update: { _index: indexName, _id: hit._id } });
bulkOperations.push({ doc: { itemName, itemValue, rewardType } }); } } else if (actionId === 9910004) { // 资源消耗行为 if (hit._source.ParamList.length >= 4) { const itemName = hit._source.ParamList[0] || ''; const consumeAmount = Number(hit._source.ParamList[1] || 0); const remainAmount = Number(hit._source.ParamList[2] || 0); const consumeType = hit._source.ParamList[3] || '';
bulkOperations.push({ update: { _index: indexName, _id: hit._id } });
bulkOperations.push({ doc: { itemName, consumeAmount, remainAmount, consumeType } }); } } } }
// 执行批量更新 if (bulkOperations.length > 0) { const bulkResponse = await ctx.service.fs.es.client.bulk({ body: bulkOperations });
if (bulkResponse.errors || bulkResponse?.errors) { this.app.logger.error(`批量更新出错: ${JSON.stringify( (bulkResponse.items || bulkResponse?.items || []) .filter(item => item.update.status >= 400) )}`); } else { updatedCount += bulkOperations.length / 2; } }
// 获取下一批数据 const scrollResponse = await ctx.service.fs.es.client.scroll({ scroll_id: scrollId, scroll: '1m' });
// 安全地获取scrollId和hits scrollId = scrollResponse._scroll_id || scrollResponse?._scroll_id; if (!scrollId) { this.app.logger.warn(`无法获取索引 ${indexName} 的下一批数据的 _scroll_id,停止处理`); break; }
hits = scrollResponse.hits?.hits || []; }
// 清理scroll if (scrollId) { await ctx.service.fs.es.client.clearScroll({ body: { scroll_id: scrollId } }); }
this.app.logger.info(`索引 ${indexName} 处理完成: 共处理 ${processedCount} 条数据,更新 ${updatedCount} 条数据`); } catch (error) { this.app.logger.error(`处理索引 ${indexName} 出错:`, error); continue; } } }
const updateEndTime = dayjs().valueOf(); this.app.logger.info( "资源获取和消耗数据更新完成", `耗时 ${(updateEndTime - updateStartTime) / 1000} s` ); } catch (error) { this.app.logger.error("资源获取和消耗数据更新失败:", error); } finally { // 释放锁 await this.app.redis.del(lockKey); } }
// 更新资源行为索引映射方法 async updateResourceActionMapping(ctx, indexName, actionId) { try { let mappingBody = {};
if (actionId === 9910003) { // 资源获取行为映射 - 增加GameUuid字段映射和gameUserUuid字段 mappingBody = { properties: { itemName: { type: "keyword" }, // 物品名称 itemValue: { type: "integer" }, // 物品数量 rewardType: { type: "keyword" }, // 奖励类型 gameUserUuid: { type: "keyword" } // 添加专用于聚合的字段 } }; } else if (actionId === 9910004) { // 资源消耗行为的映射 - 增加GameUuid字段映射和gameUserUuid字段 mappingBody = { properties: { itemName: { type: "keyword" }, // 物品名称 consumeAmount: { type: "integer" }, // 消耗数量 remainAmount: { type: "integer" }, // 剩余数量 consumeType: { type: "keyword" }, // 消耗类型 gameUserUuid: { type: "keyword" } // 添加专用于聚合的字段 } }; }
await ctx.service.fs.es.client.indices.putMapping({ index: indexName, body: mappingBody });
this.app.logger.info(`索引 ${indexName} 映射更新成功`);
// 添加脚本更新已有文档,将GameUuid复制到gameUserUuid字段 await ctx.service.fs.es.client.updateByQuery({ index: indexName, body: { script: { source: "ctx._source.gameUserUuid = ctx._source.GameUuid", lang: "painless" }, query: { bool: { must: [ { exists: { field: "GameUuid" } }, { bool: { must_not: [{ exists: { field: "gameUserUuid" } }] } } ] } } }, wait_for_completion: false // 异步执行,避免长时间阻塞 });
this.app.logger.info(`索引 ${indexName} 已启动 gameUserUuid 字段更新过程`); } catch (error) { this.app.logger.error(`更新索引 ${indexName} 映射失败:`, error); // 即使映射更新失败,我们仍然继续处理,因为可能已经有映射存在 } }
|