"ChunkedEncodingError, Connection broken: OSError 104" when transferring ~20 tables from BigQuery to MongoDB (via Python)

Posted 2024-04-25 19:49:20


As a preface to the question, I'm not sure whether this relates to:

(a) Airflow and the GCP Compute Engine instance it is deployed and run on

(b) Python's connection to BigQuery (via the BigQuery API client library for Python)

(c) Python's connection to MongoDB (via pymongo / MongoClient)

... so pinning down exactly where the problem lies is difficult. In short, we have an Airflow DAG with 20 tasks, where each task queries the data from a single BigQuery table into a pandas DataFrame, does some quick data cleaning, and then inserts the data into a MongoDB collection. Each task runs the same function, just with a different BigQuery table_name:

import time
import datetime

from google.cloud import bigquery
from pandas.api.types import is_datetime64_any_dtype as is_datetime
from pymongo import MongoClient

# MONGO_URI is defined elsewhere (e.g. an env var / Airflow connection)

def transfer_all_by_partitioned_key(table_name, partition_key):
    
    start_time = time.time()
    print(partition_key)

    # (1) Connect to BigQuery + Mongo DB
    bq = bigquery.Client()
    cluster = MongoClient(MONGO_URI)
    db = cluster["mydb"]
    print(f'====== (1) Connected to BQ + Mongo: {round(time.time() - start_time, 5)}')
    
    # (2) Create list of options, in order to transfer data in chunks
    get_partitions_query = f"select distinct {partition_key} from myprojectid.mydataset.{table_name}"
    partitions_df = bq.query(get_partitions_query).to_dataframe()
    
    # (3) Loop DF, query + write to Mongo for each partition
    db[table_name].drop() # drop table, as we are recreating it entirely here
    for index, row in partitions_df.iterrows():
        
        # Run Query
        iter_value = row[partition_key]

        iter_query = f"select * from `myprojectid.mydataset.{table_name}` where {partition_key} = {iter_value}"
        iter_df = bq.query(iter_query).to_dataframe()
        print(f'====== {index}: ({iter_value}) {iter_df.shape[0]} rows updated on this date')


        # Handle Dates
        row1_types = iter_df.iloc[0].apply(type) # missing data in 1st row may be issue
        date_cols = [key for key in dict(row1_types) if row1_types[key] == datetime.date]
        for col in date_cols:
            iter_df[[col]] = iter_df[[col]].astype(str) # .where(iter_df[[col]].notnull(), None)

        # Handle DateTimes
        datetime_cols = [key for key in dict(iter_df.dtypes) if is_datetime(iter_df[key])]
        for col in datetime_cols:
            iter_df[[col]] = iter_df[[col]].astype(object) # .where(iter_df[[col]].notnull(), None)
        
        # Insert into MongoDB
        db[table_name].insert_many(iter_df.to_dict('records'))
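Since `ECONNRESET` errors like this are usually transient (as noted below, clearing and re-running the task typically succeeds), one common mitigation is to wrap the flaky `bq.query(...).to_dataframe()` call in a manual retry with exponential backoff. This is a minimal sketch, not part of the original DAG; the function name and parameters are illustrative:

```python
import time

import requests


def query_with_retry(bq, query, max_attempts=3, base_delay=5):
    """Run a BigQuery query, retrying on transient connection resets."""
    transient = (requests.exceptions.ChunkedEncodingError, OSError)
    for attempt in range(1, max_attempts + 1):
        try:
            return bq.query(query).to_dataframe()
        except transient as exc:
            if attempt == max_attempts:
                raise  # give up after the final attempt
            delay = base_delay * 2 ** (attempt - 1)  # exponential backoff
            print(f"Transient error {exc!r}; retrying in {delay}s")
            time.sleep(delay)
```

Inside the loop above, `iter_df = bq.query(iter_query).to_dataframe()` would become `iter_df = query_with_retry(bq, iter_query)`, so a single dropped connection no longer fails the whole task.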

Because of this chunking/looping (done to avoid memory problems in Airflow), each task can run up to 500 queries from BigQuery + writes to MongoDB. As for the problem: it seems random, and every now and then one of these tasks fails with the following error:

requests.exceptions.ChunkedEncodingError: ('Connection broken: OSError("(104, \'ECONNRESET\')")', OSError("(104, 'ECONNRESET')"))

Of the 20 tasks, 3 have hit this at different times, and it seems any task is prone to this error at any time... however, when the task is cleared and re-run in Airflow, it usually works the second time (so it's not very reproducible).

Perhaps I'm querying BigQuery, or inserting into MongoDB, too frequently?
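If the MongoDB side is the suspect, one low-risk change is to split each partition's records into smaller `insert_many` batches, so individual round-trips stay short and a single reset only affects one small batch. A minimal sketch (the helper name and batch size are illustrative, not from the original code):

```python
def insert_in_batches(collection, records, batch_size=1000):
    """Insert a list of dicts into a pymongo collection in fixed-size batches."""
    for start in range(0, len(records), batch_size):
        collection.insert_many(records[start:start + batch_size])
```

In the function above, `db[table_name].insert_many(iter_df.to_dict('records'))` would become `insert_in_batches(db[table_name], iter_df.to_dict('records'))`.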

Here is the full error from the Airflow logs:

[2020-07-29 22:39:20,735] {{taskinstance.py:1145}} ERROR - ('Connection broken: OSError("(104, \'ECONNRESET\')")', OSError("(104, 'ECONNRESET')"))
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 313, in recv_into
    return self.connection.recv_into(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/OpenSSL/SSL.py", line 1840, in recv_into
    self._raise_ssl_error(self._ssl, result)
  File "/usr/local/lib/python3.7/site-packages/OpenSSL/SSL.py", line 1663, in _raise_ssl_error
    raise SysCallError(errno, errorcode.get(errno))
OpenSSL.SSL.SysCallError: (104, 'ECONNRESET')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 437, in _error_catcher
    yield
  File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 764, in read_chunked
    self._update_chunk_length()
  File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 694, in _update_chunk_length
    line = self._fp.fp.readline()
  File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
    return self._sock.recv_into(b)
  File "/usr/local/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", line 318, in recv_into
    raise SocketError(str(e))
OSError: (104, 'ECONNRESET')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 751, in generate
    for chunk in self.raw.stream(chunk_size, decode_content=True):
  File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 572, in stream
    for line in self.read_chunked(amt, decode_content=decode_content):
  File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 793, in read_chunked
    self._original_response.close()
  File "/usr/local/lib/python3.7/contextlib.py", line 130, in __exit__
    self.gen.throw(type, value, traceback)
  File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 455, in _error_catcher
    raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ('Connection broken: OSError("(104, \'ECONNRESET\')")', OSError("(104, 'ECONNRESET')"))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 983, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 113, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/python_operator.py", line 118, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/plugins/tasks/geniussports/transfers.py", line 223, in transfer_all_by_partitioned_key
    iter_df = bq.query(iter_query).to_dataframe()
  File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/job.py", line 3374, in to_dataframe
    create_bqstorage_client=create_bqstorage_client,
  File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/table.py", line 1741, in to_dataframe
    for frame in self.to_dataframe_iterable(dtypes=dtypes):
  File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/table.py", line 1435, in _to_page_iterable
    for item in tabledata_list_download():
  File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/_pandas_helpers.py", line 563, in download_dataframe_tabledata_list
    for page in pages:
  File "/usr/local/lib/python3.7/site-packages/google/api_core/page_iterator.py", line 249, in _page_iter
    page = self._next_page()
  File "/usr/local/lib/python3.7/site-packages/google/api_core/page_iterator.py", line 369, in _next_page
    response = self._get_next_page_response()
  File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/table.py", line 1367, in _get_next_page_response
    method=self._HTTP_METHOD, path=self.path, query_params=params
  File "/usr/local/lib/python3.7/site-packages/google/cloud/bigquery/client.py", line 556, in _call_api
    return call()
  File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
    on_error=on_error,
  File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 184, in retry_target
    return target()
  File "/usr/local/lib/python3.7/site-packages/google/cloud/_http.py", line 419, in api_request
    timeout=timeout,
  File "/usr/local/lib/python3.7/site-packages/google/cloud/_http.py", line 277, in _make_request
    method, url, headers, data, target_object, timeout=timeout
  File "/usr/local/lib/python3.7/site-packages/google/cloud/_http.py", line 315, in _do_request
    url=url, method=method, headers=headers, data=data, timeout=timeout
  File "/usr/local/lib/python3.7/site-packages/google/auth/transport/requests.py", line 450, in request
    **kwargs
  File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 530, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 683, in send
    r.content
  File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 829, in content
    self._content = b''.join(self.iter_content(CONTENT_CHUNK_SIZE)) or b''
  File "/usr/local/lib/python3.7/site-packages/requests/models.py", line 754, in generate
    raise ChunkedEncodingError(e)
requests.exceptions.ChunkedEncodingError: ('Connection broken: OSError("(104, \'ECONNRESET\')")', OSError("(104, 'ECONNRESET')"))
