8wDlpd.png
8wDFp9.png
8wDEOx.png
8wDMfH.png
8wDKte.png

如何捕获在 Azure Cosmos 上运行的 MongoDB 上的变化流?

Hamish 3月前

134 0

目前,我有一个用例如下:捕获集合中数据发生的所有事件识别已更新的字段创建特定于应用程序的审计事件......

目前,我有一个用例如下:

  1. 捕获集合内数据发生的所有事件
  2. 识别已更新的字段
  3. 针对已更新的字段创建特定于应用程序的审计事件。

例如,如果某个项目有一个名为“状态”的字段,并且它从“写入”更改为“审核”,我应该在变更流中捕获它,然后创建一个名为“状态变更”的日志并将其保存在不同的集合中。

目前,这是我编写的代码:

using Microsoft.Extensions.Options;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Driver;
using TodoApp.Configurations;

namespace TodoApp.Background
{
    public class Service : IHostedService
    {
        private readonly IMongoCollection<BsonDocument> _mongoCollection;
        private IChangeStreamCursor<BsonDocument> _changeStreamCursor;
        private Task _watchTask;
        private CancellationTokenSource _cancellationTokenSource;

        public Service(IMongoClient mongoClient, IOptions<Database> options)
        {
            var database = mongoClient.GetDatabase(options.Value.DatabaseName);
            _mongoCollection = database.GetCollection<BsonDocument>(options.Value.CollectionName);
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            _cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            _watchTask = Task.Run(() => WatchForChangesAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token);

            return Task.CompletedTask;
        }

        private async Task WatchForChangesAsync(CancellationToken cancellationToken)
        {
            var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
    .Match(change =>
        change.OperationType == ChangeStreamOperationType.Insert ||
        change.OperationType == ChangeStreamOperationType.Update ||
        change.OperationType == ChangeStreamOperationType.Replace
    )
    .AppendStage<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>, BsonDocument>(
        @"{ 
            $project: { 
                '_id': 1, 
                'fullDocument': 1, 
                'ns': 1, 
                'documentKey': 1
            }
        }"
    );

            ChangeStreamOptions options = new()
            {
                FullDocument = ChangeStreamFullDocumentOption.UpdateLookup
            };

            _changeStreamCursor = _mongoCollection.Watch(
    pipeline,
    options
);

            Console.WriteLine("Watching for changes...");
            while (await _changeStreamCursor.MoveNextAsync(cancellationToken))
            {
                var batch = _changeStreamCursor.Current;
                foreach (var change in batch)
                {
                    // var serializedChange = change.ToJson();
                    // var document = BsonSerializer.Deserialize<Domain.ChangeStream<BsonDocument>>(serializedChange);
                    Console.WriteLine(change);
                    // Process the change
                }
            }
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            _cancellationTokenSource.Cancel();
            return Task.WhenAny(_watchTask, Task.Delay(Timeout.Infinite, cancellationToken));
        }
    }
}

这些事件与 MongoDB Cloud 上的更新描述完美配合,但当我将其切换到 Azure Cosmos 时,它不支持它。还有其他替代方案吗?

帖子版权声明 1、本帖标题:如何捕获在 Azure Cosmos 上运行的 MongoDB 上的变化流?
    本站网址:http://xjnalaquan.com/
2、本网站的资源部分来源于网络,如有侵权,请联系站长进行删除处理。
3、会员发帖仅代表会员个人观点,并不代表本站赞同其观点和对其真实性负责。
4、本站一律禁止以任何方式发布或转载任何违法的相关信息,访客发现请向站长举报
5、站长邮箱:yeweds@126.com 除非注明,本帖由Hamish在本站《azure》版块原创发布, 转载请注明出处!
最新回复 (0)
  • 有人能帮我找出代码中的错误吗?如果我创建一个具有静态初始值的空白二维矩阵,它会返回正确的转置矩阵。而如果我使用 for l 创建一个空白矩阵...

    有人能帮我找出代码中的错误吗?如果我创建一个具有静态初始值的空白二维矩阵,它会返回正确的转置矩阵。而如果我使用 for 循环创建一个空白矩阵,它会返回答案中所有行的最后一行。

    # Original Matrix :
    [[1, 2, 3], 
    [4, 5, 6], 
    [7, 8, 9]]
    
    # Expected Transpose :
    [[1, 4, 7],
    [2, 5, 8],
    [3, 6, 9]]
    
    # Actual Transpose :
    [[3, 6, 9],
    [3, 6, 9],
    [3, 6, 9]]
    
    
    # Transpose Matrix
    
    def transpose_mtx(original):
        t_cols = len(original)
        t_rows = len(original[0])
    
        # creating a blank matrix with 0s of transpose shape
        row = [0 for _ in range(t_cols)]
        t_mtx = [row for _ in range(t_rows)]
    
        # t_mtx = [[0,0,0],[0,0,0], [0,0,0]]
        # if I keep this line instead, it returns correct answer
     
        # rows of original
        for i in range(len(original)):
            # columns of original
            for j in range(len(original[0])):
                # interchange items
                t_mtx[j][i] = original[i][j]
        
        return t_mtx
    
    
    my_mtx = [[1,2, 3],
              [4, 5, 6],
              [7, 8, 9]]
    
    print(transpose_mtx(my_mtx))
    
  • 假设我有三个不同的词典:dictA = {'A': 1, 'B': 2, 'C': 3}dictB = {'C': 1, 'D': 2, 'E': 3}dictC = {'A': 2, 'C': 4, 'E': 6, 'G': 8}并且我想要'添加'这些词典...

    假设我有三本不同的词典:

    dictA = {'A': 1, 'B': 2, 'C': 3}
    dictB = {'C': 1, 'D': 2, 'E': 3}
    dictC = {'A': 2, 'C': 4, 'E': 6, 'G': 8}
    

    我想按顺序将这些词典“添加”在一起 dictA+dictB+dictC 。我的意思是:

    • p3

    • p4

    此示例的结果应如下所示:

    resultDict = {'A': 3, 'B': 2, 'C': 8, 'D': 2, 'E': 9, 'G': 2,}
    

    有没有简单的方法可以做到这一点?我实际使用的字典要大得多,并且嵌套在多个其他字典中,所以如果这个例子解释得不好,我很抱歉。我尝试摆弄 for 循环,但正如我提到的,我实际使用的列表要大得多,而且不那么容易处理。

  • 我使用语音工作室,并希望将头像与 OpenAI 语音相结合(语音菜单中“英语(美国)”的最后六种语音,从 Alloy 到 Shimmer)。遗憾的是手势不起作用...

    我使用语音工作室,并希望将头像与 OpenAI 语音相结合(语音菜单中“英语(美国)”的最后六种语音,从 Alloy 到 Shimmer)。遗憾的是,手势不适用于它们,只适用于非 OpenAI 语音。头像只是忽略手势标签。如何解决这个问题?OpenAI 语音更好,与手势头像结合将是一个很好的组合,特别是因为它们也适用于许多非英语语言(您需要将您的资源位于“瑞典中部”才能使用头像和 OpenAI 语音)。提前谢谢您!

  • 主要问题在标题中,但几乎没有信息讨论 Terraform 注册 AzureRM 资源提供程序的过程。所以我将包含一个小背景...

    主要问题在标题中,但几乎没有信息讨论 Terraform 注册 AzureRM 资源提供程序的过程。因此,我将介绍一个小背景故事,说明这个问题与其他问题有何关联:

    昨天,我们遇到了一个问题,微软显然删除了资源提供商 Microsoft.TimeSeriesInsight 。我们仍然可以在较旧的订阅中看到该提供商并注册/取消注册它,但对于新订阅,它没有显示在资源提供商列表中。

    为便于理解,我们需要知道我们使用了两个管道。第一个管道设置 Azure 订阅并使用所有者级 SPN。第二个订阅部署具有网络贡献者权限的网络基础架构。两个管道都使用了相当老旧的 azurerm terraform 提供程序,版本为 3.67.0 .

    网络管道也是失败的管道,并显示以下错误消息:

    Original Error: determining which Required Resource Providers require registration: the required Resource Provider "Microsoft.TimeSeriesInsights" wasn't returned from the Azure API
    

    对我来说,这只是说“我们删除了那个假的,更新你的 azurerm 提供程序”!虽然可以 专门创建一个 terraform 资源来注册一个 azurerm 资源提供程序 ,但该网站还直接用 粗体 字指出,你应该让 terraform 处理资源提供程序注册。

    因此,我们总共有一个较旧的 azurerm terraform 提供程序版本,它定义了已弃用的 azurerm 资源 提供程序。由于 terraform 始终向其可能需要的 Azure 订阅注册 所有 资源提供程序,因此我认为更新提供程序可以解决问题。

    但是,将 azurerm terraform 提供程序 3.67.0 3.108.0 ,它需要注册新的 azurerm 资源 提供程序,而这些提供程序之前从未注册过。

    简单地运行 subscription-setup 管道(无论是空应用还是实际应用某些内容)都没有帮助 - Terraform 没有向 Subscription 注册新的 资源 提供者。然后网络管道失败,因为它需要具有所有者或贡献者权限才能添加资源提供者注册...

    最后我们不得不手动完成这一操作,但这个问题一直困扰着我:

    Terraform 何时实际进行提供商注册?为什么管道运行没有更新注册?

  • 使用您展示的格式的一个简单方法是收集所有键,然后 .get() 每个键的默认值为 0,并将其添加到新字典中。

    以下是显示其实际运行的基本结构:

    dictA = {'A': 1, 'B': 2, 'C': 3}
    dictB = {'C': 1, 'D': 2, 'E': 3}
    dictC = {'A': 2, 'C': 4, 'E': 6, 'G': 8}
    dict_list = [dictA, dictB, dictC]
    
    # Collect all the keys from each dict
    all_keys = set()
    for dictx in dict_list:
        all_keys.update(dictx.keys())
    
    # Iterate through each key adding it to the result
    resultDict = {}
    for key in all_keys:
        # Pre-init with a 0 then grab it or default to 0 from each dict
        resultDict[key] = 0
        for dictx in dict_list:
            resultDict[key] += dictx.get(key, 0)
    
    print(resultDict)
    

    以下是浓缩为一行的示例代码:

    resultDict = {key: sum([x.get(key, 0) for x in dict_list]) for key in set().union(*(d.keys() for d in dict_list))}
    

    主要受到 这个答案的 .

    如果您有任何疑问,请告诉我。

  • 非常感谢!成功了。所以基本上,问题在于所有行都再次使用相同的列表对象 \'row\',因为列表是可变的,所以进一步的更改会渗透到各处。

  • 您需要通过为转置矩阵中的每一行创建单独的列表来初始化一个空白矩阵。

    # Creating a blank matrix with 0s of transpose shape
    t_mtx = [[0 for _ in range(t_cols)] for _ in range(t_rows)]
    
  • 明白了!非常感谢。它成功了。我费尽心思手动检查整个内循环以找出问题所在,浪费了 2 个小时。但最终,您的回答有所帮助。所以基本上,问题在于所有行都再次使用相同的列表对象 \'row\',因为列表是可变的,所以进一步的更改会渗透到各处。再次感谢!

  • collections Counter 使这变得非常简单,因为您可以将 Counter 对象添加在一起。换句话说,这可以完成您想要的操作:

    Counter(dictA) + Counter(dictB) + Counter(dictC)
    

    如果要概括,可以使用 sum() 并为启动参数提供一个空的计数器,这样就非常简洁了:

    from collections import Counter
    
    dictA = {'A': 1, 'B': 2, 'C': 3}
    dictB = {'C': 1, 'D': 2, 'E': 3}
    dictC = {'A': 2, 'C': 4, 'E': 6, 'G': 8}
    
    dicts = [dictA, dictB, dictC]
    
    counts = sum(map(Counter, dicts), Counter())
    # Counter({'E': 9, 'C': 8, 'G': 8, 'A': 3, 'B': 2, 'D': 2})
    

    计数器对象的作用类似于字典,但如果你特别需要字典,你可以使用 dict(counts) .

  • 问题在于您正在使用列表作为列表的一个元素(因为它应该在矩阵中),但这里的问题是您将所有行用作同一个列表(即 row ),并且如果您知道列表是可变的意味着它们可以从任何地方更新并且它会根据对它的所有引用而改变。

    现在在这种情况下,t_mtx 使用所有行作为相同的参考列表( row ),这意味着每次更新任何行时它都会更新所有行,这就是为什么所有行都与最后一行相同的原因。

    因此,您可以每次创建一个新行,或者使用 .copy() 方法创建行列表的新副本。

    t_mtx = [row.copy() for _ in range(t_rows)]
    
    

    输出

    enter image description here

    希望你能理解,抱歉我的英语不好。

  • 遍历字典 B 中的键。如果该键存在于字典 A 中,则将 B 值添加到 A 键,否则在字典 A 中创建新键。这里的实际难度是什么?

  • 明白了!非常感谢。成功了。将相同的列表对象 \'row\' 更改为行的列表单独副本。

  • 您正在创建一个 List t_mtx,它有 3 个相同子列表的副本。因此更改会传播。顺便说一句 - 您应该使用 Numpy 数组作为矩阵,而不是 Python 列表的列表,然后可以直接转置。

  • @RuiJarimba 两点都是正确的。正如我在帖子中所说,这个问题是通过手动注册提供程序来解决的。此外,我们项目的范围相当广泛:我没有将所有 Terraform 提供程序更新到最新版本的访问权限(我的项目使用的是 3.108.0)。我只是被要求修复它的人,指导拥有更多权限的人去做这些事情 :/

  • 引用 16

    @dododo 这不是“一个坏主意” - 它可能是一种特定的不兼容性(并且所有不兼容性都有记录,甚至具体到特定的兼容版本)但 OP 没有提供足够的细节(如果他们使用 MongoDB API 运行,他们还没有透露兼容版本,或者除了提到例外之外的任何其他细节)

  • 使用 mongoapi 连接到 cosmosdb 不是一个好主意,因为 mongo 驱动程序根本不了解 cosmosdb。因此,即使可能支持更改流,其功能也非常有限

  • 发布链接没有帮助(没有人需要访问外部页面才能理解您的问题):我不知道您在该页面上具体指的是什么。就像我说的:

  • 请编辑您的问题以添加一些说明和细节:1) 当您说“Azure Cosmos”时 - 您是否在运行 MongoDB API?MongoDB vCore?其他的?2) 您说的“不支持它”是什么意思 - 错误消息?没有返回数据?返回了错误的数据?目前,关于这个问题确实没有具体的细节。

返回
作者最近主题: