8wDlpd.png
8wDFp9.png
8wDEOx.png
8wDMfH.png
8wDKte.png

如何捕获在 Azure Cosmos 上运行的 MongoDB 上的变化流?

Hamish 3月前

130 0

目前,我有一个用例如下:捕获集合中数据发生的所有事件识别已更新的字段创建特定于应用程序的审计事件......

目前,我有一个用例如下:

  1. 捕获集合内数据发生的所有事件
  2. 识别已更新的字段
  3. 针对已更新的字段创建特定于应用程序的审计事件。

例如,如果某个项目有一个名为“状态”的字段,并且它从“写入”更改为“审核”,我应该在变更流中捕获它,然后创建一个名为“状态变更”的日志并将其保存在不同的集合中。

目前,这是我编写的代码:

using Microsoft.Extensions.Options;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Driver;
using TodoApp.Configurations;

namespace TodoApp.Background
{
    public class Service : IHostedService
    {
        private readonly IMongoCollection<BsonDocument> _mongoCollection;
        private IChangeStreamCursor<BsonDocument> _changeStreamCursor;
        private Task _watchTask;
        private CancellationTokenSource _cancellationTokenSource;

        public Service(IMongoClient mongoClient, IOptions<Database> options)
        {
            var database = mongoClient.GetDatabase(options.Value.DatabaseName);
            _mongoCollection = database.GetCollection<BsonDocument>(options.Value.CollectionName);
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            _cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            _watchTask = Task.Run(() => WatchForChangesAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token);

            return Task.CompletedTask;
        }

        private async Task WatchForChangesAsync(CancellationToken cancellationToken)
        {
            var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
    .Match(change =>
        change.OperationType == ChangeStreamOperationType.Insert ||
        change.OperationType == ChangeStreamOperationType.Update ||
        change.OperationType == ChangeStreamOperationType.Replace
    )
    .AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>(
        @"{ 
            $project: { 
                '_id': 1, 
                'fullDocument': 1, 
                'ns': 1, 
                'documentKey': 1
            }
        }"
    );

            ChangeStreamOptions options = new()
            {
                FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
            };

            _changeStreamCursor = _mongoCollection.Watch(
    pipeline,
    options
);

            Console.WriteLine("Watching for changes...");
            while (await _changeStreamCursor.MoveNextAsync(cancellationToken))
            {
                var batch = _changeStreamCursor.Current;
                foreach (var change in batch)
                {
                    // var serializedChange = change.ToJson();
                    // var document = BsonSerializer.Deserialize<Domain.ChangeStream<BsonDocument>>(serializedChange);
                    Console.WriteLine(change);
                    // Process the change
                }
            }
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _cancellationTokenSource.Cancel();
            return Task.WhenAny(_watchTask, Task.Delay(Timeout.Infinite, cancellationToken));
        }
    }
}

这些事件与 MongoDB Cloud 上的更新描述完美配合,但当我将其切换到 Azure Cosmos 时,它不支持它。还有其他替代方案吗?

帖子版权声明 1、本帖标题:如何捕获在 Azure Cosmos 上运行的 MongoDB 上的变化流?
    本站网址:http://xjnalaquan.com/
2、本网站的资源部分来源于网络,如有侵权,请联系站长进行删除处理。
3、会员发帖仅代表会员个人观点,并不代表本站赞同其观点和对其真实性负责。
4、本站一律禁止以任何方式发布或转载任何违法的相关信息,访客发现请向站长举报
5、站长邮箱:yeweds@126.com 除非注明,本帖由Hamish在本站《azure》版块原创发布, 转载请注明出处!
最新回复 (0)
  • 遍历字典 B 中的键。如果该键存在于字典 A 中,则将 B 值添加到 A 键,否则在字典 A 中创建新键。这里的实际难度是什么?

返回
作者最近主题: