Databricks的单元测试与模拟

databricks-test的Python项目详细描述


数据块测试

关于

Databricks笔记本电脑的实验单元测试框架。在

这个开源项目不是由Databricks开发的,也不是隶属于Databricks。

安装

pip install databricks_test

使用

在Databricks笔记本的开头添加一个单元格:

^{pr2}$

if子句导致在Databricks中运行时跳过内部代码。 因此,不需要在Databricks环境中安装databricks_test模块。在

将笔记本添加到代码项目中,例如使用GitHub version control in Azure Databricks。在

在代码项目中设置pytest(在Databricks之外)。在

使用以下结构创建测试用例:

importdatabricks_testdeftest_method():withdatabricks_test.session()asdbrickstest:# Set up mocks on dbrickstest# ...# Run notebookdbrickstest.run_notebook("notebook_dir","notebook_name_without_py_suffix")# Test assertions# ...

您可以在dbrickstest上设置mocks,例如:

dbrickstest.dbutils.widgets.get.return_value="myvalue"

更多示例请参见下面的示例。在

支持的功能

  • 注入Databricks笔记本的Spark上下文:sparktablesql
  • PySpark具有所有Spark功能,包括读写磁盘、udf和Pandas udf
  • 带有用户可配置模拟的Databricks实用程序(dbutilsdisplay
  • 模拟连接器,如Azure存储、S3和SQL数据仓库

不支持的功能

  • 不支持.py.ipynb.dbc)以外的笔记本格式
  • 非python单元格,如%scala%sql(跳过这些单元格,因为它们作为注释存储在.py笔记本中)
  • 直接写入本地文件系统上的/dbfs装载: 改为写入本地临时文件并使用dbutils.fs.cp()复制到DBFS,可以用mock截获它
  • Spark的数据库扩展,如spark.read.format("binaryFile")

样品检验

一个ETL笔记本的示例测试用例,读取CSV并编写Parquet。在

importpandasaspdimportdatabricks_testfromtempfileimportTemporaryDirectoryfrompandas.testingimportassert_frame_equaldeftest_etl():withdatabricks_test.session()asdbrickstest:withTemporaryDirectory()astmp_dir:out_dir=f"{tmp_dir}/out"# Provide input and output location as widgets to notebookswitch={"input":"tests/etl_input.csv","output":out_dir,}dbrickstest.dbutils.widgets.get.side_effect=lambdax:switch.get(x,"")# Run notebookdbrickstest.run_notebook(".","etl_notebook")# Notebook produces a Parquet file (directory)resultDF=pd.read_parquet(out_dir)# Compare produced Parquet file and expected CSV fileexpectedDF=pd.read_csv("tests/etl_expected.csv")assert_frame_equal(expectedDF,resultDF,check_dtype=False)

在笔记本中,我们使用小部件传递参数。 这样就很容易通过 测试中的本地文件位置和远程URL(如Azure存储或S3) 在生产中。在

# Databricks notebook source# This notebook processed the training dataset (imported by Data Factory)# and computes a cleaned dataset with additional features such as city.frompyspark.sql.typesimportStructType,StructFieldfrompyspark.sql.typesimportDoubleType,IntegerTypefrompyspark.sql.functionsimportcol,pandas_udf,PandasUDFType# COMMAND ----------# Instrument for unit tests. This is only executed in local unit tests, not in Databricks.if'dbutils'notinlocals():importdatabricks_testdatabricks_test.inject_variables()# COMMAND ----------# Widgets for interactive development.dbutils.widgets.text("input","")dbutils.widgets.text("output","")dbutils.widgets.text("secretscope","")dbutils.widgets.text("secretname","")dbutils.widgets.text("keyname","")# COMMAND ----------# Set up storage credentialsspark.conf.set(dbutils.widgets.get("keyname"),dbutils.secrets.get(scope=dbutils.widgets.get("secretscope"),key=dbutils.widgets.get("secretname")),)# COMMAND ----------# Import CSV filesschema=StructType([StructField("aDouble",DoubleType(),nullable=False),StructField("anInteger",IntegerType(),nullable=False),])df=(spark.read.format("csv").options(header="true",mode="FAILFAST").schema(schema).load(dbutils.widgets.get('input')))display(df)# COMMAND ----------df.count()# COMMAND ----------# Inputs and output are pandas.Series of doubles@pandas_udf('integer',PandasUDFType.SCALAR)defsquare(x):returnx*x# COMMAND ----------# Write out Parquet data(df.withColumn("aSquaredInteger",square(col("anInteger"))).write.parquet(dbutils.widgets.get('output')))

嘲笑高级

为连接到azuresql数据仓库的笔记本模拟PySpark类的示例测试用例。在

importdatabricks_testimportpysparkimportpyspark.sql.functionsasFfromtempfileimportTemporaryDirectoryfrompandas.testingimportassert_frame_equalimportpandasaspddeftest_sqldw(monkeypatch):withdatabricks_test.session()asdbrickstest,TemporaryDirectory()astmp:out_dir=f"{tmp}/out"# Mock SQL DW loader, creating a Spark DataFrame insteaddefmock_load(reader):return(dbrickstest.spark.range(10).withColumn("age",F.col("id")*6).withColumn("salary",F.col("id")*10000))monkeypatch.setattr(pyspark.sql.readwriter.DataFrameReader,"load",mock_load)# Mock SQL DW writer, writing to a local Parquet file insteaddefmock_save(writer):monkeypatch.undo()writer.format("parquet")writer.save(out_dir)monkeypatch.setattr(pyspark.sql.readwriter.DataFrameWriter,"save",mock_save)# Run notebookdbrickstest.run_notebook(".","sqldw_notebook")# Notebook produces a Parquet file (directory)resultDF=pd.read_parquet(out_dir)# Compare produced Parquet file and expected CSV fileexpectedDF=pd.read_csv("tests/sqldw_expected.csv")assert_frame_equal(expectedDF,resultDF,check_dtype=False)

问题

请在https://github.com/microsoft/DataOps/issues报告问题。在

欢迎加入QQ群-->: 979659372 Python中文网_新手群

推荐PyPI第三方库


热门话题
java如何在另一个承诺中解决一个承诺?   java验证字符串输入   如何在Java中将数组转换为链表   配置Logstash以从socket接收数据,并将其插入java中的Elasticsearch   swing构建在Java中以相同顺序运行的JFrame   java什么是工具箱的正确路径。getImage()?   java springbootgradleplugin是否随springboot版本一起移动?   升级gradle插件后,java gradle项目同步仍失败   java CXF服务调用失败,出现意外命名空间上的解组错误   Javaservlet。servlet ctakesrestservice的init()引发异常   java我需要什么正则表达式来读取这个值'12,'   java如何使用Xstream在现有xml文件中导入带有节点的字符串?   基于特殊字符的java子串   java hibernate从查询创建通用对象