8wDlpd.png
8wDFp9.png
8wDEOx.png
8wDMfH.png
8wDKte.png

如何捕获在 Azure Cosmos 上运行的 MongoDB 上的变化流?

Hamish 3月前

138 0

目前,我有一个用例如下:捕获集合中数据发生的所有事件识别已更新的字段创建特定于应用程序的审计事件......

目前,我有一个用例如下:

  1. 捕获集合内数据发生的所有事件
  2. 识别已更新的字段
  3. 针对已更新的字段创建特定于应用程序的审计事件。

例如,如果某个项目有一个名为“状态”的字段,并且它从“写入”更改为“审核”,我应该在变更流中捕获它,然后创建一个名为“状态变更”的日志并将其保存在不同的集合中。

目前,这是我编写的代码:

using Microsoft.Extensions.Options;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Driver;
using TodoApp.Configurations;

namespace TodoApp.Background
{
    public class Service : IHostedService
    {
        private readonly IMongoCollection<BsonDocument> _mongoCollection;
        private IChangeStreamCursor<BsonDocument> _changeStreamCursor;
        private Task _watchTask;
        private CancellationTokenSource _cancellationTokenSource;

        public Service(IMongoClient mongoClient, IOptions<Database> options)
        {
            var database = mongoClient.GetDatabase(options.Value.DatabaseName);
            _mongoCollection = database.GetCollection<BsonDocument>(options.Value.CollectionName);
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            _cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            _watchTask = Task.Run(() => WatchForChangesAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token);

            return Task.CompletedTask;
        }

        private async Task WatchForChangesAsync(CancellationToken cancellationToken)
        {
            var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
    .Match(change =>
        change.OperationType == ChangeStreamOperationType.Insert ||
        change.OperationType == ChangeStreamOperationType.Update ||
        change.OperationType == ChangeStreamOperationType.Replace
    )
    .AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>(
        @"{ 
            $project: { 
                '_id': 1, 
                'fullDocument': 1, 
                'ns': 1, 
                'documentKey': 1
            }
        }"
    );

            ChangeStreamOptions options = new()
            {
                FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
            };

            _changeStreamCursor = _mongoCollection.Watch(
    pipeline,
    options
);

            Console.WriteLine("Watching for changes...");
            while (await _changeStreamCursor.MoveNextAsync(cancellationToken))
            {
                var batch = _changeStreamCursor.Current;
                foreach (var change in batch)
                {
                    // var serializedChange = change.ToJson();
                    // var document = BsonSerializer.Deserialize<Domain.ChangeStream<BsonDocument>>(serializedChange);
                    Console.WriteLine(change);
                    // Process the change
                }
            }
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _cancellationTokenSource.Cancel();
            return Task.WhenAny(_watchTask, Task.Delay(Timeout.Infinite, cancellationToken));
        }
    }
}

这些事件与 MongoDB Cloud 上的更新描述完美配合,但当我将其切换到 Azure Cosmos 时,它不支持它。还有其他替代方案吗?

帖子版权声明 1、本帖标题:如何捕获在 Azure Cosmos 上运行的 MongoDB 上的变化流?
    本站网址:http://xjnalaquan.com/
2、本网站的资源部分来源于网络,如有侵权,请联系站长进行删除处理。
3、会员发帖仅代表会员个人观点,并不代表本站赞同其观点和对其真实性负责。
4、本站一律禁止以任何方式发布或转载任何违法的相关信息,访客发现请向站长举报
5、站长邮箱:yeweds@126.com 除非注明,本帖由Hamish在本站《azure》版块原创发布, 转载请注明出处!
最新回复 (0)
  • 有人能帮我找出代码中的错误吗?如果我创建一个具有静态初始值的空白二维矩阵,它会返回正确的转置矩阵。而如果我使用 for l 创建一个空白矩阵...

    有人能帮我找出代码中的错误吗?如果我创建一个具有静态初始值的空白二维矩阵,它会返回正确的转置矩阵。而如果我使用 for 循环创建一个空白矩阵,它会返回答案中所有行的最后一行。

    # Original Matrix :
    [[1, 2, 3], 
    [4, 5, 6], 
    [7, 8, 9]]
    
    # Expected Transpose :
    [[1, 4, 7],
    [2, 5, 8],
    [3, 6, 9]]
    
    # Actual Transpose :
    [[3, 6, 9],
    [3, 6, 9],
    [3, 6, 9]]
    
    
    # Transpose Matrix
    
    def transpose_mtx(original):
        t_cols = len(original)
        t_rows = len(original[0])
    
        # creating a blank matrix with 0s of transpose shape
        row = [0 for _ in range(t_cols)]
        t_mtx = [row for _ in range(t_rows)]
    
        # t_mtx = [[0,0,0],[0,0,0], [0,0,0]]
        # if I keep this line instead, it returns correct answer
     
        # rows of original
        for i in range(len(original)):
            # columns of original
            for j in range(len(original[0])):
                # interchange items
                t_mtx[j][i] = original[i][j]
        
        return t_mtx
    
    
    my_mtx = [[1,2, 3],
              [4, 5, 6],
              [7, 8, 9]]
    
    print(transpose_mtx(my_mtx))
    
返回
作者最近主题: