As a preface to the question, I'm not sure whether this relates to:
(a) Airflow and the GCP Compute Engine it is deployed/run on,
(b) the Python connection to BigQuery (via the BigQuery API client library for Python), or
(c) the Python connection to MongoDB (via pymongo/MongoClient)
...so titling the question precisely is difficult. In short: we have an Airflow DAG with 20 tasks, where each task queries data from a single BigQuery table into a pandas DataFrame, quickly cleans the data, and then inserts it into a MongoDB collection. Each task runs the same function, just for a different BigQuery table_name:
import datetime
import time

from google.cloud import bigquery
from pandas.api.types import is_datetime64_any_dtype as is_datetime
from pymongo import MongoClient

def transfer_all_by_partitioned_key(table_name, partition_key):
    start_time = time.time()
    print(partition_key)
    # (1) Connect to BigQuery + MongoDB
    bq = bigquery.Client()
    cluster = MongoClient(MONGO_URI)
    db = cluster["mydb"]
    print(f'====== (1) Connected to BQ + Mongo: {round(time.time() - start_time, 5)}')
    # (2) Create list of partition values, in order to transfer data in chunks
    get_partitions_query = f"select distinct {partition_key} from myprojectid.mydataset.{table_name}"
    partitions_df = bq.query(get_partitions_query).to_dataframe()
    # (3) Loop over partitions, query + write to Mongo for each partition
    db[table_name].drop()  # drop collection, as we are recreating it entirely here
    for index, row in partitions_df.iterrows():
        # Run query
        iter_value = row[partition_key]
        iter_query = f"select * from `myprojectid.mydataset.{table_name}` where {partition_key} = {iter_value}"
        iter_df = bq.query(iter_query).to_dataframe()
        print(f'====== {index}: ({iter_value}) {iter_df.shape[0]} rows updated on this date')
        # Handle dates
        row1_types = iter_df.iloc[0].apply(type)  # missing data in 1st row may be an issue
        date_cols = [key for key in dict(row1_types) if row1_types[key] == datetime.date]
        for col in date_cols:
            iter_df[[col]] = iter_df[[col]].astype(str)  # .where(iter_df[[col]].notnull(), None)
        # Handle datetimes
        datetime_cols = [key for key in dict(iter_df.dtypes) if is_datetime(iter_df[key])]
        for col in datetime_cols:
            iter_df[[col]] = iter_df[[col]].astype(object)  # .where(iter_df[[col]].notnull(), None)
        # Insert into MongoDB
        db[table_name].insert_many(iter_df.to_dict('records'))
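As an aside on the type handling above: as the comment notes, sniffing date columns from the first row breaks when that row has missing values. A more robust approach would scan for the first non-null value per field; here is a stdlib sketch (the helper name is hypothetical, and it's shown over the records/dict form rather than the DataFrame):

```python
import datetime

def date_like_fields(records):
    """Return field names whose first non-None value is a datetime.date
    but not a datetime.datetime (which subclasses date)."""
    fields = []
    seen = set()  # fields already decided on
    for row in records:
        for key, value in row.items():
            if key in seen or value is None:
                continue  # keep looking in later rows if still None
            seen.add(key)
            if isinstance(value, datetime.date) and not isinstance(value, datetime.datetime):
                fields.append(key)
    return fields
```

This sidesteps the first-row problem entirely, since a None in row 0 just defers the decision to the next row that has a value.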
Because of the chunking/looping (to avoid memory issues in Airflow), each task can run up to ~500 query-from-BigQuery + write-to-MongoDB round trips. As for the problem: seemingly at random, these tasks occasionally fail with the following error:
requests.exceptions.ChunkedEncodingError: ('Connection broken: OSError("(104, \'ECONNRESET\')")', OSError("(104, 'ECONNRESET')"))
Of the 20 tasks, 3 have hit this at different times, and it seems any task is susceptible at any time... however, when the task is cleared and re-run in Airflow, it usually works the second time (so the failure is not readily reproducible).
Perhaps I am querying BigQuery, or inserting into MongoDB, too frequently?
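If the reset is just transient, one mitigation I'm considering is wrapping each chunked query in a retry with exponential backoff. A minimal stdlib sketch (`with_retries` is a hypothetical helper; the exact exception types to catch would need checking against the traceback, though I believe requests' ChunkedEncodingError subclasses IOError/OSError):

```python
import random
import time

def with_retries(fn, max_attempts=5, base_delay=1.0, exceptions=(OSError,)):
    """Call fn(); on a listed exception, sleep with exponential backoff and retry."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except exceptions:
            if attempt == max_attempts:
                raise  # out of attempts: surface the original error
            # back off: base, 2x, 4x, ... plus a little jitter
            time.sleep(base_delay * 2 ** (attempt - 1) + random.uniform(0, 0.5))

# The failing call in the loop would then become something like:
# iter_df = with_retries(lambda: bq.query(iter_query).to_dataframe())
```

This wouldn't explain the root cause, but it might make the tasks survive the occasional connection reset instead of failing the whole DAG run.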
Here is the full error from the Airflow logs:
[2020-07-29 22:39:20,735] {{taskinstance.py:1145}} ERROR - ('Connection broken: OSError("(104, \'ECONNRESET\')")', OSError("(104, 'ECONNRESET')"))
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 313, in recv_into
return self.connection.recv_into(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/OpenSSL/SSL.py", line 1840, in recv_into
self._raise_ssl_error(self._ssl, result)
File "/usr/local/lib/python3.7/site-packages/OpenSSL/SSL.py", line 1663, in _raise_ssl_error
raise SysCallError(errno, errorcode.get(errno))
OpenSSL.SSL.SysCallError: (104, 'ECONNRESET')
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 437, in _error_catcher
yield
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 764, in read_chunked
self._update_chunk_length()
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 694, in _update_chunk_length
line = self._fp.fp.readline()
File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
return self._sock.recv_into(b)
File "/usr/local/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 318, in recv_into
raise SocketError(str(e))
OSError: (104, 'ECONNRESET')
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 751, in generate
for chunk in self.raw.stream(chunk_size, decode_content=True):
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 572, in stream
for line in self.read_chunked(amt, decode_content=decode_content):
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 793, in read_chunked
self._original_response.close()
File "/usr/local/lib/python3.7/contextlib.py", line 130, in __exit__
self.gen.throw(type, value, traceback)
File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 455, in _error_catcher
raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ('Connection broken: OSError("(104, \'ECONNRESET\')")', OSError("(104, 'ECONNRESET')"))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
return_value = self.execute_callable()
File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/usr/local/airflow/plugins/tasks/geniussports/transfers.py", line 223, in transfer_all_by_partitioned_key
iter_df = bq.query(iter_query).to_dataframe()
File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/job.py", line 3374, in to_dataframe
create_bqstorage_client=create_bqstorage_client,
File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/table.py", line 1741, in to_dataframe
for frame in self.to_dataframe_iterable(dtypes=dtypes):
File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/table.py", line 1435, in _to_page_iterable
for item in tabledata_list_download():
File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/_pandas_helpers.py", line 563, in download_dataframe_tabledata_list
for page in pages:
File "/usr/local/lib/python3.7/site-packages/google/api_core/page_iterator.py", line 249, in _page_iter
page = self._next_page()
File "/usr/local/lib/python3.7/site-packages/google/api_core/page_iterator.py", line 369, in _next_page
response = self._get_next_page_response()
File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/table.py", line 1367, in _get_next_page_response
method=self._HTTP_METHOD, path=self.path, query_params=params
File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 556, in _call_api
return call()
File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
on_error=on_error,
File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 184, in retry_target
return target()
File "/usr/local/lib/python3.7/site-packages/google/cloud/_http.py", line 419, in api_request
timeout=timeout,
File "/usr/local/lib/python3.7/site-packages/google/cloud/_http.py", line 277, in _make_request
method, url, headers, data, target_object, timeout=timeout
File "/usr/local/lib/python3.7/site-packages/google/cloud/_http.py", line 315, in _do_request
url=url, method=method, headers=headers, data=data, timeout=timeout
File "/usr/local/lib/python3.7/site-packages/google/auth/transport/requests.py", line 450, in request
**kwargs
File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 530, in request
resp = self.send(prep, **send_kwargs)
File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 683, in send
r.content
File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 829, in content
self._content = b''.join(self.iter_content(CONTENT_CHUNK_SIZE)) or b''
File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 754, in generate
raise ChunkedEncodingError(e)
requests.exceptions.ChunkedEncodingError: ('Connection broken: OSError("(104, \'ECONNRESET\')")', OSError("(104, 'ECONNRESET')"))