目前,我有一个用例如下:捕获集合中数据发生的所有事件识别已更新的字段创建特定于应用程序的审计事件......
目前,我有一个用例如下:
例如,如果某个项目有一个名为“状态”的字段,并且它从“写入”更改为“审核”,我应该在变更流中捕获它,然后创建一个名为“状态变更”的日志并将其保存在不同的集合中。
目前,这是我编写的代码:
using Microsoft.Extensions.Options;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Driver;
using TodoApp.Configurations;
namespace TodoApp.Background
{
public class Service : IHostedService
{
private readonly IMongoCollection<BsonDocument> _mongoCollection;
private IChangeStreamCursor<BsonDocument> _changeStreamCursor;
private Task _watchTask;
private CancellationTokenSource _cancellationTokenSource;
public Service(IMongoClient mongoClient, IOptions<Database> options)
{
var database = mongoClient.GetDatabase(options.Value.DatabaseName);
_mongoCollection = database.GetCollection<BsonDocument>(options.Value.CollectionName);
}
public Task StartAsync(CancellationToken cancellationToken)
{
_cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_watchTask = Task.Run(() => WatchForChangesAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token);
return Task.CompletedTask;
}
private async Task WatchForChangesAsync(CancellationToken cancellationToken)
{
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
.Match(change =>
change.OperationType == ChangeStreamOperationType.Insert ||
change.OperationType == ChangeStreamOperationType.Update ||
change.OperationType == ChangeStreamOperationType.Replace
)
.AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>(
@"{
$project: {
'_id': 1,
'fullDocument': 1,
'ns': 1,
'documentKey': 1
}
}"
);
ChangeStreamOptions options = new()
{
FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
};
_changeStreamCursor = _mongoCollection.Watch(
pipeline,
options
);
Console.WriteLine("Watching for changes...");
while (await _changeStreamCursor.MoveNextAsync(cancellationToken))
{
var batch = _changeStreamCursor.Current;
foreach (var change in batch)
{
// var serializedChange = change.ToJson();
// var document = BsonSerializer.Deserialize<Domain.ChangeStream<BsonDocument>>(serializedChange);
Console.WriteLine(change);
// Process the change
}
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
_cancellationTokenSource.Cancel();
return Task.WhenAny(_watchTask, Task.Delay(Timeout.Infinite, cancellationToken));
}
}
}
这些事件与 MongoDB Cloud 上的更新描述完美配合,但当我将其切换到 Azure Cosmos 时,它不支持它。还有其他替代方案吗?
使用您展示的格式的一个简单方法是收集所有键,然后 .get()
每个键的默认值为 0,并将其添加到新字典中。
以下是显示其实际运行的基本结构:
dictA = {'A': 1, 'B': 2, 'C': 3}
dictB = {'C': 1, 'D': 2, 'E': 3}
dictC = {'A': 2, 'C': 4, 'E': 6, 'G': 8}
dict_list = [dictA, dictB, dictC]
# Collect all the keys from each dict
all_keys = set()
for dictx in dict_list:
all_keys.update(dictx.keys())
# Iterate through each key adding it to the result
resultDict = {}
for key in all_keys:
# Pre-init with a 0 then grab it or default to 0 from each dict
resultDict[key] = 0
for dictx in dict_list:
resultDict[key] += dictx.get(key, 0)
print(resultDict)
以下是浓缩为一行的示例代码:
resultDict = {key: sum([x.get(key, 0) for x in dict_list]) for key in set().union(*(d.keys() for d in dict_list))}
主要受到 这个答案的 .
如果您有任何疑问,请告诉我。