使用 YouTube 直播 API 的实时视频 [Python]
我正在尝试创建一个Python脚本,利用YouTube直播API和OAuth来进行直播。在代码的第一部分,我完成了通过OAuth的身份验证,这部分运行得很好。
接下来,我从我在YouTube上的公开视频创建一个直播流,并启动直播。然后,我把这个流绑定到直播上,并开始直播。
运行脚本后,它打印出“直播已成功开始”的消息。
但是,当我去YouTube查看时,虽然直播已经创建,但里面什么都没有播放。它仍然显示正在等待直播的消息。
我完全搞不懂,因为我还是个初学者。你能给我一些建议,帮我解决这个问题吗?
import os
import pickle
import time
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from datetime import datetime, timedelta
# Function for OAuth authentication
def authenticate():
SCOPES = ['https://www.googleapis.com/auth/youtube.force-ssl',
'https://www.googleapis.com/auth/youtube',
'https://www.googleapis.com/auth/youtube.readonly',
'https://www.googleapis.com/auth/youtubepartner',
'https://www.googleapis.com/auth/youtube.upload']
credentials_file = 'token.pickle'
if os.path.exists(credentials_file):
with open(credentials_file, 'rb') as token:
credentials = pickle.load(token)
else:
flow = InstalledAppFlow.from_client_secrets_file('client_secrets.json', scopes=SCOPES)
credentials = flow.run_local_server(port=8080)
with open(credentials_file, 'wb') as token:
pickle.dump(credentials, token)
return credentials
# Create an authorized YouTube API client
def create_youtube_client(credentials):
return build('youtube', 'v3', credentials=credentials)
# Get the video ID from your channel
def get_video_id():
# Here you can implement code to get the video ID, for example using the YouTube Data API
return "my video ID from https"
# Create a live stream
def create_live_stream(youtube):
request = youtube.liveStreams().insert(
part="snippet,cdn,contentDetails,status",
body={
"snippet": {
"title": "Your new video stream's name",
"description": "A description of your video stream. This field is optional."
},
"cdn": {
"frameRate": "variable",
"ingestionType": "rtmp",
"resolution": "variable"
},
"contentDetails": {
"enableAutoStart": True,
"isReusable": True
}
}
)
response = request.execute()
return response["id"]
# Create a live broadcast
def create_live_broadcast(youtube, video_id, stream_id):
request = youtube.liveBroadcasts().insert(
part="snippet,status,contentDetails",
body={
"snippet": {
"title": "Your live broadcast title",
"description": "Your live broadcast description",
"scheduledStartTime": (datetime.now() + timedelta(minutes=1)).isoformat(), # Schedule the broadcast for 1 minutes later
"thumbnails": {
"default": {
"url": "https://example.com/live-thumbnail.jpg"
}
}
},
"status": {
"privacyStatus": "private"
},
"contentDetails": {
"enableAutoStart": True
}
}
)
response = request.execute()
broadcast_id = response["id"]
# Bind the stream to the live broadcast
bind_request = youtube.liveBroadcasts().bind(
part="id,snippet",
id=broadcast_id,
streamId=stream_id
)
bind_response = bind_request.execute()
return broadcast_id
# Start the live broadcast
def start_live_broadcast(youtube, broadcast_id):
request = youtube.liveBroadcasts().update(
part="id,status",
body={
"id": broadcast_id,
"status": {
"lifeCycleStatus": "live"
}
}
)
response = request.execute()
return response["id"]
# Main function
def main():
# Authenticate using OAuth
credentials = authenticate()
# Create an authorized client for the YouTube API
youtube = create_youtube_client(credentials)
# Get the video ID
video_id = get_video_id()
# Create a live stream
stream_id = create_live_stream(youtube)
# Create a live broadcast
broadcast_id = create_live_broadcast(youtube, video_id, stream_id)
# Wait for 10 seconds to initialize the broadcast
time.sleep(10)
# Start the live broadcast
start_live_broadcast(youtube, broadcast_id)
print("Live broadcast has been successfully started.")
if __name__ == "__main__":
main()
我尝试了不同的视频ID,也尝试用YouTube API的“videos.insert”上传新视频,但似乎都没有效果。肯定有我这个新手看不出来的错误,但一个开发者应该能很快发现。
看起来直播没有接收到视频源进行播放,但我不知道为什么。
1 个回答
0
这个问题是因为在 start_live_broadcast 函数中,用错了参数来更新直播状态为“直播中”。正确的参数应该是 status.lifeCycleStatus,而不是直接用 status。
def start_live_broadcast(youtube, broadcast_id):
request = youtube.liveBroadcasts().transition(
broadcastStatus="live",
id=broadcast_id,
part="id,status"
)
response = request.execute()
return response["id"]