Python中的Youtube数据API nextPageToken循环

2024-10-17 08:26:06 发布

您现在位置:Python中文网/ 问答频道 /正文

我从网上找到的许多不同的例子中拼凑了出来

目标是:

  1. 在youtube api中搜索
  2. 将多个页面的搜索结果转换为csv文件

编辑:这是一个搜索循环的工作示例,感谢提供的一个答案。 现在,这将按预期循环最大次数(10次),但是,当执行时,现在的问题是CSV文件

似乎在调用响应之后,即使后面有 writeCSV(results) 的调用,程序也会提前结束

任何进一步的帮助都将不胜感激

from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import argparse

# API-key credentials/config for the unauthenticated YouTube Data API v3 client.
DEVELOPER_KEY = "dev-key"
YOUTUBE_API_SERVICE_NAME = "youtube"
YOUTUBE_API_VERSION = "v3"

# Shared client object; every search()/videos() request below goes through it.
youtube = build(YOUTUBE_API_SERVICE_NAME, YOUTUBE_API_VERSION, developerKey=DEVELOPER_KEY)


# -------------Build YouTube Search------------#
# -------------Build YouTube Search------------#
def youtubeSearch(query, order="relevance"):
    """Search YouTube for *query* and collect per-video statistics.

    Pages through every result page (50 results per page) via list_next()
    and returns a dict of parallel lists keyed by field name.

    :param query: search string passed to the API's ``q`` parameter
    :param order: result ordering (API default semantics; "relevance")
    :return: dict of parallel lists (title, videoId, viewCount, ...)
    """
    # NOTE: 'pageInfo' is added to the fields filter so the summary printed
    # after the loop has data to work with (the original filter dropped it).
    request = youtube.search().list(
        q=query,
        type="video",
        order=order,
        part="id,snippet",
        maxResults="50",
        relevanceLanguage='en',
        videoDuration='long',
        fields='nextPageToken, pageInfo, items(id,snippet)'
    )

    title = []
    channelId = []
    channelTitle = []
    categoryId = []
    videoId = []
    viewCount = []
    likeCount = []
    dislikeCount = []
    commentCount = []
    favoriteCount = []
    tags = []

    first_response = None  # kept so the post-loop summary has a page to describe

    while request:
        response = request.execute()
        if first_response is None:
            first_response = response
        for search_result in response.get("items", []):
            if search_result["id"]["kind"] != "youtube#video":
                continue

            # append title and video for each item
            title.append(search_result['snippet']['title'])
            videoId.append(search_result['id']['videoId'])

            # then collect stats on each video using videoId
            stats = youtube.videos().list(
                part='statistics, snippet',
                id=search_result['id']['videoId']).execute()

            # Hoist the repeated stats['items'][0] lookups once per video.
            snippet = stats['items'][0]['snippet']
            statistics = stats['items'][0]['statistics']

            channelId.append(snippet['channelId'])
            channelTitle.append(snippet['channelTitle'])
            categoryId.append(snippet['categoryId'])
            favoriteCount.append(statistics['favoriteCount'])
            viewCount.append(statistics['viewCount'])

            # Not every video has likes/dislikes enabled, so those keys may be
            # missing from the JSON response; catch only the missing key
            # instead of a bare except that would hide unrelated errors.
            try:
                likeCount.append(statistics['likeCount'])
            except KeyError:
                # Good to be aware of Channels that turn off their Likes
                print("Video titled {0}, on Channel {1} Likes Count is not available".format(
                    snippet['title'], snippet['channelTitle']))
                print(statistics.keys())
                # Appends "Not Available" to keep dictionary values aligned
                likeCount.append("Not available")

            try:
                dislikeCount.append(statistics['dislikeCount'])
            except KeyError:
                # Good to be aware of Channels that turn off their Likes
                print("Video titled {0}, on Channel {1} Dislikes Count is not available".format(
                    snippet['title'], snippet['channelTitle']))
                print(statistics.keys())
                dislikeCount.append("Not available")

            # Comments can be disabled; substitute 0 to keep lists aligned.
            commentCount.append(statistics.get('commentCount', 0))

            # Substitute a placeholder when a video has no tags.
            tags.append(snippet.get('tags', "No Tags"))

        # list_next() returns None once the final page is consumed,
        # which ends the while loop.
        request = youtube.search().list_next(request, response)

    # Store the collected parallel lists in one dictionary
    youtube_dict = {'tags': tags, 'channelId': channelId, 'channelTitle': channelTitle,
                    'categoryId': categoryId, 'title': title, 'videoId': videoId,
                    'viewCount': viewCount, 'likeCount': likeCount, 'dislikeCount': dislikeCount,
                    'commentCount': commentCount, 'favoriteCount': favoriteCount}

    print("Search Completed...")
    # BUG FIX: the original indexed `request` here, but after the loop
    # `request` is None (list_next exhausted all pages), so the function
    # crashed before returning and writeCSV never ran. Summarize from the
    # first response instead.
    if first_response is not None:
        page_info = first_response.get('pageInfo', {})
        print("Total results: {0} \nResults per page: {1}".format(page_info.get('totalResults'),
                                                                  page_info.get('resultsPerPage')))
        items = first_response.get('items', [])
        if items:
            print("Example output per item, snippet")
            print(items[0]['snippet'].keys())
            # Show the very first result as a sanity check
            first_snippet = items[0]['snippet']
            print("First result is: \n Title: {0} \n Channel ID: {1} \n Published on: {2}".format(
                first_snippet['title'], first_snippet['channelId'], first_snippet['publishedAt']))
    return youtube_dict


# Ask the user for a query, run the search, and preview the top matches.
print("Please input your search query")
query_text = input()
# Run YouTube Search
results = youtubeSearch(query_text)

# Interleave the first three titles with their channel titles for the preview.
preview = []
for rank in range(3):
    preview.append(results['title'][rank])
    preview.append(results['channelTitle'][rank])
print("Top 3 results are: \n {0}, ({1}), \n {2}, ({3}),\n {4}, ({5})".format(*preview))

# -------------------------Save results------------------------------#
print("Input filename to store csv file")
file = "\\YouTube\\" + input() + ".csv"


def writeCSV(results, filename):
    """Write the result dict of parallel lists to *filename* as CSV.

    Columns are the dict keys in sorted order; each row is one index across
    all lists (zip stops at the shortest list).
    """
    import csv
    columns = sorted(results)
    rows = zip(*(results[column] for column in columns))
    with open(filename, "w", newline="", encoding="utf-8") as handle:
        csv_out = csv.writer(handle, delimiter=",")
        csv_out.writerow(columns)
        csv_out.writerows(rows)


# Persist the collected results to the chosen CSV file and confirm the path.
writeCSV(results, file)
print("CSV file has been uploaded at: " + str(file))


Tags: to, search, title, youtube, request, stats, items, keys
2条回答

经过几次不同的测试,我找到了一个解决方案。 我无法实现所建议的pythonic解决方案,但这对我很有效

import pandas as pd
import os
import webvtt
import csv

import google.oauth2.credentials
import google_auth_oauthlib.flow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from google_auth_oauthlib.flow import InstalledAppFlow

# OAuth configuration for the authenticated YouTube Data API v3 client.
CLIENT_SECRETS_FILE = "client_secrets.json"
SCOPES = ['https://www.googleapis.com/auth/youtube.force-ssl']
API_SERVICE_NAME = 'youtube'
API_VERSION = 'v3'

def get_authenticated_service():
    """Run the installed-app OAuth flow and return an authorized API client."""
    oauth_flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRETS_FILE, SCOPES)
    creds = oauth_flow.run_console()
    return build(API_SERVICE_NAME, API_VERSION, credentials=creds)

# Remove keyword arguments that are not set
def remove_empty_kwargs(**kwargs):
    """Return a dict containing only the keyword arguments with truthy values.

    The original guarded with ``if kwargs is not None``, but ``**kwargs`` is
    always a dict (possibly empty), so that check was dead code.
    NOTE: falsy-but-meaningful values (0, False, "") are dropped too, matching
    the original behavior.
    """
    return {key: value for key, value in kwargs.items() if value}

# Authenticate once at import time; every API call below shares this client.
client = get_authenticated_service()

def youtube_keyword(client, **kwargs):
    """Execute a search().list call, forwarding only the non-empty kwargs."""
    cleaned = remove_empty_kwargs(**kwargs)
    return client.search().list(**cleaned).execute()

def youtube_search(criteria, max_res):
    """Collect up to *max_res* search results for *criteria* into a DataFrame.

    :param criteria: search string passed as the API's ``q`` parameter
    :param max_res: stop requesting new pages once this many titles are held
    :return: pandas DataFrame with title/channelId/videoId/subject columns
    """
    # create lists and empty dataframe
    titles = []
    videoIds = []
    channelIds = []
    resp_df = pd.DataFrame()

    # BUG FIX: the page token must be initialized OUTSIDE the loop. The
    # original reset `token = None` on every iteration, so pageToken was
    # always None and the same first page was fetched repeatedly.
    token = None
    while len(titles) < max_res:
        response = youtube_keyword(client,
                                   part='id,snippet',
                                   maxResults=50,
                                   q=criteria,
                                   videoCaption='closedCaption',
                                   type='video',
                                   videoDuration='long',
                                   pageToken=token)

        for item in response['items']:
            titles.append(item['snippet']['title'])
            channelIds.append(item['snippet']['channelTitle'])
            videoIds.append(item['id']['videoId'])

        # The final page has no nextPageToken; .get avoids the KeyError the
        # original would raise there, and we stop rather than loop forever.
        token = response.get("nextPageToken")
        if token is None:
            break

    resp_df['title'] = titles
    resp_df['channelId'] = channelIds
    resp_df['videoId'] = videoIds
    resp_df['subject'] = criteria

    return resp_df

# Example run: collect up to 1000 matching videos, then export to CSV.
Found_Videos = youtube_search('[search criteria]',1000)
# NOTE(review): .shape and .head() return values are discarded outside an
# interactive session -- these lines only have an effect in a REPL/notebook.
Found_Videos.shape

Found_Videos.head()
Found_Videos.to_csv('Found_Videos.csv')

由于您正在使用Google的APIs Client Library for Python,因此在Search: list API端点上实现结果集分页(result set pagination)的Pythonic方式如下所示:

# Build the first page request; list_next() below derives every later page.
request = youtube.search().list(
    q = 'A query',
    part = 'id,snippet',
    type = 'video',
    maxResults = 50,
    relevanceLanguage = 'en',
    videoDuration = 'long'
)

# list_next() constructs the follow-up request from the previous
# request/response pair and returns None once the final page has been
# consumed, so the while loop terminates by itself -- no manual
# nextPageToken/pageToken bookkeeping is needed.
while request:
    response = request.execute()

    for item in response['items']:
        ...

    request = youtube.search().list_next(
        request, response)

由于Python客户机库的实现方式,它就是如此简单:根本不需要显式处理API响应对象的nextPageToken属性和API请求参数pageToken。

相关问题 更多 >