如何从云运行调用同一个云运行来并行运行请求?

2024-04-20 00:23:22 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在使用cloud run运行一个ETL进程

我有2000个文件。只有1200个文件被预处理并加载到大查询中。因为云运行已超时。所以,我想分一下负荷

我将2000个文件分成4组,每个文件为500个,并验证并使用requests.post调用同一个云运行。但是,它使用相同的cloud run实例一个接一个地执行一组。它又超时了

如何使其并行运行

截至目前,最大实例数:20。并发:1,CPU:2,内存:8GB


Tags: 文件实例内存runcloud进程etlcpu
1条回答
网友
1楼 · 发布于 2024-04-20 00:23:22

嗯,我做过类似的事情。我不确定它是否对您有帮助,因为您没有共享一块代码。下面是我要下载的2k JSON文件的示例代码

您有2000个文件,其中1200个在云运行超时之前在GBQ中处理/加载。您可以做的是:

    total_files = len(file_list)//1000   #let file list be 2000, total files will be 2.

    #divide the files into sets of 1000 and loop over them one by one
    
    for file_set in range(1,(total_files+1)):
        auth_and_trigger(file_list[(file_set-1)*1000:(file_set*1000)])

    #for files left after 1000*i , we finally trigger it.
    auth_and_trigger(file_list[(total_files)*1000:len(file_list)])

现在,您可以为每1000个文件调用云运行auth和trigger函数

    def auth_and_trigger(self, rest_of_files):
    #your cloud run url
    receiving_service_url = 'https://cloudrun-url-uc.a.run.app/download'

    # Set up metadata server request
    # See https://cloud.google.com/compute/docs/instances/verifying-instance-identity#request_signature
    metadata_server_token_url = 'http://metadata/computeMetadata/v1/instance/service-accounts/default/identity?audience='

    token_request_url = metadata_server_token_url + receiving_service_url
    token_request_headers = {'Metadata-Flavor': 'Google'}

    # Fetch the token
    token_response = requests.get(token_request_url, headers=token_request_headers)
    jwt = token_response.content.decode("utf-8")

    # Provide the token in the request to the receiving service
    receiving_service_headers = {'Authorization': f'bearer {jwt}'}

    try:
        threading.Thread(target=self.trigger_ingest,
                         args=(receiving_service_url,
                               {"files": rest_of_files},
                               receiving_service_headers
                               )).start()
    except Exception as error:
        logging.error(error)

每个线程将调用一个函数trigger_inset,该函数将调用云运行。其代码如下:

    def trigger_ingest(url, json, headers=""):
    service_response = requests.post(url=url,
                                     json=json,
                                     headers=headers
                                     )
    logging.info(service_response.content)

现在,由于您希望并行执行,请确保线程中没有重复的代码,因为它位于云运行的触发器中

相关问题 更多 >