使用Python在配置单元选项卡中插入多行

2024-04-29 15:21:05 发布

您现在位置:Python中文网/ 问答频道 /正文

Hive是一个数据仓库,用于查询和聚合驻留在HDFS上的大型数据集。

标准的INSERT INTO语法性能很差,因为:

  1. 每个语句都需要执行一个Map/Reduce进程。
  2. 每一条语句都将导致一个新文件被添加到HDFS中—随着时间的推移,这将导致从表中读取时性能非常差。

也就是说,现在有一个用于Hive/HCatalog的流式API,如详细的here

我面临着使用Python以极快的速度将数据插入Hive的需要。我知道pyhivepyhs2库,但它们似乎都没有使用流式API。

有没有人成功地让Python使用流式API将许多行插入到Hive中,这是如何做到的?

我期待你的洞察力!


Tags: 数据apimapreduce标准进程语法流式
1条回答
网友
1楼 · 发布于 2024-04-29 15:21:05

配置单元用户可以通过脚本流式处理表来转换该数据: 添加文件replace-nan-with-zeros.py

SELECT
  TRANSFORM (...)
  USING 'python replace-nan-with-zeros.py'
  AS (...)
FROM some_table;

下面是一个简单的Python脚本:

    #!/usr/bin/env python
    import sys

kFirstColumns= 7

def main(argv):

    for line in sys.stdin:
        line = line.strip();
        inputs = line.split('\t')

        # replace NaNs with zeros
        outputs = [ ]
        columnIndex = 1;
        for value in inputs:
            newValue = value
            if columnIndex > kFirstColumns:
                newValue = value.replace('NaN','0.0')
            outputs.append(newValue)
            columnIndex = columnIndex + 1

        print '\t'.join(outputs)

if __name__ == "__main__":
    main(sys.argv[1:])

蜂巢和Python

Python可以通过HiveQL转换语句用作Hive中的UDF。例如,下面的HiveQL调用streaming.py文件中存储的Python脚本。

基于Linux的HDInsight

添加文件wasb:///streaming.py

SELECT TRANSFORM (clientid, devicemake, devicemodel)
  USING 'streaming.py' AS
  (clientid string, phoneLable string, phoneHash string)
FROM hivesampletable
ORDER BY clientid LIMIT 50;

基于Windows的HDInsight

添加文件wasb:///streaming.py

SELECT TRANSFORM (clientid, devicemake, devicemodel)
  USING 'D:\Python27\python.exe streaming.py' AS
  (clientid string, phoneLable string, phoneHash string)
FROM hivesampletable
ORDER BY clientid LIMIT 50;

下面是这个例子的作用:

1.文件开头的add file语句将streaming.py文件添加到分布式缓存中,因此集群中的所有节点都可以访问该文件。

2.选择转换。。。USING语句从hivesampletable中选择数据,并将clientid、devicemake和devicemodel传递给streaming.py脚本。

3.AS子句描述streaming.py返回的字段

下面是HiveQL示例使用的streaming.py文件。

#!/usr/bin/env python

import sys
import string
import hashlib

while True:
  line = sys.stdin.readline()
  if not line:
    break

  line = string.strip(line, "\n ")
  clientid, devicemake, devicemodel = string.split(line, "\t")
  phone_label = devicemake + ' ' + devicemodel
  print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()])

由于我们使用流式处理,此脚本必须执行以下操作:

1.从STDIN读取数据。这是通过在本例中使用sys.stdin.readline()实现的。

2.使用string.strip(line,“\n”)删除后面的换行符,因为我们只需要文本数据,而不需要行尾指示符。

3.进行流处理时,一行包含所有值,每个值之间有一个制表符。所以string.split(line,“\t”)可用于分割每个选项卡上的输入,只返回字段。

4.处理完成后,输出必须作为一行写入STDOUT,每个字段之间有一个制表符。这是通过使用print“\t”.join([clientid,phone_label,hashlib.md5(phone_label).hexdigest()]实现的。

5.这一切都发生在while循环中,循环将重复,直到没有读取行,此时break退出循环,脚本终止。

除此之外,该脚本只连接devicemake和devicemodel的输入值,并计算连接值的哈希值。很简单,但它描述了从配置单元调用的任何Python脚本应该如何工作的基本原理:循环、读取输入直到没有更多的输入、在选项卡处将每一行输入分开、处理、编写一行选项卡分隔的输出。

相关问题 更多 >