I attached the following function to my cron so that it runs every 10 minutes. The problem is that the function does not run in my staging environment (it works fine in my local environment). I believe the issue is in the _fetchIdToProcess function, since the whole flow depends on the length of the array it returns. I'm also using Bluebird to promisify the Redis calls.
const CacheUtils = require('../../utils/cache-utils');
const ESUtils = require('../../utils/es-utils');
const esClient = ESUtils.initializeESConnection();
/**
* @description this cron fetches all the event docs that need to be merged on the basis of
* fileGroupId; after merging, all the "dangling docs" are deleted
*/
module.exports = class MergeFileEvents {
static async mergeFileEvent() {
const client = await CacheUtils.getClient();
const fileGroupIds = await this._fetchIdToProcess('file_upload*');
if (fileGroupIds.length > 0) {
// locking the fileGroupIds to prevent dirty reads
await client.saddAsync('processing_file_ids', fileGroupIds);
// main implementation for merging docs
await this._mergeDocs(fileGroupIds);
// releasing the locks (deleting the ids)
await this._releaseLockFileGroupIds(fileGroupIds)
}
}
static async _fetchIdToProcess(pattern) {
const client = await CacheUtils.getClient();
let keys = [];
let cursor = '0';
do {
const reply = await client.scanAsync(cursor, 'MATCH', pattern, 'COUNT', 1000);
cursor = reply[0];
keys = keys.concat(reply[1]);
} while (cursor !== '0');
const processingIds = await client.smembersAsync('processing_file_ids') || [];
return keys.filter(key => !processingIds.includes(key));
}
static async _mergeDocs(fileGroupIds) {
const date = new Date();
const client = await CacheUtils.getClient();
for (let id of fileGroupIds) {
const eventIds = await client.smembersAsync(id);
if (eventIds.length <= 1) continue; // no need to merge in this case
const body = await esClient.search({
index: `events_${date.toJSON().split('T')[0]}`,
body: {
query: { ids: { values: eventIds } },
_source: ['file.id'],
size: eventIds.length
}
});
const fileIdsPayload = await Promise.all(eventIds.map(async (id) => {
const createdAt = await client.getAsync(id);
return {
id: body.hits.hits.find(hit => hit._id == id)?._source.file.id,
createdAt
}
}));
const lastUpdate = fileIdsPayload.reduce((max, curr) => {
return new Date(curr.createdAt) > new Date(max.createdAt) ? curr : max;
}).createdAt;
await esClient.update({
index: `events_${date.toJSON().split('T')[0]}`,
id: eventIds[0],
body: {
doc: {
files: fileIdsPayload,
updatedAt: lastUpdate
}
}
});
const deleteEvents = eventIds.slice(1).map(id => ({
delete: {
_index: `events_${date.toJSON().split('T')[0]}`,
_id: id
}
}));
await esClient.bulk({
body: deleteEvents
});
}
}
static async _releaseLockFileGroupIds(fileGroupIds) {
const client = await CacheUtils.getClient();
// deleting fileGroupIds from processing set
await client.sremAsync('processing_file_ids', fileGroupIds);
// deleting timestamps of eventId
for (let id of fileGroupIds) {
await client.delAsync(id);
}
}
}
It sounds like you've already done quite a bit of troubleshooting. Based on the code and the problem you describe, here are a few things you may want to check or try:
Redis and Elasticsearch configuration: Make sure the Redis and Elasticsearch configuration is identical between your local and staging environments. Differences in configuration or versions can sometimes cause unexpected behavior. For example, if Redis has different timeouts or Elasticsearch has different mappings, that can change how your cron job behaves.
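One quick way to rule out connectivity or configuration drift is to ping both services from the staging box itself. A minimal sketch, assuming your promisified node_redis client exposes pingAsync and that esClient is one of the official Elasticsearch JS clients (the exact return shape of ping varies slightly between client versions):

// connectivity smoke test for the cron's dependencies (sketch only)
async function verifyConnections() {
  const client = await CacheUtils.getClient();
  const pong = await client.pingAsync();   // promisified PING, should resolve to 'PONG'
  console.log('Redis ping:', pong);

  const alive = await esClient.ping();     // resolves truthy if the cluster is reachable
  console.log('Elasticsearch reachable:', alive);
}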
Logging and error handling: Adding detailed logging and solid error handling helps you pinpoint where the failure occurs. For example, in the _fetchIdToProcess function you can log the responses of the Redis commands to see whether they return the expected data:
const CacheUtils = require('../../utils/cache-utils');
const ESUtils = require('../../utils/es-utils');
const esClient = ESUtils.initializeESConnection();
module.exports = class MergeFileEvents {
static async mergeFileEvent() {
try {
const client = await CacheUtils.getClient();
const fileGroupIds = await this._fetchIdToProcess('file_upload*');
console.log('File group IDs to process:', fileGroupIds);
if (fileGroupIds.length > 0) {
await client.saddAsync('processing_file_ids', fileGroupIds);
await this._mergeDocs(fileGroupIds);
await this._releaseLockFileGroupIds(fileGroupIds);
}
} catch (error) {
console.error('Error in mergeFileEvent:', error);
}
}
static async _fetchIdToProcess(pattern) {
try {
const client = await CacheUtils.getClient();
let keys = [];
let cursor = '0';
do {
const reply = await client.scanAsync(cursor, 'MATCH', pattern, 'COUNT', 1000);
cursor = reply[0];
keys = keys.concat(reply[1]);
console.log('SCAN keys:', reply[1]);
} while (cursor !== '0');
const processingIds = await client.smembersAsync('processing_file_ids') || [];
return keys.filter(key => !processingIds.includes(key));
} catch (error) {
console.error('Error in _fetchIdToProcess:', error);
return [];
}
}
static async _mergeDocs(fileGroupIds) {
try {
const date = new Date();
const client = await CacheUtils.getClient();
for (let id of fileGroupIds) {
const eventIds = await client.smembersAsync(id);
if (eventIds.length <= 1) continue;
const body = await esClient.search({
index: `events_${date.toJSON().split('T')[0]}`,
body: {
query: { ids: { values: eventIds } },
_source: ['file.id'],
size: eventIds.length
}
});
const fileIdsPayload = await Promise.all(eventIds.map(async (id) => {
const createdAt = await client.getAsync(id);
return {
id: body.hits.hits.find(hit => hit._id == id)?._source.file.id,
createdAt
}
}));
const lastUpdate = fileIdsPayload.reduce((max, curr) => {
return new Date(curr.createdAt) > new Date(max.createdAt) ? curr : max;
}).createdAt;
await esClient.update({
index: `events_${date.toJSON().split('T')[0]}`,
id: eventIds[0],
body: {
doc: {
files: fileIdsPayload,
updatedAt: lastUpdate
}
}
});
const deleteEvents = eventIds.slice(1).map(id => ({
delete: {
_index: `events_${date.toJSON().split('T')[0]}`,
_id: id
}
}));
await esClient.bulk({ body: deleteEvents });
}
} catch (error) {
console.error('Error in _mergeDocs:', error);
}
}
static async _releaseLockFileGroupIds(fileGroupIds) {
try {
const client = await CacheUtils.getClient();
await client.sremAsync('processing_file_ids', fileGroupIds);
for (let id of fileGroupIds) {
await client.delAsync(id);
}
} catch (error) {
console.error('Error in _releaseLockFileGroupIds:', error);
}
}
}
Check the Redis async methods: Make sure the async methods used for the Redis operations are promisified correctly and actually return promises. If you are using a library such as Bluebird, verify that it is integrated correctly with your Redis client.
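For reference, the usual Bluebird + node_redis (v3) wiring looks like the sketch below; if CacheUtils does something equivalent, confirm it also runs on staging. Note that node_redis v4+ ships native promises, so promisifyAll no longer applies there. The connection config is illustrative only:

const bluebird = require('bluebird');
const redis = require('redis');

// promisifyAll adds an ...Async variant of every command:
// scanAsync, smembersAsync, saddAsync, sremAsync, getAsync, delAsync, ...
bluebird.promisifyAll(redis.RedisClient.prototype);
bluebird.promisifyAll(redis.Multi.prototype);

const client = redis.createClient({ url: process.env.REDIS_URL }); // illustrative config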
Index naming and query performance: Verify that your Elasticsearch index naming convention matches what actually exists in staging, and check your queries for performance issues. For example, make sure the index exists and is mapped correctly:
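A sketch of how you could verify the daily index before querying it; indices.exists and indices.getMapping are standard client calls, though the response shape (plain value vs. a { body } wrapper) depends on which Elasticsearch client version you run:

async function checkIndex(date = new Date()) {
  const index = `events_${date.toJSON().split('T')[0]}`;
  const exists = await esClient.indices.exists({ index });
  if (!exists) {
    console.warn(`Index ${index} does not exist in this environment`);
    return;
  }
  const mapping = await esClient.indices.getMapping({ index });
  console.log(`Mapping for ${index}:`, JSON.stringify(mapping, null, 2));
}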
Concurrency issues: If the cron job can run concurrently, make sure the locking mechanism based on the Redis set (processing_file_ids) works as intended. Poorly managed locks can cause data-consistency problems; a more defensive per-run lock is sketched below.
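If two 10-minute runs can overlap, one defensive option is a single per-run lock acquired with SET NX and an expiry, so a crashed run cannot hold the lock forever. This is a sketch only; the key name and TTL are illustrative, not part of your existing code:

async function withRunLock(fn) {
  const client = await CacheUtils.getClient();
  // NX: set only if the key is absent, EX 600: auto-expire after 10 minutes
  const acquired = await client.setAsync('merge_file_events_lock', '1', 'NX', 'EX', 600);
  if (acquired !== 'OK') {
    console.log('Previous run still in progress, skipping this cycle');
    return;
  }
  try {
    await fn();
  } finally {
    await client.delAsync('merge_file_events_lock');
  }
}

// usage: withRunLock(() => MergeFileEvents.mergeFileEvent());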
Environment-specific issues: Double-check your staging environment for differences in dependencies, environment variables, or permissions that could affect the cron job.
By working through these areas you should be able to narrow down the root cause. If the problem persists, consider adding more granular logging around each individual Redis and Elasticsearch operation to get better visibility into where things go wrong.
Hope this helps!