python中的数据帧比较

datacomp的Python项目详细描述


数据压缩

datacompy是用来比较两个pandas数据帧的包。最初开始 可以替代s a s的pandas数据帧的proc compare 它的功能不仅仅是pandas.dataframe.equals(pandas.dataframe) (因为它会打印出一些统计数据,并让您调整匹配的准确度)。 然后扩展以将该功能传递给Spark数据帧。

快速安装

pip install datacompy

熊猫细节

datacompy将尝试在联接列列表中联接两个数据帧,或者 在索引上。如果两个数据帧基于联接值具有重复项,则 匹配进程按剩余字段排序,并基于该行号进行联接。

即使数据类型不匹配,按列比较也会尝试匹配值。 例如,如果一个列中有一个decimal.decimal值 数据帧和在另一个数据帧中具有 float64 dtype的同名列, 它将告诉您数据类型不同,但仍将尝试比较 值:

基本用法

fromioimportStringIOimportpandasaspdimportdatacompydata1="""acct_id,dollar_amt,name,float_fld,date_fld
10000001234,123.45,George Maharis,14530.1555,2017-01-01
10000001235,0.45,Michael Bluth,1,2017-01-01
10000001236,1345,George Bluth,,2017-01-01
10000001237,123456,Bob Loblaw,345.12,2017-01-01
10000001239,1.05,Lucille Bluth,,2017-01-01
"""data2="""acct_id,dollar_amt,name,float_fld
10000001234,123.4,George Michael Bluth,14530.155
10000001235,0.45,Michael Bluth,
10000001236,1345,George Bluth,1
10000001237,123456,Robert Loblaw,345.12
10000001238,1.05,Loose Seal Bluth,111
"""df1=pd.read_csv(StringIO(data1))df2=pd.read_csv(StringIO(data2))compare=datacompy.Compare(df1,df2,join_columns='acct_id',#You can also specify a list of columnsabs_tol=0,#Optional, defaults to 0rel_tol=0,#Optional, defaults to 0df1_name='Original',#Optional, defaults to 'df1'df2_name='New'#Optional, defaults to 'df2')compare.matches(ignore_extra_columns=False)# False# This method prints out a human-readable report summarizing and sampling differencesprint(compare.report())

有关更详细的使用说明和报表输出示例,请参阅文档。

幕后发生的事情
  • 将两个数据帧( df1 df2 )传递到 datacompy。 要连接的列(或列列表)到 连接列 。默认情况下 比较需要精确匹配值,但可以传入 abs-tol 和/或 相对公差 对数值列应用绝对和/或相对公差。
    • 您可以传入 on_index=true 而不是 连接列 而是索引。
  • 类验证您传递的数据帧是否包含 中的列join_columns 并具有除此之外的唯一列名。这个 类还将所有列名小写以消除歧义。
  • 初始化时,类验证输入并运行比较。
  • compare.matches() 将返回 true 如果数据帧匹配, false 否则。
    • 您可以传入ignore_extra_columns=true 因为有不重叠的列名(仍将选中 重叠列)
    • 注意:如果只想验证数据帧是否完全匹配或 不,你应该看看pandas.testing.assert_frame_equal 。主要 datacompy的用例是当您需要解释差异时 在两个数据帧之间。
  • compare还有一些快捷方式,如
    • 用于获取 交集,只有df1和df2记录(数据帧)
    • 对于 得到交集,只有df1和df2列(集合)
  • 您可以打开日志以查看更详细的日志。

火花细节

数据组件>sparkcompare 类将在联接列表中联接两个数据帧 柱。它能够映射每个列中可能不同的列名 数据帧,包括在联接列中。您负责创建 spark可以处理并指定唯一联接的任何源的数据帧 关键。如果通过联接键在任一数据帧中有重复项,则匹配过程 将在加入之前删除副本(并告诉您有多少副本 发现)

与基于pandas的compare类一样,甚至会尝试进行比较 如果数据类型不匹配。任何模式差异都将在输出中报告 以及在任何不匹配报告中,以便您可以评估 类型不匹配是否有问题。

选择使用sparkcompare而不是compare的主要原因 是因为数据太大而无法放入内存,还是您正在比较数据 在spark环境中工作良好,比如分区拼花、csv或json 文件或大脑表。

性能影响

火花的刻度非常好,因此您可以使用spark compare来比较 数十亿行的数据,前提是你有足够大的集群。仍然, 连接数十亿行数据是一项固有的大任务,因此 在进入 "大数据"的陈词滥调领域:

  • sparkcompare 将比较dataframes和 其余的报告。如果数据中有您不关心的列 比较,在数据帧上使用 select 语句/方法进行筛选 那些人出去了。尤其是从宽拼花文件中读取时,这可以使 当你不在乎的专栏不一定非得是 读入内存并包含在连接的数据帧中。
  • 对于大型数据集,将 cache_intermediates=true 添加到 sparkcompare 通过缓存某些中间数据帧,调用可以帮助优化性能 在内存中,比如每个输入数据集的重复数据消除版本,或者 数据文件。否则,spark的惰性计算每次都会重新计算这些值 它需要报表中的数据或访问实例属性时的数据。这可能 对于较小的数据帧是可以的,但对于较大的数据帧是昂贵的。你做 在执行此操作之前,需要确保有足够的可用缓存内存,因此 默认情况下,此参数设置为false。

基本用法

importdatetimeimportdatacompyfrompyspark.sqlimportRow# This example assumes you have a SparkSession named "spark" in your environment, as you# do when running `pyspark` from the terminal or in a Databricks notebook (Spark v2.0 and higher)data1=[Row(acct_id=10000001234,dollar_amt=123.45,name='George Maharis',float_fld=14530.1555,date_fld=datetime.date(2017,1,1)),Row(acct_id=10000001235,dollar_amt=0.45,name='Michael Bluth',float_fld=1.0,date_fld=datetime.date(2017,1,1)),Row(acct_id=10000001236,dollar_amt=1345.0,name='George Bluth',float_fld=None,date_fld=datetime.date(2017,1,1)),Row(acct_id=10000001237,dollar_amt=123456.0,name='Bob Loblaw',float_fld=345.12,date_fld=datetime.date(2017,1,1)),Row(acct_id=10000001239,dollar_amt=1.05,name='Lucille Bluth',float_fld=None,date_fld=datetime.date(2017,1,1))]data2=[Row(acct_id=10000001234,dollar_amt=123.4,name='George Michael Bluth',float_fld=14530.155),Row(acct_id=10000001235,dollar_amt=0.45,name='Michael Bluth',float_fld=None),Row(acct_id=10000001236,dollar_amt=1345.0,name='George Bluth',float_fld=1.0),Row(acct_id=10000001237,dollar_amt=123456.0,name='Robert Loblaw',float_fld=345.12),Row(acct_id=10000001238,dollar_amt=1.05,name='Loose Seal Bluth',float_fld=111.0)]base_df=spark.createDataFrame(data1)compare_df=spark.createDataFrame(data2)comparison=datacompy.SparkCompare(spark,base_df,compare_df,join_columns=['acct_id'])# This prints out a human-readable report summarizing differencescomparison.report()

在EMR或独立Spark上使用SparkCompare
  1. 设置代理变量
  2. 如果需要,创建一个虚拟环境( virtualenv venv;source venv/bin/activate
  3. pip安装datacompy和要求
  4. 确保设置了spark_home环境变量(这可能是 /usr/lib/spark ,但可能是 根据您的安装而有所不同)
  5. 将pythonpath环境变量扩展为 导出pythonpath=$spark_home/python/lib/py4j-0.10.4-src.zip:$spark_home/python:$pythonpath (请注意,您的PY4J版本可能因使用的Spark版本而异)
  6. < > >

在数据块上使用SPARKCompare
  1. 在本地克隆此存储库
  2. 从repo根目录运行 python setup.py bdist_egg 创建一个datacompy egg。
  3. 从databricks首页,单击"新建"部分下的"库"链接。
  4. < DL>
    在新库页:
    >OL>
  5. 将source更改为"upload python egg or pypi"
  6. 在"upload egg"下,库名称应为"datacompy"
  7. 将datacompy/dist/中的egg文件拖到"Drop library egg here to upload"框中
  8. 单击"创建库"按钮
  9. < > >
  10. 创建库后,从库页面(可以在/users/{login}工作区中找到该页面)。 您可以选择将库附加到的群集。
  11. 导入datacompy 在连接到库所连接的群集的笔记本中并享受它!
  12. < > >

贡献者

我们欢迎您对Capital One的开源项目("项目")感兴趣。 项目的任何贡献者必须接受并签署一份表明同意 许可条款。除了本cla授予capital one和 对于通过Capital One分发的软件的接收者,您保留所有权利、所有权, 以及对您的贡献的兴趣;本cla不影响您 将自己的贡献用于任何其他目的。

本项目遵循开放源码的行为准则。 通过参与,您需要遵守此准则。

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
maven字段#getGenericType()抛出java。lang.TypeNotPresentException   用java绘制三角形的几何图形   java无法下载主题和发件人地址(rediff)   java如何使代码线程安全   java在尝试转换FileInputStream中的文件时,我遇到了一个FileNotFound异常   java Moxy和Jackson如何将Json映射到Pojo   在foreach循环中使用BufferedWriter生成新行的java问题   java为什么我的测试在单次执行中运行时间小于1秒,而在maven构建中运行时间大于20秒?   java如何显示下载附件的进度条   了解java rmi的良好实践   .net可以将Java portlet嵌入ASP。网页?   循环如何多次执行Java方法?   java如何确保用户输入在给定的有效范围内?   java单元测试定理   java如何在IntelliJ上运行外部构建项目?   JAVA:试图编写一个检查字符串是否为数字的方法。总是返回错误   javahadoop将特定键的所有map方法生成的所有值都发送到一个reduce方法,对吗?   在java中读取和使用文件