8wDlpd.png
8wDFp9.png
8wDEOx.png
8wDMfH.png
8wDKte.png

如何捕获在 Azure Cosmos 上运行的 MongoDB 上的变化流?

Hamish 3月前

139 0

目前,我有一个用例如下:捕获集合中数据发生的所有事件识别已更新的字段创建特定于应用程序的审计事件......

目前,我有一个用例如下:

  1. 捕获集合内数据发生的所有事件
  2. 识别已更新的字段
  3. 针对已更新的字段创建特定于应用程序的审计事件。

例如,如果某个项目有一个名为“状态”的字段,并且它从“写入”更改为“审核”,我应该在变更流中捕获它,然后创建一个名为“状态变更”的日志并将其保存在不同的集合中。

目前,这是我编写的代码:

using Microsoft.Extensions.Options;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Driver;
using TodoApp.Configurations;

namespace TodoApp.Background
{
    public class Service : IHostedService
    {
        private readonly IMongoCollection<BsonDocument> _mongoCollection;
        private IChangeStreamCursor<BsonDocument> _changeStreamCursor;
        private Task _watchTask;
        private CancellationTokenSource _cancellationTokenSource;

        public Service(IMongoClient mongoClient, IOptions<Database> options)
        {
            var database = mongoClient.GetDatabase(options.Value.DatabaseName);
            _mongoCollection = database.GetCollection<BsonDocument>(options.Value.CollectionName);
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            _cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            _watchTask = Task.Run(() => WatchForChangesAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token);

            return Task.CompletedTask;
        }

        private async Task WatchForChangesAsync(CancellationToken cancellationToken)
        {
            var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
    .Match(change =>
        change.OperationType == ChangeStreamOperationType.Insert ||
        change.OperationType == ChangeStreamOperationType.Update ||
        change.OperationType == ChangeStreamOperationType.Replace
    )
    .AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>(
        @"{ 
            $project: { 
                '_id': 1, 
                'fullDocument': 1, 
                'ns': 1, 
                'documentKey': 1
            }
        }"
    );

            ChangeStreamOptions options = new()
            {
                FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
            };

            _changeStreamCursor = _mongoCollection.Watch(
    pipeline,
    options
);

            Console.WriteLine("Watching for changes...");
            while (await _changeStreamCursor.MoveNextAsync(cancellationToken))
            {
                var batch = _changeStreamCursor.Current;
                foreach (var change in batch)
                {
                    // var serializedChange = change.ToJson();
                    // var document = BsonSerializer.Deserialize<Domain.ChangeStream<BsonDocument>>(serializedChange);
                    Console.WriteLine(change);
                    // Process the change
                }
            }
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _cancellationTokenSource.Cancel();
            return Task.WhenAny(_watchTask, Task.Delay(Timeout.Infinite, cancellationToken));
        }
    }
}

这些事件与 MongoDB Cloud 上的更新描述完美配合,但当我将其切换到 Azure Cosmos 时,它不支持它。还有其他替代方案吗?

帖子版权声明 1、本帖标题:如何捕获在 Azure Cosmos 上运行的 MongoDB 上的变化流?
    本站网址:http://xjnalaquan.com/
2、本网站的资源部分来源于网络,如有侵权,请联系站长进行删除处理。
3、会员发帖仅代表会员个人观点,并不代表本站赞同其观点和对其真实性负责。
4、本站一律禁止以任何方式发布或转载任何违法的相关信息,访客发现请向站长举报
5、站长邮箱:yeweds@126.com 除非注明,本帖由Hamish在本站《azure》版块原创发布, 转载请注明出处!
最新回复 (0)
  • 主要问题在标题中,但几乎没有信息讨论 Terraform 注册 AzureRM 资源提供程序的过程。所以我将包含一个小背景...

    主要问题在标题中,但几乎没有信息讨论 Terraform 注册 AzureRM 资源提供程序的过程。因此,我将介绍一个小背景故事,说明这个问题与其他问题有何关联:

    昨天,我们遇到了一个问题,微软显然删除了资源提供商 Microsoft.TimeSeriesInsight 。我们仍然可以在较旧的订阅中看到该提供商并注册/取消注册它,但对于新订阅,它没有显示在资源提供商列表中。

    为便于理解,我们需要知道我们使用了两个管道。第一个管道设置 Azure 订阅并使用所有者级 SPN。第二个订阅部署具有网络贡献者权限的网络基础架构。两个管道都使用了相当老旧的 azurerm terraform 提供程序,版本为 3.67.0 .

    网络管道也是失败的管道,并显示以下错误消息:

    Original Error: determining which Required Resource Providers require registration: the required Resource Provider "Microsoft.TimeSeriesInsights" wasn't returned from the Azure API
    

    对我来说,这只是说“我们删除了那个假的,更新你的 azurerm 提供程序”!虽然可以 专门创建一个 terraform 资源来注册一个 azurerm 资源提供程序 ,但该网站还直接用 粗体 字指出,你应该让 terraform 处理资源提供程序注册。

    因此,我们总共有一个较旧的 azurerm terraform 提供程序版本,它定义了已弃用的 azurerm 资源 提供程序。由于 terraform 始终向其可能需要的 Azure 订阅注册 所有 资源提供程序,因此我认为更新提供程序可以解决问题。

    但是,将 azurerm terraform 提供程序 3.67.0 3.108.0 ,它需要注册新的 azurerm 资源 提供程序,而这些提供程序之前从未注册过。

    简单地运行 subscription-setup 管道(无论是空应用还是实际应用某些内容)都没有帮助 - Terraform 没有向 Subscription 注册新的 资源 提供者。然后网络管道失败,因为它需要具有所有者或贡献者权限才能添加资源提供者注册...

    最后我们不得不手动完成这一操作,但这个问题一直困扰着我:

    Terraform 何时实际进行提供商注册?为什么管道运行没有更新注册?

返回
作者最近主题: