如何通过Kinesis从Lambda(Python)向Redshift发送数据

2024-05-16 21:12:26 发布

您现在位置:Python中文网/ 问答频道 /正文

我在Python中有一个lambda函数,它生成一些东西并返回一些需要在Redshift中插入的值。在lambda中,我将值推送到Kinesis,后者在S3中复制它们,然后在Redshift中复制。在

lambda中的值在字符串中获得,如下所示:

final_string = 'a;b;d;c'

每个字母都是Redshift中表中不同列的值,因此delimeter是“;”。然后我将数据推送到Kinesis Stream中:

^{pr2}$

然后,kinesis流供给kinesis消防水龙带流。在S3中使用Kinesis Firehose生成的文件如下所示(包括文件中的引号):

"a;b;c;d;c" 

最后,我使用以下语句将数据复制到redshift(在Kinesis firehouse中配置):

copy table
from blabla
BLANKSASNULL 
DELIMITER ';' 
EMPTYASNULL 
NULL AS 'null' 
ESCAPE 
FILLRECORD;

我已经成功地使其工作,并且在Kinesis中只缓冲了一个结果时在Redshift中获取值(不过,在Redshift中创建了一个新列)。因此,当缓冲时间内只执行了一个lambda时,Redshift表如下所示:

  A        B         C         D     no_info_column
  "a       b         c         d"        <null>

当我多次执行lambda时,问题就出现了,因为我在S3中得到了一个包含以下文本的文件:

"a,b,c,d" "a1,b1,c1,d1"

我得到了Redshift错误Extra column(s) found,因为copy语句找不到行分隔。在

我试过以下几件事,但没有成功:

  • 返回lambda中的字符串
  • 正在搜索如何在copy中设置行分隔符(SO question
  • 将列表转换为json而不是字符串。然后我在列表的方括号里出现了问题
  • 在copy语句中使用REMOVEQUOTES

我最初的问题是:“如何从s3复制到用双引号分隔的不同行的redshift”,但问题可能出在我的第一种方法中,所以我决定让问题更宽泛一点。在

那么,我该怎么解决这个问题呢?在


Tags: 文件数据lambda函数字符串redshift列表s3
1条回答
网友
1楼 · 发布于 2024-05-16 21:12:26

如果您希望将流数据发送到Amazon Redshift,可以使用Amazon Kinesis data Firehose。以MB为单位的缓存时间(以秒为单位)对数据进行批处理。在

对Redshift执行小的INSERT操作是不理想的。批量加载数据要好得多。因此,如果需要连续加载数据,Kinesis data Firehose提供了最佳性能组合。在

你提到了“动静流给一个动静的消防水龙带流”。可以直接从AWS Lambda函数写入Kinesis Data firehouse。在

相关问题 更多 >