使用 YouTube 直播 API 的实时视频 [Python]

0 投票
1 回答
84 浏览
提问于 2025-04-14 15:38

我正在尝试创建一个Python脚本,利用YouTube直播API和OAuth来进行直播。在代码的第一部分,我完成了通过OAuth的身份验证,这部分运行得很好。

接下来,我从我在YouTube上的公开视频创建一个直播流,并启动直播。然后,我把这个流绑定到直播上,并开始直播。

运行脚本后,它打印出“直播已成功开始”的消息。

但是,当我去YouTube查看时,虽然直播已经创建,但里面什么都没有播放。它仍然显示正在等待直播的消息。

我完全搞不懂,因为我还是个初学者。你能给我一些建议,帮我解决这个问题吗?

import os
import pickle
import time
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from datetime import datetime, timedelta

# Function for OAuth authentication
def authenticate():
    SCOPES = ['https://www.googleapis.com/auth/youtube.force-ssl', 
              'https://www.googleapis.com/auth/youtube', 
              'https://www.googleapis.com/auth/youtube.readonly', 
              'https://www.googleapis.com/auth/youtubepartner', 
              'https://www.googleapis.com/auth/youtube.upload']

    credentials_file = 'token.pickle'

    if os.path.exists(credentials_file):
        with open(credentials_file, 'rb') as token:
            credentials = pickle.load(token)
    else:
        flow = InstalledAppFlow.from_client_secrets_file('client_secrets.json', scopes=SCOPES)
        credentials = flow.run_local_server(port=8080)

        with open(credentials_file, 'wb') as token:
            pickle.dump(credentials, token)

    return credentials

# Create an authorized YouTube API client
def create_youtube_client(credentials):
    return build('youtube', 'v3', credentials=credentials)

# Get the video ID from your channel
def get_video_id():
    # Here you can implement code to get the video ID, for example using the YouTube Data API
    return "my video ID from https"

# Create a live stream
def create_live_stream(youtube):
    request = youtube.liveStreams().insert(
        part="snippet,cdn,contentDetails,status",
        body={
            "snippet": {
                "title": "Your new video stream's name",
                "description": "A description of your video stream. This field is optional."
            },
            "cdn": {
                "frameRate": "variable",
                "ingestionType": "rtmp",
                "resolution": "variable"
            },
            "contentDetails": {
                "enableAutoStart": True,
                "isReusable": True
            }
        }
    )
    response = request.execute()
    return response["id"]

# Create a live broadcast
def create_live_broadcast(youtube, video_id, stream_id):
    request = youtube.liveBroadcasts().insert(
        part="snippet,status,contentDetails",
        body={
            "snippet": {
                "title": "Your live broadcast title",
                "description": "Your live broadcast description",
                "scheduledStartTime": (datetime.now() + timedelta(minutes=1)).isoformat(),  # Schedule the broadcast for 1 minutes later
                "thumbnails": {
                    "default": {
                        "url": "https://example.com/live-thumbnail.jpg"
                    }
                }
            },
            "status": {
                "privacyStatus": "private"
            },
            "contentDetails": {
                "enableAutoStart": True
            }
        }
    )
    response = request.execute()
    broadcast_id = response["id"]

    # Bind the stream to the live broadcast
    bind_request = youtube.liveBroadcasts().bind(
        part="id,snippet",
        id=broadcast_id,
        streamId=stream_id
    )
    bind_response = bind_request.execute()

    return broadcast_id

# Start the live broadcast
def start_live_broadcast(youtube, broadcast_id):
    request = youtube.liveBroadcasts().update(
        part="id,status",
        body={
            "id": broadcast_id,
            "status": {
                "lifeCycleStatus": "live"
            }
        }
    )
    response = request.execute()
    return response["id"]

# Main function
def main():
    # Authenticate using OAuth
    credentials = authenticate()

    # Create an authorized client for the YouTube API
    youtube = create_youtube_client(credentials)

    # Get the video ID
    video_id = get_video_id()

    # Create a live stream
    stream_id = create_live_stream(youtube)

    # Create a live broadcast
    broadcast_id = create_live_broadcast(youtube, video_id, stream_id)

    # Wait for 10 seconds to initialize the broadcast
    time.sleep(10)

    # Start the live broadcast
    start_live_broadcast(youtube, broadcast_id)

    print("Live broadcast has been successfully started.")

if __name__ == "__main__":
    main()

我尝试了不同的视频ID,也尝试用YouTube API的“videos.insert”上传新视频,但似乎都没有效果。肯定有我这个新手看不出来的错误,但一个开发者应该能很快发现。

看起来直播没有接收到视频源进行播放,但我不知道为什么。

1 个回答

0

这个问题是因为在 start_live_broadcast 函数中,用错了参数来更新直播状态为“直播中”。正确的参数应该是 status.lifeCycleStatus,而不是直接用 status。

def start_live_broadcast(youtube, broadcast_id):
request = youtube.liveBroadcasts().transition(
    broadcastStatus="live",
    id=broadcast_id,
    part="id,status"
)
response = request.execute()
return response["id"]

撰写回答