# Dispatch file_list to the Cloud Run service in batches of at most 1000 files.
BATCH_SIZE = 1000

# Number of complete batches (e.g. 2000 files -> 2 full batches of 1000).
total_batches = len(file_list) // BATCH_SIZE

# Trigger the cloud-run auth-and-trigger call once per full batch.
for batch in range(total_batches):
    auth_and_trigger(file_list[batch * BATCH_SIZE:(batch + 1) * BATCH_SIZE])

# Trigger once more for any leftover files (fewer than BATCH_SIZE).
# NOTE: the original code called auth_and_trigger unconditionally here,
# which sent an empty file list whenever len(file_list) was an exact
# multiple of 1000 (including the example's own 2000-file case).
remainder = file_list[total_batches * BATCH_SIZE:]
if remainder:
    auth_and_trigger(remainder)
现在,您可以为每1000个文件调用云运行auth和trigger函数
def auth_and_trigger(self, rest_of_files):
    """Obtain an identity token from the GCE metadata server and trigger
    the Cloud Run ingest endpoint for one batch of files in a background
    thread.

    Args:
        rest_of_files: the batch of files to process; sent to the service
            as the ``"files"`` key of the request payload.

    Raises:
        requests.RequestException: if the metadata-server token request
            fails or returns a non-2xx status.
    """
    # Cloud Run URL that will receive the download/ingest request.
    receiving_service_url = 'https://cloudrun-url-uc.a.run.app/download'

    # Ask the metadata server for an identity token whose audience is the
    # receiving service. See
    # https://cloud.google.com/compute/docs/instances/verifying-instance-identity#request_signature
    metadata_server_token_url = 'http://metadata/computeMetadata/v1/instance/service-accounts/default/identity?audience='
    token_request_url = metadata_server_token_url + receiving_service_url
    token_request_headers = {'Metadata-Flavor': 'Google'}

    # Fetch the token. A timeout keeps a hung metadata server from blocking
    # the caller forever (the original call had none), and raise_for_status
    # fails fast instead of forwarding an error page as the "JWT".
    token_response = requests.get(
        token_request_url, headers=token_request_headers, timeout=10)
    token_response.raise_for_status()
    jwt = token_response.content.decode("utf-8")

    # Provide the token in the request to the receiving service.
    receiving_service_headers = {'Authorization': f'bearer {jwt}'}

    # Fire the actual ingest call from a background thread so this method
    # returns immediately instead of waiting on the Cloud Run request.
    try:
        threading.Thread(
            target=self.trigger_ingest,
            args=(receiving_service_url,
                  {"files": rest_of_files},
                  receiving_service_headers),
        ).start()
    except Exception as error:
        # Best-effort: a failure to start the thread is logged, not raised,
        # preserving the original behavior.
        logging.error(error)
嗯,我做过类似的事情。我不确定它是否对您有帮助,因为您没有共享完整的代码。下面是我用来下载 2000 个 JSON 文件的示例代码:
您有2000个文件,其中1200个在云运行超时之前在GBQ中处理/加载。您可以做的是:
现在,您可以为每1000个文件调用云运行auth和trigger函数
每个线程将调用一个函数 trigger_ingest,该函数将调用云运行。其代码如下:
现在,由于您希望并行执行,请确保线程中没有重复的代码,因为它位于云运行的触发器中
相关问题 更多 >
编程相关推荐