我有Event
包含Embedded的文档Snapshots
。
我想在if中添加Snapshot
A Event
:
否则...创建一个新的Event
。
这是我的findAndUpdate
查询可能更有意义:
Event.findAndModify(
query: {
start_timestamp: { $gte: newSnapshot.timestamp - 5min },
last_snapshot_timestamp: { $gte: newSnapshot.timestamp - 1min }
},
update: {
snapshots[newSnapshot.timestamp]: newSnapshot,
$max: { last_snapshot_timestamp: newSnapshot.timestamp },
$min: { start_timestamp: newSnapshot.timestamp }
},
upsert: true,
$setOnInsert: { ALL OUR NEW EVENT FIELDS } }
)
编辑:不幸的是,我无法在start_timestamp上创建唯一索引。快照带有不同的时间戳,我想将它们分组为一个事件。即快照A在12:00:00进入,快照B在12:00:59进入。它们应该处于同一事件中,但是可以在不同时间将它们写入数据库,因为编写它们的工作人员正在同时执行操作。假设在12:00:30出现了另一个快照,该快照应与上述两个快照写入同一事件。最后,应将12:02:00的快照写入新事件。
我的问题是……在并发环境中能否正常工作。是findAndUpdate
原子的吗?当我应该创建两个事件并将快照添加到其中时,是否可以创建两个事件?
编辑:因此,如@chainh所指出的,上述方法不能保证不会创建两个事件。
因此,我尝试了一种基于锁定的新方法-您认为这可行吗?
var acquireLock = function() {
var query = { "locked": false}
var update = { $set: { "locked": true } }
return Lock.findAndModify({
query: query,
update: update,
upsert: true
})
};
var releaseLock = function() {
var query = { "locked": true }
var update = { $set: { "locked": false } }
return Lock.findAndModify({
query: query,
update: update
})
};
var insertSnapshot = function(newSnapshot, upsert) {
Event.findAndModify(
query: {
start_timestamp: { $gte: newSnapshot.timestamp - 5min },
last_snapshot_timestamp: { $gte: newSnapshot.timestamp - 1min }
},
update: {
snapshots[newSnapshot.timestamp]: newSnapshot,
$max: { last_snapshot_timestamp: newSnapshot.timestamp },
$min: { start_timestamp: newSnapshot.timestamp }
},
upsert: upsert,
$setOnInsert: { ALL OUR NEW EVENT FIELDS } }
)
};
var safelyInsertEvent = function(snapshot) {
return insertSnapshot(snapshot, false)
.then(function(modifyRes) {
if (!modifyRes.succeeded) {
return acquireLock()
}
})
.then(function(lockRes) {
if (lockRes.succeeded) {
return insertSnapshot(snapshot, true)
} else {
throw new AcquiringLockError("Didn't acquire lock. Try again")
}
})
.then(function() {
return releaseLock()
})
.catch(AcquiringLockError, function(err) {
return safelyInsertEvent(snapshot)
})
};
锁定文档将仅包含一个字段(锁定)。基本上,以上代码尝试查找现有事件并对其进行更新。如果有效,那很好,我们可以纾困。如果我们不进行更新,则说明我们没有现有的事件可将快照保留在其中。因此,我们可以自动获取一个锁,如果成功,则可以安全地重新插入新事件。如果获取该锁失败,我们只需再次尝试整个过程,并希望到那时我们有一个现有事件可以坚持下去。
根据您的代码:
Event.findAndModify(
query: {
start_timestamp: { $gte: newSnapshot.timestamp - 5min },
last_snapshot_timestamp: { $gte: newSnapshot.timestamp - 1min }
},
update: {
snapshots[newSnapshot.timestamp]: newSnapshot,
$max: { last_snapshot_timestamp: newSnapshot.timestamp },
$min: { start_timestamp: newSnapshot.timestamp }
},
upsert: true,
$setOnInsert: { ALL OUR NEW EVENT FIELDS } }
)
成功将第一个事件文档插入数据库后,该事件文档的字段具有以下关系:
start_timestamp == last_snapshot_timestamp
在后续更新之后,该关系变为:
start_timestamp <last_snapshot_timestamp <last_snapshot_timestamp + 1min <start_timestamp + 5min
或
start_timestamp <last_snapshot_timestamp <start_timestamp + 5min <last_snapshot_timestamp + 1min
因此,如果新快照要连续插入此事件文档,则必须符合以下条件:
newSnapshot.timestamp <Math.min(last_snapshot_timestamp + 1,start_timestamp + 5)
假设数据库随时间推移有两个Event文档:
Event1(start_timestamp1,last_snapshot_timestamp1),
Event2(start_timestamp2,last_snapshot_timestamp2)
通常,start_timestamp2> last_snapshot_timestamp1
现在,如果有一个新的快照,并且其时间戳小于start_timestamp1(假设它是通过延迟或伪造实现的),则可以将该快照插入到两个Event文档中。因此,我怀疑您是否需要在查询部分中添加其他条件,以确保last_snapshot_timestamp与start_timestamp之间的距离始终小于某个值(例如5min)?例如,我将查询更改为
query: {
start_timestamp: { $gte: newSnapshot.timestamp - 5min },
last_snapshot_timestamp: { $gte: newSnapshot.timestamp - 1min , $lte : newSnapshot.timestamp + 5}
}
好的,让我们继续...
如果我尝试解决此问题,我仍将尝试在字段start_timestamp上构建唯一索引。根据MongoDB的手册,使用findAndModify或update可以自动完成工作。但是令人头疼的是,当出现重复值时我应该如何处理,因为newSnapshot.timestamp不受控制,它可能会由运算符$ min修改start_timestamp。
这些方法是:
由于不需要返回事件文档,因此我使用update而不是findAndModify,因为两者都是原子操作,在这种情况下update的编写更为简单。
我使用简单的JavaScript(在mongo shell上运行)来表达步骤(我不熟悉您使用的代码语法:D),并且我认为您可以轻松理解。
var gap5 = 5 * 60 * 1000; // just suppose, you should change accordingly if the value is not true.
var gap1 = 1 * 60 * 1000;
var initialFields = {}; // ALL OUR NEW EVENT FIELDS
function insertSnapshotIfStartTimeStampNotExisted() {
var query = {
start_timestamp: { $gte: newSnapshot.timestamp - gap5 },
last_snapshot_timestamp: { $gte: newSnapshot.timestamp - gap1 }
};
var update = {
$push : {snapshots: newSnapshot}, // suppose snapshots is an array
$max: { last_snapshot_timestamp: newSnapshot.timestamp },
$min: { start_timestamp: newSnapshot.timestamp },
$setOnInsert : initialFields
},
var result = db.Event.update(query, update, {upsert : true});
if (result.nUpserted == 0 && result.nModified == 0) {
insertSnapshotIfStartTimeStampExisted(); // Event document existed with that start_timestamp
}
}
function insertSnapshotIfStartTimeStampExisted() {
var query = {
start_timestamp: newSnapshot.timestamp,
};
var update = {
$push : {snapshots: newSnapshot}
},
var result = db.Event.update(query, update, {upsert : false});
if (result.nModified == 0) {
insertSnapshotIfStartTimeStampNotExisted(); // If start_timestamp just gets modified; it's possible.
}
}
// entry
db.Event.ensureIndex({start_timestamp:1},{unique:true});
insertSnapshotIfStartTimeStampNotExisted();
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句