我正在实现一个类,该类使用对 Meta API 的请求来处理创建 Facebook 帖子。该过程分为两个阶段,分别针对图像和视频。遵循文档...
我正在实现一个类,该类通过向 Meta API 发送请求来创建 Facebook 帖子。该过程分为两个阶段,分别针对图像和视频。按照文档,必须先为每个图像或视频单独执行上传。随后,使用各个请求响应中返回的 ID,以 attached_media 和 {'media_fbid': 'image_id|video_id'} 列表生成帖子数据。只涉及图像时,流程正常运行,没有任何问题。但是,当 attached_media 列表中包含视频时,就会出现问题。有人知道以下错误的解决方案吗?
{'error': {'message': '(#10) Application does not have permission for this action', 'type': 'OAuthException', 'code': 10, 'fbtrace_id': 'ApRVBhzB6YA2XsOYRSa6DJE'}}
以下是该类的代码:
class FacebookHelper:
    """Publish a ``Publication`` to a Facebook Page through the Meta Graph API.

    Publishing happens in two stages: every image and video is first uploaded
    as an unpublished object (``post_image`` / ``post_video``), then the ids
    returned by those uploads are attached to a feed post
    (``post_publication``). ``multiple_post`` runs the whole sequence.
    """

    def __init__(self, publication: Publication, business: Business = None):
        """Bind the helper to a publication and refresh the page token if needed.

        Args:
            publication: The publication to post.
            business: Business whose page receives the post. When omitted (or
                falsy) ``publication.business`` is used instead.
        """
        self.publication = publication
        self.business = business if business else self.publication.business
        self.update_business_access_token()
        self.write_timeout = get_configuration_by_key("write_timeout")

    @staticmethod
    def _extract_id(payload):
        """Return ``payload["id"]``, raising with the API error when one is present.

        Error responses carry an ``error`` object instead of an ``id``; raising
        with that object keeps the real cause instead of a bare
        ``KeyError('id')``.
        """
        if isinstance(payload, dict) and "error" in payload:
            raise RuntimeError(f"Meta API error: {payload['error']}")
        return payload["id"]

    def update_business_access_token(self):
        """Fetch and store a new page access token when the current one is stale.

        A refresh happens when no page token is stored, when no token date is
        stored, or when the stored date is more than two hours old.

        Raises:
            Exception: Anything raised by the HTTP request, the JSON decoding,
                the ``data["access_token"]`` lookup or ``business.save()`` is
                propagated unchanged.
        """
        business = self.business
        needs_refresh = (
            not business.page_access_token
            or not business.session_access_token_date
            or now() - business.session_access_token_date > timedelta(hours=2)
        )
        if not needs_refresh:
            return

        with httpx.Client(
            base_url=get_configuration_by_key("meta_api_url")
        ) as client:
            params = {
                "fields": "access_token",
                "access_token": business.related_meta_app.access_token,
            }
            response = client.get(url=business.page_id, params=params)
            data = response.json()
            # A response without "access_token" raises KeyError here.
            business.page_access_token = data["access_token"]
            business.session_access_token_date = now()
            business.save()

    def post_publication(self):
        """Create the feed post and record the outcome on the publication.

        On success the returned post id is stored and the scheduling/attempt
        fields are reset. On any failure the attempt date, the exception type
        and its message are stored instead. In both cases ``publishing`` is
        cleared and the publication is saved.
        """
        publication = self.publication
        with httpx.Client(base_url=get_configuration_by_key("meta_api_url")) as client:
            params = {
                "access_token": self.business.page_access_token,
                "published": True,
            }
            try:
                response = client.post(
                    url=f"{self.business.page_id}/feed",
                    params=params,
                    data=self.generate_data(),
                )
                publication.post_id = self._extract_id(response.json())
                publication.published = True
                publication.publish_hour = None
                publication.publish_date = None
                publication.ready_to_publish = False
                publication.published_date = now()
                publication.last_publish_attempt_date = None
                publication.last_publish_attempt_error = None
            except Exception as e:
                # Deliberately broad: every failure is recorded, never raised.
                publication.last_publish_attempt_date = now()
                publication.last_publish_attempt_error = f"{type(e)}"
                publication.last_publish_attempt_error_description = str(e)
            finally:
                publication.publishing = False
                publication.save()

    def post_image(self, image: Image):
        """Upload one image as an unpublished page photo and record the outcome.

        Args:
            image: Image whose file content is uploaded. Its ``post_id`` and
                publish/attempt fields are updated and the image is saved.
        """
        with httpx.Client(base_url=get_configuration_by_key("meta_api_url")) as client:
            files = {
                "file": image.image.file.read(),
            }
            data = {
                "access_token": self.business.page_access_token,
                "published": False,
            }
            try:
                response = client.post(
                    url=f"{self.business.page_id}/photos", files=files, data=data
                )
                image.post_id = self._extract_id(response.json())
                image.published = True
                image.published_date = now()
                image.last_publish_attempt_date = None
                image.last_publish_attempt_error = None
            except Exception as e:
                # Deliberately broad: every failure is recorded, never raised.
                image.last_publish_attempt_date = now()
                image.last_publish_attempt_error = f"{type(e)}"
                image.last_publish_attempt_error_description = str(e)
            finally:
                image.save()

    def post_video(self, video: Video):
        """Upload one video as an unpublished page video and record the outcome.

        The request uses a 60 second timeout, with the write timeout taken
        from the ``write_timeout`` configuration value.

        Args:
            video: Video whose file is uploaded. Its ``post_id`` and
                publish/attempt fields are updated and the video is saved.
        """
        timeout = httpx.Timeout(60, write=float(self.write_timeout))
        with httpx.Client(
            base_url=get_configuration_by_key("meta_video_api_url")
        ) as client:
            data = {
                "access_token": self.business.page_access_token,
                "published": False,
            }
            # Close the file handle once the upload attempt is over.
            with video.video.file.open("rb") as source:
                try:
                    response = client.post(
                        url=f"{self.business.page_id}/videos",
                        files={"source": source},
                        data=data,
                        timeout=timeout,
                    )
                    video.post_id = self._extract_id(response.json())
                    video.published = True
                    video.published_date = now()
                    # Clear any earlier failure, as post_image does.
                    video.last_publish_attempt_date = None
                    video.last_publish_attempt_error = None
                except Exception as e:
                    # Deliberately broad: every failure is recorded, never raised.
                    video.last_publish_attempt_date = now()
                    video.last_publish_attempt_error = f"{type(e)}"
                    video.last_publish_attempt_error_description = str(e)
                finally:
                    video.save()

    def generate_data(self):
        """Build the form data for the feed post.

        Returns:
            dict: ``message`` (only when the publication has text) and
            ``attached_media``, a JSON-encoded list of ``{"media_fbid": id}``
            entries for every published image followed by every published
            video.
        """
        data = {}
        if self.publication.text:
            data["message"] = self.publication.text
        uploaded = [
            *self.publication.image_set.filter(published=True),
            *self.publication.video_set.filter(published=True),
        ]
        # NOTE(review): the reported "(#10) Application does not have
        # permission for this action" error only occurs when a video id is in
        # this list; attached_media may not accept video ids -- confirm
        # against the Graph API Page feed reference.
        data["attached_media"] = json.dumps(
            [{"media_fbid": item.post_id} for item in uploaded]
        )
        return data

    def multiple_post(self):
        """Upload all not-yet-published images and videos, then create the post."""
        for image in self.publication.image_set.filter(published=False):
            self.post_image(image)
        for video in self.publication.video_set.filter(published=False):
            self.post_video(video)
        self.post_publication()