I am reading data from a Kafka stream and saving it to DynamoDB. While doing this, I get the following error:
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt/yarn/usercache/hadoop/appcache/application_1577444134805_0063/container_1577444134805_0063_01_000004/pyspark.zip/pyspark/worker.py", line 377, in main
process()
File "/mnt/yarn/usercache/hadoop/appcache/application_1577444134805_0063/container_1577444134805_0063_01_000004/pyspark.zip/pyspark/worker.py", line 372, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 1007, in func_with_open_process_close
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 1000, in func_with_open_process_close
File "./pyemr.zip/pyemr/dynamowriter.py", line 117, in process
Item=event
File "/usr/local/lib/python3.6/site-packages/boto3/dynamodb/table.py", line 101, in put_item
self._add_request_and_process({'PutRequest': {'Item': Item}})
File "/usr/local/lib/python3.6/site-packages/boto3/dynamodb/table.py", line 110, in _add_request_and_process
self._flush_if_needed()
File "/usr/local/lib/python3.6/site-packages/boto3/dynamodb/table.py", line 131, in _flush_if_needed
self._flush()
File "/usr/local/lib/python3.6/site-packages/boto3/dynamodb/table.py", line 137, in _flush
RequestItems={self._table_name: items_to_send})
File "/usr/local/lib/python3.6/site-packages/botocore/client.py", line 276, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/usr/local/lib/python3.6/site-packages/botocore/client.py", line 586, in _make_api_call
raise error_class(parsed_response, operation_name)
botocore.errorfactory.ProvisionedThroughputExceededException: An error occurred (ProvisionedThroughputExceededException) when calling the BatchWriteItem operation (reached max retries: 9): The level of configured provisioned throughput for the table was exceeded. Consider increasing your provisioning level with the UpdateTable API.
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.sql.execution.python.PythonForeachWriter.close(PythonForeachWriter.scala:66)
at org.apache.spark.sql.execution.streaming.sources.ForeachDataWriter.commit(ForeachWriterProvider.scala:129)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:127)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
From the error I understand that the volume of data coming from Kafka is too large for DynamoDB to write into the table, because it exceeds the table's provisioned capacity. Write capacity units on the DynamoDB table is set to 20.
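The traceback shows that boto3's batch writer already retried 9 times before giving up, so retrying harder is unlikely to help: either raise the table's write capacity (or switch it to on-demand billing), or throttle the writers so they stay under 20 WCU. One way to throttle is a token bucket inside the foreach writer. The following is a minimal stdlib-only sketch; the `TokenBucket` class and the per-task rate are my own illustration, not part of the original code, and since each Spark task runs its own writer, the table's 20 WCU would need to be divided across the number of concurrent tasks:

```python
import time

class TokenBucket:
    """Simple token bucket: refills `rate` tokens per second up to `capacity`."""

    def __init__(self, rate, capacity=None):
        self.rate = float(rate)
        self.capacity = float(capacity if capacity is not None else rate)
        self.tokens = self.capacity
        self.last = time.monotonic()

    def acquire(self, tokens=1.0):
        # Block until `tokens` are available, then consume them.
        while True:
            now = time.monotonic()
            self.tokens = min(self.capacity,
                              self.tokens + (now - self.last) * self.rate)
            self.last = now
            if self.tokens >= tokens:
                self.tokens -= tokens
                return
            # Sleep just long enough for the deficit to refill.
            time.sleep((tokens - self.tokens) / self.rate)

# Hypothetical use inside the writer's process() method, assuming this task
# is allowed e.g. 5 of the table's 20 WCU:
#
#   bucket = TokenBucket(rate=5)      # created once, in open()
#   ...
#   bucket.acquire()                  # ~1 WCU per item up to 1 KB
#   self.table.put_item(Item=event)
```

Note that items larger than 1 KB consume more than one write capacity unit, so `acquire()` would need a larger token count for big events.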
The writeStream code is as follows:
def save_source_events_output(self, *, app_name, source_events, sink_type=None, writerClass=None, trigger_freq="2 seconds", out_put_mode='update'):
    output = (
        source_events
        .writeStream
        .outputMode(out_put_mode)
        .foreach(writerClass(**self.job_config_data))
        .trigger(processingTime=trigger_freq)
        .start()
    )
    output.awaitTermination()
Can someone tell me how I can get rid of this error for a spark-submit job that has to keep writing data to DynamoDB for as long as it receives data from Kafka?
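If raising the provisioned capacity is not an option, another mitigation is wrapping the `put_item` call in client-side exponential backoff with jitter, so write bursts are spread out instead of failing. The `retry_with_backoff` helper below is a hypothetical sketch, not from the original code; in practice the `retryable` argument would be botocore's `ClientError` (or the table resource's `ProvisionedThroughputExceededException`) rather than a broad exception class:

```python
import random
import time

def retry_with_backoff(fn, max_attempts=8, base_delay=0.05, max_delay=5.0,
                       retryable=(Exception,)):
    """Call fn(); on a retryable exception, sleep with capped exponential
    backoff plus full jitter, then try again up to max_attempts times."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except retryable:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error to the caller
            delay = min(max_delay, base_delay * (2 ** attempt))
            time.sleep(random.uniform(0, delay))

# Hypothetical use inside the writer's process() method:
#
#   retry_with_backoff(lambda: self.table.put_item(Item=event))
```

Note that this only smooths transient spikes; if the steady-state write rate from Kafka is above 20 WCU, the only durable fixes are increasing the table's provisioned throughput or switching the table to on-demand capacity mode.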