이런 문서가 있습니다
public class SomeDocument
{
public Guid Id { get; set; }
public string PropertyA { get; set; }
public string PropertyB { get; set; }
}
이제 PropertyA와 PropertyB를 적절하게 업데이트하고 비동기 방식으로 작동하는 두 가지 서비스 (A와 B)가 있습니다. 즉, 어떤 서비스가 먼저 완료되고 문서를 작성해야하며 누가 업데이트해야하는지 알 수 없습니다.
따라서 문서를 업데이트 (또는 생성)하기 위해 현재 서비스 A에서 이와 같은 코드를 사용합니다.
var filter = new FilterDefinitionBuilder<SomeDocument>().Where(r => r.Id == id);
var options = new FindOneAndUpdateOptions<SomeDocument, SomeDocument>() { IsUpsert = true };
var update = Builders<SomeDocument>.Update.Set(r => r.PropertyA, "Property A value");
await Database.GetCollection<SomeDocument>("someDocuments").FindOneAndUpdateAsync(filter, update, options);
서비스 B의 다음 코드
var filter = new FilterDefinitionBuilder<SomeDocument>().Where(r => r.Id == id);
var options = new FindOneAndUpdateOptions<SomeDocument, SomeDocument>() { IsUpsert = true };
var update = Builders<SomeDocument>.Update.Set(r => r.PropertyB, "Property B value");
await Database.GetCollection<SomeDocument>("someDocuments").FindOneAndUpdateAsync(filter, update, options);
모든 것이 괜찮아 보이지만 때로는 두 서비스가 동시에 작동 할 때 다음 오류가 발생합니다.
Unhandled Exception: MongoDB.Driver.MongoCommandException: Command findAndModify failed: E11000 duplicate key error collection: someDocuments index: _id_ dup key: { : BinData(3, B85ED193195A274DA94BC86B655B4509) }.
at MongoDB.Driver.Core.WireProtocol.CommandWireProtocol`1.ProcessReply(ConnectionId connectionId, ReplyMessage`1 reply)
at MongoDB.Driver.Core.WireProtocol.CommandWireProtocol`1.<ExecuteAsync>d__11.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at MongoDB.Driver.Core.Servers.Server.ServerChannel.<ExecuteProtocolAsync>d__26`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at MongoDB.Driver.Core.Operations.CommandOperationBase`1.<ExecuteProtocolAsync>d__29.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at MongoDB.Driver.Core.Operations.WriteCommandOperation`1.<ExecuteAsync>d__2.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at MongoDB.Driver.Core.Operations.FindAndModifyOperationBase`1.<ExecuteAsync>d__19.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at MongoDB.Driver.OperationExecutor.<ExecuteWriteOperationAsync>d__3`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at MongoDB.Driver.MongoCollectionImpl`1.<ExecuteWriteOperationAsync>d__62`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
at CVSP.MongoDbStore.MongoDbWriteModelFacade.<AddRecordField>d__6.MoveNext() in D:\Projects\Test\Source\MongoDbStore\WriteModel\MongoDbWriteModelFacade.cs:line 58
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.CompilerServices.AsyncMethodBuilderCore.<>c.<ThrowAsync>b__6_1(Object state)
at System.Threading.QueueUserWorkItemCallback.WaitCallback_Context(Object state)
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()
at System.Threading.ThreadPoolWorkQueue.Dispatch()
at System.Threading._ThreadPoolWaitCallback.PerformWaitCallback()
이 경우 문서를 어떻게 삽입 / 업데이트해야합니까?
최신 정보
확장이 트릭을 수행했습니다.
public static async Task<TProjection> FindOneAndUpdateWithConcurrencyAsync<TDocument, TProjection>(this IMongoCollection<TDocument> collection, FilterDefinition<TDocument> filter, UpdateDefinition<TDocument> update, FindOneAndUpdateOptions<TDocument, TProjection> options = null, CancellationToken cancellationToken = default(CancellationToken))
{
try
{
return await collection.FindOneAndUpdateAsync(filter, update, options, cancellationToken);
}
catch (MongoException ex)
{
Thread.Sleep(10);
return await collection.FindOneAndUpdateAsync(filter, update, options, cancellationToken);
}
}
처음 귀두에서 try / catch를 사용하는 것은 이상해 보이며 처음부터 마음에 들지 않았지만 https://docs.mongodb.com/manual/reference/method/db.collection.findAndModify/#upsert-and-unique - 모든 의심이 사라졌습니다.
글쎄, 이것은 동기화 문제이며 불행히도 간단한 해결책이 없습니다. 해킹을 찾기 위해 백엔드에서 일어날 수있는 일을 분석해 보겠습니다.
문서를 upsert하려는 두 개의 스레드 (서비스)가 있다고 가정 해 보겠습니다.
t1: 00:00:00.250 -> find document with Id (1)
t2: 00:00:00.255 -> find document with id (1)
t1: 00:00:00.260 -> No document found
t2: 00:00:00.262 -> No document found
t1: 00:00:00.300 -> Insert a document with Id(1)
t2: 00:00:00.300 -> Insert a document with Id(1)
빙고 ... 예외가 있습니다. 두 스레드가 동일한 ID를 가진 문서를 삽입하려고합니다.
우리가 여기서 무엇을 할 수 있습니까?
이 단점을 우리의 이점으로 바꾸자. 이 예외를 캡처하고 upsert를 다시 호출하십시오. 이번에는 성공적으로 문서를 찾고 업데이트합니다.
다음과 같이 ServiceA
nd에 대한 코드를 수정 ServiceB
하고 10000 개의 문서를 타이트 루프에 삽입하려고했습니다.
public async Task ServiceA(Guid id)
{
var filter = new FilterDefinitionBuilder<SomeDocument>().Where(r => r.Id == id);
var update = Builders<SomeDocument>.Update.Set(r => r.PropertyA, "Property A value");
var options = new UpdateOptions() { IsUpsert = true };
var database = _client.GetDatabase("stackoverflow");
var collection = database.GetCollection<SomeDocument>(CollectionName,
new MongoCollectionSettings
{
WriteConcern = WriteConcern.W1
});
await collection.UpdateOneAsync(filter, update, options);
}
public async Task ServiceB(Guid id)
{
var filter = new FilterDefinitionBuilder<SomeDocument>().Where(r => r.Id == id);
var update = Builders<SomeDocument>.Update.Set(r => r.PropertyB, "Property B value");
var options = new UpdateOptions() { IsUpsert = true };
var database = _client.GetDatabase("stackoverflow");
var collection = database.GetCollection<SomeDocument>(CollectionName,
new MongoCollectionSettings
{
WriteConcern = WriteConcern.W1
});
await collection.UpdateOneAsync(filter, update, options);
}
다음은 내 lanuching 코드입니다. 완벽하지는 않지만 목적에 부합합니다.
for (var i = 0; i < 10000; i++)
{
var _guid = Guid.NewGuid();
var _tasks = new[]
{
new Task(async (x) =>
{
var p = new Program();
try
{
await p.ServiceA(Guid.Parse(x.ToString()));
}
catch (MongoWriteException me)
{
await Task.Delay(5);
await p.ServiceA(Guid.Parse(x.ToString()));
}
}, _guid),
new Task(async (x) =>
{
var p = new Program();
try
{
await p.ServiceB(Guid.Parse(x.ToString()));
}
catch (MongoWriteException me)
{
await Task.Delay(5);
await p.ServiceB(Guid.Parse(x.ToString()));
}
}, _guid)
};
_tasks[0].Start();
_tasks[1].Start();
Task.WaitAll(_tasks);
}
이 기사는 인터넷에서 수집됩니다. 재 인쇄 할 때 출처를 알려주십시오.
침해가 발생한 경우 연락 주시기 바랍니다[email protected] 삭제
몇 마디 만하겠습니다