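I currently have the following use case: capture all events that occur on the data in a collection, identify which fields were updated, and create application-specific audit events.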
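For example, if an item has a field named "status" and its value changes from "Writing" to "Review", I should capture that in the change stream, then create a log entry named "Status Changed" and save it in a different collection.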
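As a rough illustration of the audit entry I have in mind, here is a minimal sketch that turns the updated fields of one change into audit documents. It assumes the input has the shape of `updateDescription.updatedFields` from a change event; the class name `AuditEvents`, the method `FromUpdatedFields`, and the audit field names (`event`, `documentId`, `newValue`, `recordedAt`) are hypothetical and only for illustration.

```csharp
using System;
using System.Collections.Generic;
using MongoDB.Bson;

public static class AuditEvents
{
    // Builds one audit document per updated field.
    // Assumption: updatedFields looks like updateDescription.updatedFields,
    // e.g. { status: "Review" }. All output field names are hypothetical.
    public static List<BsonDocument> FromUpdatedFields(BsonValue documentId, BsonDocument updatedFields)
    {
        var events = new List<BsonDocument>();
        foreach (BsonElement field in updatedFields)
        {
            events.Add(new BsonDocument
            {
                { "event", field.Name + "Changed" },   // "status" becomes "statusChanged"
                { "documentId", documentId },
                { "newValue", field.Value },
                { "recordedAt", DateTime.UtcNow }
            });
        }
        return events;
    }
}
```

The resulting documents could then be written to the separate audit collection.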
This is the code I have written so far:
using Microsoft.Extensions.Options;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Driver;
using TodoApp.Configurations;

namespace TodoApp.Background
{
    public class Service : IHostedService
    {
        private readonly IMongoCollection<BsonDocument> _mongoCollection;
        private IChangeStreamCursor<BsonDocument> _changeStreamCursor;
        private Task _watchTask;
        private CancellationTokenSource _cancellationTokenSource;

        public Service(IMongoClient mongoClient, IOptions<Database> options)
        {
            var database = mongoClient.GetDatabase(options.Value.DatabaseName);
            _mongoCollection = database.GetCollection<BsonDocument>(options.Value.CollectionName);
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            _cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            _watchTask = Task.Run(() => WatchForChangesAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token);
            return Task.CompletedTask;
        }

        private async Task WatchForChangesAsync(CancellationToken cancellationToken)
        {
            var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
                .Match(change =>
                    change.OperationType == ChangeStreamOperationType.Insert ||
                    change.OperationType == ChangeStreamOperationType.Update ||
                    change.OperationType == ChangeStreamOperationType.Replace
                )
                .AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>(
                    @"{
                        $project: {
                            '_id': 1,
                            'fullDocument': 1,
                            'ns': 1,
                            'documentKey': 1
                        }
                    }"
                );

            ChangeStreamOptions options = new()
            {
                FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
            };

            _changeStreamCursor = _mongoCollection.Watch(
                pipeline,
                options
            );

            Console.WriteLine("Watching for changes...");

            while (await _changeStreamCursor.MoveNextAsync(cancellationToken))
            {
                var batch = _changeStreamCursor.Current;
                foreach (var change in batch)
                {
                    // var serializedChange = change.ToJson();
                    // var document = BsonSerializer.Deserialize<Domain.ChangeStream<BsonDocument>>(serializedChange);
                    Console.WriteLine(change);
                    // Process the change
                }
            }
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _cancellationTokenSource.Cancel();
            return Task.WhenAny(_watchTask, Task.Delay(Timeout.Infinite, cancellationToken));
        }
    }
}
These events work perfectly with updateDescription on MongoDB Cloud, but when I switched to Azure Cosmos, updateDescription is not supported. Are there any alternatives?
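One direction I can imagine, sketched here purely as an assumption and not verified against Azure Cosmos, is to keep the previously seen version of each document and compare it with the `fullDocument` of the change, so that the changed fields are derived without `updateDescription`. The names `AuditDiff`, `Compare`, and `FieldChange` are hypothetical, and the sketch only looks at top-level fields.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using MongoDB.Bson;

// Hypothetical record describing one changed top-level field.
public record FieldChange(string Field, BsonValue OldValue, BsonValue NewValue);

public static class AuditDiff
{
    // Compares the top-level fields of two versions of the same document.
    // Assumption: "before" is a previously stored snapshot and "after" is the
    // fullDocument from the change event. A missing field is treated as null.
    public static List<FieldChange> Compare(BsonDocument before, BsonDocument after)
    {
        var changes = new List<FieldChange>();
        foreach (var field in before.Names.Union(after.Names))
        {
            if (field == "_id") continue; // the key itself is not an audited field

            BsonValue oldValue = before.GetValue(field, BsonNull.Value);
            BsonValue newValue = after.GetValue(field, BsonNull.Value);

            if (!oldValue.Equals(newValue))
            {
                changes.Add(new FieldChange(field, oldValue, newValue));
            }
        }
        return changes;
    }
}
```

This would require storing a snapshot of each document somewhere, which is extra work compared with reading `updateDescription` directly, so I would prefer a built-in option if one exists.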