8wDlpd.png
8wDFp9.png
8wDEOx.png
8wDMfH.png
8wDKte.png

如何捕获在 Azure Cosmos 上运行的 MongoDB 上的变化流?

Hamish 3月前

137 0

目前,我有一个用例如下:捕获集合中数据发生的所有事件识别已更新的字段创建特定于应用程序的审计事件......

目前,我有一个用例如下:

  1. 捕获集合内数据发生的所有事件
  2. 识别已更新的字段
  3. 针对已更新的字段创建特定于应用程序的审计事件。

例如,如果某个项目有一个名为“状态”的字段,并且它从“写入”更改为“审核”,我应该在变更流中捕获它,然后创建一个名为“状态变更”的日志并将其保存在不同的集合中。

目前,这是我编写的代码:

using Microsoft.Extensions.Options;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Driver;
using TodoApp.Configurations;

namespace TodoApp.Background
{
    public class Service : IHostedService
    {
        private readonly IMongoCollection<BsonDocument> _mongoCollection;
        private IChangeStreamCursor<BsonDocument> _changeStreamCursor;
        private Task _watchTask;
        private CancellationTokenSource _cancellationTokenSource;

        public Service(IMongoClient mongoClient, IOptions<Database> options)
        {
            var database = mongoClient.GetDatabase(options.Value.DatabaseName);
            _mongoCollection = database.GetCollection<BsonDocument>(options.Value.CollectionName);
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            _cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            _watchTask = Task.Run(() => WatchForChangesAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token);

            return Task.CompletedTask;
        }

        private async Task WatchForChangesAsync(CancellationToken cancellationToken)
        {
            var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
    .Match(change =>
        change.OperationType == ChangeStreamOperationType.Insert ||
        change.OperationType == ChangeStreamOperationType.Update ||
        change.OperationType == ChangeStreamOperationType.Replace
    )
    .AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>(
        @"{ 
            $project: { 
                '_id': 1, 
                'fullDocument': 1, 
                'ns': 1, 
                'documentKey': 1
            }
        }"
    );

            ChangeStreamOptions options = new()
            {
                FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
            };

            _changeStreamCursor = _mongoCollection.Watch(
    pipeline,
    options
);

            Console.WriteLine("Watching for changes...");
            while (await _changeStreamCursor.MoveNextAsync(cancellationToken))
            {
                var batch = _changeStreamCursor.Current;
                foreach (var change in batch)
                {
                    // var serializedChange = change.ToJson();
                    // var document = BsonSerializer.Deserialize<Domain.ChangeStream<BsonDocument>>(serializedChange);
                    Console.WriteLine(change);
                    // Process the change
                }
            }
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _cancellationTokenSource.Cancel();
            return Task.WhenAny(_watchTask, Task.Delay(Timeout.Infinite, cancellationToken));
        }
    }
}

这些事件与 MongoDB Cloud 上的更新描述完美配合,但当我将其切换到 Azure Cosmos 时,它不支持它。还有其他替代方案吗?

帖子版权声明 1、本帖标题:如何捕获在 Azure Cosmos 上运行的 MongoDB 上的变化流?
    本站网址:http://xjnalaquan.com/
2、本网站的资源部分来源于网络,如有侵权,请联系站长进行删除处理。
3、会员发帖仅代表会员个人观点,并不代表本站赞同其观点和对其真实性负责。
4、本站一律禁止以任何方式发布或转载任何违法的相关信息,访客发现请向站长举报
5、站长邮箱:yeweds@126.com 除非注明,本帖由Hamish在本站《azure》版块原创发布, 转载请注明出处!
最新回复 (0)
  • 假设我有三个不同的词典:dictA = {'A': 1, 'B': 2, 'C': 3}dictB = {'C': 1, 'D': 2, 'E': 3}dictC = {'A': 2, 'C': 4, 'E': 6, 'G': 8}并且我想要'添加'这些词典...

    假设我有三本不同的词典:

    dictA = {'A': 1, 'B': 2, 'C': 3}
    dictB = {'C': 1, 'D': 2, 'E': 3}
    dictC = {'A': 2, 'C': 4, 'E': 6, 'G': 8}
    

    我想按顺序将这些词典“添加”在一起 dictA+dictB+dictC 。我的意思是:

    • p3

    • p4

    此示例的结果应如下所示:

    resultDict = {'A': 3, 'B': 2, 'C': 8, 'D': 2, 'E': 9, 'G': 2,}
    

    有没有简单的方法可以做到这一点?我实际使用的字典要大得多,并且嵌套在多个其他字典中,所以如果这个例子解释得不好,我很抱歉。我尝试摆弄 for 循环,但正如我提到的,我实际使用的列表要大得多,而且不那么容易处理。

返回
作者最近主题: