8wDlpd.png
8wDFp9.png
8wDEOx.png
8wDMfH.png
8wDKte.png

Python 警告:协同程序‘create_task’未被等待

Dou Xu-MSFT 2月前

51 0

我正在使用 Python 和 PyCharm,我试图使用 asyncio 卸载数据库写入,但在 IDE 警告方面遇到了麻烦。在我的 main.py 中,我以这种方式启动任务db_logging_monitoring_task =

我正在使用 Python 和 PyCharm,我试图卸载数据库写入, asyncio 但遇到了 IDE 警告问题

在我的 main.py 中,我以这种方式启动任务

db_logging_monitoring_task = asyncio.create_task(start_logging_jobs_sniffer())
tasks.append(db_logging_monitoring_task)
await gather(*tasks)

异步函数内部的 while 循环在 start_logging_jobs_sniffer 哪里

# Waits on jobs from a jobs_queue
async def start_logging_jobs_sniffer(self):
    while True:
        try:
            job = await self.logging_jobs_queue.get()


# Adds jobs to a jobs_queue
async def make_logging_job(self, params):
    job = self.LoggingJob(params)
    await self.logging_jobs_queue.put(job)

类似地,在 main.py 中,我有另一个后台监控任务,该任务以 5 秒为间隔进行轮询,然后记录轮询结果

async def poll(self, interval: int):
    if self.is_config_applied:
        while True:
            # ...

            asyncio.create_task(make_logging_job(
                params={
                    **self.polling_results
                },
            ))

但 pycharm 向我发出警告

Coroutine 'create_task' is not awaited 

我特别希望 make_logging_job 任务能够与其他任务并行工作,这就是我不使用的原因 await ,而且这似乎按预期工作。

如果我在出现警告的情况下运行程序,则它们 make_logging_job 似乎是并行的,并且不会阻止任何内容

我完全不明白为什么 PyCharm 会发出未等待的警告 create_task ,还有其他更好的方法来完成任务吗?

将任务保存到变量可以抑制错误,并且似乎通过在后台运行来发挥作用

            test = asyncio.create_task(make_logging_job(
                params={
                    **self.polling_results
                },
            ))

但我不知道这有什么用,我认为将任务保存到变量不会等待协程

帖子版权声明 1、本帖标题:Python 警告:协同程序‘create_task’未被等待
    本站网址:http://xjnalaquan.com/
2、本网站的资源部分来源于网络,如有侵权,请联系站长进行删除处理。
3、会员发帖仅代表会员个人观点,并不代表本站赞同其观点和对其真实性负责。
4、本站一律禁止以任何方式发布或转载任何违法的相关信息,访客发现请向站长举报
5、站长邮箱:yeweds@126.com 除非注明,本帖由Dou Xu-MSFT在本站《python》版块原创发布, 转载请注明出处!
最新回复 (0)
  • 总体而言:我尝试使用 Cloudbuild 和 Cloudrun 构建 BERT 模型。我将模型(参数)和元数据(标签)保存在 GCP Cloud Storage 中。但是,我遇到了加载元数据.bin 的错误...

    总体而言: 我尝试使用 Cloudbuild 和 Cloudrun 。我将模型(参数)和元数据(标签)保存在 GCP 云存储中。但是,我在通过 joblib.load() 加载 metadata.bin 文件时遇到了错误。 我的 metadata.bin 文件包含 UTF-8 字符,但 joblib.load 需要 ASCII 字符。在我的版本中,默认协议是 4,但错误消息表明协议是 0。

    相关依赖项: python 3.8.0、joblib 1.1.1(我已经尝试升级最近的版本)、google-api-core==2.19.1、google-auth==2.32.0、google-cloud-core==2.4.1、google-cloud-storage==2.18.0

    我的努力: 我已经审理了两起案件。

    1. 本地 。在这种情况下, 在 GCP Cloud Storage 下载 model.bin 和 metadata.bin 文件都可以 .
    2. 上尝试过 docker 。在这种情况下, 在 dockerized 容器中加载 metadata.bin 文件和 model.bin 文件也有效 .

    错误详情
    `

      File "./src_review/model_server.py", line 70, in load_bert_model
          metadata = joblib.load(metadata_path)
      File "/usr/local/lib/python3.8/site-packages/joblib/numpy_pickle.py", line 658, in load
          obj = _unpickle(fobj, filename, mmap_mode)
      File "/usr/local/lib/python3.8/site-packages/joblib/numpy_pickle.py", line 577, in _unpickle
          obj = unpickler.load()
      File "/usr/local/lib/python3.8/pickle.py", line 1210, in load
          dispatch[key[0]](self)
      File "/usr/local/lib/python3.8/pickle.py", line 1244, in load_persid
          raise UnpicklingError(
              _pickle.UnpicklingError: persistent IDs in protocol 0 must be ASCII strings`
    

    我的代码: `

     def load_bert_model(config: argparse.Namespace):
          bucket = storage_client.bucket(bucket_name)
          model_blob = bucket.blob(model_file)
          metadata_blob = bucket.blob(metadata_file)        
    
          local_model_path = '/tmp/pytorch_model.bin'
          metadata_path = '/tmp/meta.bin'    
    
          print(f"Downloading model to {local_model_path}")
    
          model_blob.download_to_filename(local_model_path)        
          log.info(f"Model downloaded to {local_model_path}")
          metadata_blob.download_to_filename(metadata_path)
          log.info(f"Metadata (label) downloaded to {metadata_path}")
    
          metadata = joblib.load(metadata_path)
        ...`
    

    来自GCP官方文档

    `

    def upload_directory_with_transfer_manager(bucket_name, source_directory, workers=1):
        bucket = create_bucket_if_not_exists(bucket_name)
        directory_as_path_obj = Path(source_directory)
        paths = directory_as_path_obj.rglob("*.bin")
    
        file_paths = [path for path in paths if path.is_file()]
    
        relative_paths = [path.relative_to(source_directory) for path in file_paths]
    
        string_paths = [str(path) for path in relative_paths]
    
        print("Found {} files.".format(len(string_paths)))
    
        results = transfer_manager.upload_many_from_filenames(
            bucket, string_paths, source_directory=source_directory, max_workers=workers, skip_if_exists=False
        )
    
        for name, result in zip(string_paths, results):
            if isinstance(result, Exception):
                print("Failed to upload {} due to exception: {}".format(name, result))
            else:
                print("Uploaded {} to {}.".format(name, bucket.name))
    

    `

    预期原因: 我认为 cloud run config 与我的测试环境有很大不同。但我无法预期主要原因。

    谢谢你的努力!

  • @Mike'Pomax'Kamermans 这个家伙不使用 create_task 问题是不同的

  • 当您创建一个 asyncio 任务时,您不能只是“发射并忘记”它 - 您必须在某处保留对它的引用 - 并最终检查它是否完成,这样您就可以丢弃您的引用。

    正如您发现的(在评论中) - 需要将任务归因于局部变量 - 但与您的 IDE 报告不同,这还不够 - 就像仅仅归因于变量并丢弃它一样,您没有任何参考。

    使用一个容器,比如 set() ,并生成另一个任务来检查它们是否完成,也许使用一个 asyncio.wait 调用:

    import asyncio
    
    all_jobs = set()
    
    async def poll(self, interval: int):
        if self.is_config_applied:
            while True:
                # ...
                
                all_jobs.add(asyncio.create_task(make_logging_job(
                    params={
                        **self.polling_results
                    },
                )))
    
    asyncio def task_waiter():
        while True:
            pending, done = await asyncio.wait(all_jobs, timeout=10) # collect once each 10 seconds - change at your will
            all_jobs.clear()
            all_jobs.update(pending)
    
    
    # and wherever in your code, where you call `poll`, instead of
    # just `await poll` do:
    
    async def main(...):
        ...
        # await poll(...)  #old code, commented out
        poll_task = asyncio.create_task(poll(...))
        waiter_task = asyncio.create_task(task_waiter())
        await asyncio.gather(poll_task, waiter_task)
    
    
    
    
    
  • 我正在使用树莓派开发一个信息亭演示幻灯片程序,幻灯片在计算机显示器上运行良好,但当我在电视上尝试该程序时,它会在大约之后重置为第一个视频......

    我正在使用 Raspberry Pi 开发一个信息亭演示幻灯片程序,幻灯片在计算机显示器上可以正常播放,但当我在电视上尝试该程序时,它会在大约 65 秒后重置为第一个视频。我们有多台 Raspberry Pi,但都存在同样的问题。刷新率和分辨率设置为 1920 x 1080 和 60 赫兹。

    演示的总时长应为 3 分钟左右,但它总是在 65 秒时停止,只有在 Chromium 处于信息亭模式时才会在电视上显示。有一个程序在启动时运行,以信息亭模式启动 Chromium,设置正确的显示分辨率和刷新率,并通过 api 调用检索演示的 URL。

    如果您有任何建议,我将不胜感激!

    我尝试了多种 Chromium 设置,在浏览器中正常播放播放列表 URL,效果很好。我在没有使用 kiosk 模式的情况下运行它,结果与使用 kiosk 模式时一样,页面刷新。

返回
作者最近主题: