我正在使用 Python 和 PyCharm,我试图使用 asyncio 卸载数据库写入,但在 IDE 警告方面遇到了麻烦。在我的 main.py 中,我以这种方式启动任务db_logging_monitoring_task =
我正在使用 Python 和 PyCharm,我试图卸载数据库写入, asyncio
但遇到了 IDE 警告问题
在我的 main.py 中,我以这种方式启动任务
db_logging_monitoring_task = asyncio.create_task(start_logging_jobs_sniffer())
tasks.append(db_logging_monitoring_task)
await gather(*tasks)
异步函数内部的 while 循环在 start_logging_jobs_sniffer
哪里
# Waits on jobs from a jobs_queue
async def start_logging_jobs_sniffer(self):
while True:
try:
job = await self.logging_jobs_queue.get()
# Adds jobs to a jobs_queue
async def make_logging_job(self, params):
job = self.LoggingJob(params)
await self.logging_jobs_queue.put(job)
类似地,在 main.py 中,我有另一个后台监控任务,该任务以 5 秒为间隔进行轮询,然后记录轮询结果
async def poll(self, interval: int):
if self.is_config_applied:
while True:
# ...
asyncio.create_task(make_logging_job(
params={
**self.polling_results
},
))
但 pycharm 向我发出警告
Coroutine 'create_task' is not awaited
我特别希望 make_logging_job
任务能够与其他任务并行工作,这就是我不使用的原因 await
,而且这似乎按预期工作。
如果我在出现警告的情况下运行程序,则它们 make_logging_job
似乎是并行的,并且不会阻止任何内容
我完全不明白为什么 PyCharm 会发出未等待的警告 create_task
,还有其他更好的方法来完成任务吗?
将任务保存到变量可以抑制错误,并且似乎通过在后台运行来发挥作用
test = asyncio.create_task(make_logging_job(
params={
**self.polling_results
},
))
但我不知道这有什么用,我认为将任务保存到变量不会等待协程
总体而言:我尝试使用 Cloudbuild 和 Cloudrun 构建 BERT 模型。我将模型(参数)和元数据(标签)保存在 GCP Cloud Storage 中。但是,我遇到了加载元数据.bin 的错误...
总体而言: 我尝试使用 Cloudbuild 和 Cloudrun 。我将模型(参数)和元数据(标签)保存在 GCP 云存储中。但是,我在通过 joblib.load() 加载 metadata.bin 文件时遇到了错误。 我的 metadata.bin 文件包含 UTF-8 字符,但 joblib.load 需要 ASCII 字符。在我的版本中,默认协议是 4,但错误消息表明协议是 0。
相关依赖项: python 3.8.0、joblib 1.1.1(我已经尝试升级最近的版本)、google-api-core==2.19.1、google-auth==2.32.0、google-cloud-core==2.4.1、google-cloud-storage==2.18.0
我的努力: 我已经审理了两起案件。
错误详情 :
`
File "./src_review/model_server.py", line 70, in load_bert_model
metadata = joblib.load(metadata_path)
File "/usr/local/lib/python3.8/site-packages/joblib/numpy_pickle.py", line 658, in load
obj = _unpickle(fobj, filename, mmap_mode)
File "/usr/local/lib/python3.8/site-packages/joblib/numpy_pickle.py", line 577, in _unpickle
obj = unpickler.load()
File "/usr/local/lib/python3.8/pickle.py", line 1210, in load
dispatch[key[0]](self)
File "/usr/local/lib/python3.8/pickle.py", line 1244, in load_persid
raise UnpicklingError(
_pickle.UnpicklingError: persistent IDs in protocol 0 must be ASCII strings`
我的代码: `
def load_bert_model(config: argparse.Namespace):
bucket = storage_client.bucket(bucket_name)
model_blob = bucket.blob(model_file)
metadata_blob = bucket.blob(metadata_file)
local_model_path = '/tmp/pytorch_model.bin'
metadata_path = '/tmp/meta.bin'
print(f"Downloading model to {local_model_path}")
model_blob.download_to_filename(local_model_path)
log.info(f"Model downloaded to {local_model_path}")
metadata_blob.download_to_filename(metadata_path)
log.info(f"Metadata (label) downloaded to {metadata_path}")
metadata = joblib.load(metadata_path)
...`
`
def upload_directory_with_transfer_manager(bucket_name, source_directory, workers=1):
bucket = create_bucket_if_not_exists(bucket_name)
directory_as_path_obj = Path(source_directory)
paths = directory_as_path_obj.rglob("*.bin")
file_paths = [path for path in paths if path.is_file()]
relative_paths = [path.relative_to(source_directory) for path in file_paths]
string_paths = [str(path) for path in relative_paths]
print("Found {} files.".format(len(string_paths)))
results = transfer_manager.upload_many_from_filenames(
bucket, string_paths, source_directory=source_directory, max_workers=workers, skip_if_exists=False
)
for name, result in zip(string_paths, results):
if isinstance(result, Exception):
print("Failed to upload {} due to exception: {}".format(name, result))
else:
print("Uploaded {} to {}.".format(name, bucket.name))
`
预期原因: 我认为 cloud run config 与我的测试环境有很大不同。但我无法预期主要原因。
谢谢你的努力!