botocore.errorfactory.ProvisionedThroughputExceededException from boto3 when a Spark job writes to DynamoDB

Posted 2024-04-26 08:02:03


I am consuming data from a Kafka stream and saving it to DynamoDB. While doing this, I get the following error:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1577444134805_0063/container_1577444134805_0063_01_000004/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1577444134805_0063/container_1577444134805_0063_01_000004/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 1007, in func_with_open_process_close
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 1000, in func_with_open_process_close
  File "./pyemr.zip/pyemr/dynamowriter.py", line 117, in process
    Item=event
  File "/usr/local/lib/python3.6/site-packages/boto3/dynamodb/table.py", line 101, in put_item
    self._add_request_and_process({'PutRequest': {'Item': Item}})
  File "/usr/local/lib/python3.6/site-packages/boto3/dynamodb/table.py", line 110, in _add_request_and_process
    self._flush_if_needed()
  File "/usr/local/lib/python3.6/site-packages/boto3/dynamodb/table.py", line 131, in _flush_if_needed
    self._flush()
  File "/usr/local/lib/python3.6/site-packages/boto3/dynamodb/table.py", line 137, in _flush
    RequestItems={self._table_name: items_to_send})
  File "/usr/local/lib/python3.6/site-packages/botocore/client.py", line 276, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python3.6/site-packages/botocore/client.py", line 586, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.errorfactory.ProvisionedThroughputExceededException: An error occurred (ProvisionedThroughputExceededException) when calling the BatchWriteItem operation (reached max retries: 9): The level of configured provisioned throughput for the table was exceeded. Consider increasing your provisioning level with the UpdateTable API.

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.sql.execution.python.PythonForeachWriter.close(PythonForeachWriter.scala:66)
    at org.apache.spark.sql.execution.streaming.sources.ForeachDataWriter.commit(ForeachWriterProvider.scala:129)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:127)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

From the error I understand that the volume of data coming in from Kafka is too large for DynamoDB to write into the table, because it exceeds the table's provisioned capacity. The Write capacity units on the DynamoDB table are set to 20.
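Since the message itself points at the UpdateTable API, one direction (not necessarily the right fix for a sustained Kafka load) would be to raise the table's write capacity or switch it to on-demand billing. A minimal sketch with boto3; the table name my_table, the region, and the new capacity number are placeholders, not values from the original post:

import boto3

# Placeholder table name and region; replace with the job's actual values.
dynamodb = boto3.client("dynamodb", region_name="us-east-1")

# Option 1: raise the provisioned write capacity above the current 20 WCU.
dynamodb.update_table(
    TableName="my_table",
    ProvisionedThroughput={
        "ReadCapacityUnits": 20,     # keep the existing read capacity
        "WriteCapacityUnits": 200,   # example value; size it to the Kafka ingest rate
    },
)

# Option 2: switch the table to on-demand billing so capacity scales with load.
# dynamodb.update_table(TableName="my_table", BillingMode="PAY_PER_REQUEST")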

The writeStream code is as follows:

def save_source_events_output(self, *, app_name, source_events, sink_type=None,
                              writerClass=None, trigger_freq="2 seconds",
                              out_put_mode='update'):
    # Write the streaming DataFrame through a ForeachWriter instance
    # (writerClass), which puts each row into DynamoDB.
    output = (
        source_events
        .writeStream
        .outputMode(out_put_mode)
        .foreach(writerClass(**self.job_config_data))
        .trigger(processingTime=trigger_freq)
        .start()
    )
    output.awaitTermination()
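The writerClass passed to foreach is the asker's own dynamowriter.py, whose source isn't shown; from the traceback it goes through boto3's batch writer. Purely as an illustration, here is a minimal sketch of a ForeachWriter-style sink that writes one item per row and lets botocore's adaptive retry mode back off client-side when the table throttles. The class name DynamoWriter, the config keys table_name and region, and the use of plain put_item are assumptions, not the actual code:

import boto3
from botocore.config import Config


class DynamoWriter:
    """Hypothetical ForeachWriter-style sink; not the asker's dynamowriter.py."""

    def __init__(self, **job_config):
        # Assumed config keys; the real job_config_data layout is unknown.
        self.table_name = job_config.get("table_name", "my_table")
        self.region = job_config.get("region", "us-east-1")
        self.table = None

    def open(self, partition_id, epoch_id):
        # "adaptive" retry mode makes botocore throttle itself and retry
        # ProvisionedThroughputExceededException instead of giving up after
        # the default number of attempts.
        resource = boto3.resource(
            "dynamodb",
            region_name=self.region,
            config=Config(retries={"max_attempts": 20, "mode": "adaptive"}),
        )
        self.table = resource.Table(self.table_name)
        return True

    def process(self, row):
        # One PutItem per row; retries and backoff are handled by botocore.
        self.table.put_item(Item=row.asDict())

    def close(self, error):
        pass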

Can someone tell me how to get rid of this error, given that I need the spark-submit job to keep writing data to DynamoDB for as long as it receives data from Kafka?

