Unit testing and mocking for Databricks
databricks_test
About
An experimental unit test framework for Databricks notebooks.
This open-source project is not developed by, nor affiliated with, Databricks.
Installing
pip install databricks_test
Usage
Add a cell at the beginning of your Databricks notebook:
# Instrument for unit tests. This is only executed in local unit tests, not in Databricks.
if 'dbutils' not in locals():
    import databricks_test
    databricks_test.inject_variables()

The if clause causes the inner code to be skipped when run in Databricks. Therefore, there is no need to install the databricks_test module in your Databricks environment.
Add your notebook to a code project, for example using GitHub version control in Azure Databricks.
Set up pytest in your code project (outside of Databricks).
Create a test case with the following structure:
import databricks_test

def test_method():
    with databricks_test.session() as dbrickstest:
        # Set up mocks on dbrickstest
        # ...

        # Run notebook
        dbrickstest.run_notebook("notebook_dir", "notebook_name_without_py_suffix")

        # Test assertions
        # ...
You can set up mocks on dbrickstest, for example:
dbrickstest.dbutils.widgets.get.return_value = "myvalue"
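Since dbutils on dbrickstest behaves like a configurable mock object, other dbutils calls can be stubbed the same way. A small sketch; the secret value and widget names here are hypothetical:

# Stub a secret lookup with a fake value (hypothetical)
dbrickstest.dbutils.secrets.get.return_value = "fake-storage-key"

# Return a different value per widget name
widget_values = {"input": "tests/etl_input.csv", "output": "/tmp/out"}
dbrickstest.dbutils.widgets.get.side_effect = lambda name: widget_values.get(name, "")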
See the samples below for more examples.
Supported features
- Spark context injected into Databricks notebooks: spark, table, sql, etc. (a sketch follows this list)
- PySpark with all Spark features, including reading and writing to disk, UDFs and Pandas UDFs
- Databricks Utilities (dbutils, display) with user-configurable mocks
- Mocking of connectors such as Azure Storage, S3 and SQL Data Warehouse
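A minimal sketch of the first point, assuming the notebook shares the local Spark session exposed as dbrickstest.spark (used in the advanced sample below); the notebook name report_notebook and the view name people are hypothetical:

import databricks_test

def test_injected_context():
    with databricks_test.session() as dbrickstest:
        # Seed a temp view the notebook could read through the injected
        # spark, table or sql variables, e.g. table("people")
        df = dbrickstest.spark.createDataFrame(
            [(1, "a"), (2, "b")], ["id", "value"])
        df.createOrReplaceTempView("people")

        # Run a (hypothetical) notebook that queries the view
        dbrickstest.run_notebook(".", "report_notebook")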
Unsupported features
- Notebook formats other than .py (.ipynb, .dbc) are not supported
- Non-Python cells such as %scala and %sql (those cells are skipped, as they are stored as comments in the .py notebook)
- Writing directly to the /dbfs mount on the local filesystem: write to a local temporary file instead and copy it to DBFS with dbutils.fs.cp(), which you can intercept with a mock (sketched after this list)
- Databricks extensions to Spark, such as spark.read.format("binaryFile")
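A sketch of the /dbfs workaround above; the paths are hypothetical. In the notebook, write to a local temporary directory, then copy the result with dbutils.fs.cp():

# In the notebook: write locally, then copy to DBFS
import tempfile

local_out = f"{tempfile.mkdtemp()}/out"
df.write.parquet(local_out)
dbutils.fs.cp(f"file:{local_out}", "dbfs:/data/out", recurse=True)

In a test, the copy is then an ordinary mock call that can be intercepted or asserted on, for example:

dbrickstest.dbutils.fs.cp.assert_called_once()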
Sample test
A sample test case for an ETL notebook that reads CSV and writes Parquet.
import pandas as pd
import databricks_test
from tempfile import TemporaryDirectory
from pandas.testing import assert_frame_equal

def test_etl():
    with databricks_test.session() as dbrickstest:
        with TemporaryDirectory() as tmp_dir:
            out_dir = f"{tmp_dir}/out"

            # Provide input and output location as widgets to notebook
            switch = {
                "input": "tests/etl_input.csv",
                "output": out_dir,
            }
            dbrickstest.dbutils.widgets.get.side_effect = lambda x: switch.get(x, "")

            # Run notebook
            dbrickstest.run_notebook(".", "etl_notebook")

            # Notebook produces a Parquet file (directory)
            resultDF = pd.read_parquet(out_dir)

            # Compare produced Parquet file and expected CSV file
            expectedDF = pd.read_csv("tests/etl_expected.csv")
            assert_frame_equal(expectedDF, resultDF, check_dtype=False)
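Note that run_notebook(".", "etl_notebook") resolves the notebook relative to the current working directory, so this test expects a file etl_notebook.py in the directory pytest is run from.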
In the notebook, we pass parameters using widgets. This makes it easy to pass a local file location in tests and a remote URL (such as Azure Storage or S3) in production.
# Databricks notebook source
# This notebook processes the training dataset (imported by Data Factory)
# and computes a cleaned dataset with additional features such as city.
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.sql.functions import col, pandas_udf, PandasUDFType

# COMMAND ----------

# Instrument for unit tests. This is only executed in local unit tests, not in Databricks.
if 'dbutils' not in locals():
    import databricks_test
    databricks_test.inject_variables()

# COMMAND ----------

# Widgets for interactive development.
dbutils.widgets.text("input", "")
dbutils.widgets.text("output", "")
dbutils.widgets.text("secretscope", "")
dbutils.widgets.text("secretname", "")
dbutils.widgets.text("keyname", "")

# COMMAND ----------

# Set up storage credentials
spark.conf.set(
    dbutils.widgets.get("keyname"),
    dbutils.secrets.get(
        scope=dbutils.widgets.get("secretscope"),
        key=dbutils.widgets.get("secretname"),
    ),
)

# COMMAND ----------

# Import CSV files
schema = StructType([
    StructField("aDouble", DoubleType(), nullable=False),
    StructField("anInteger", IntegerType(), nullable=False),
])

df = (
    spark.read.format("csv")
    .options(header="true", mode="FAILFAST")
    .schema(schema)
    .load(dbutils.widgets.get('input'))
)

display(df)

# COMMAND ----------

df.count()

# COMMAND ----------

# Inputs and output are pandas.Series of doubles
@pandas_udf('integer', PandasUDFType.SCALAR)
def square(x):
    return x * x

# COMMAND ----------

# Write out Parquet data
(
    df
    .withColumn("aSquaredInteger", square(col("anInteger")))
    .write
    .parquet(dbutils.widgets.get('output'))
)
Advanced mocking
A sample test case mocking PySpark classes for a notebook connecting to Azure SQL Data Warehouse.
import databricks_test
import pyspark
import pyspark.sql.functions as F
from tempfile import TemporaryDirectory
from pandas.testing import assert_frame_equal
import pandas as pd

def test_sqldw(monkeypatch):
    with databricks_test.session() as dbrickstest, TemporaryDirectory() as tmp:
        out_dir = f"{tmp}/out"

        # Mock SQL DW loader, creating a Spark DataFrame instead
        def mock_load(reader):
            return (
                dbrickstest.spark
                .range(10)
                .withColumn("age", F.col("id") * 6)
                .withColumn("salary", F.col("id") * 10000)
            )
        monkeypatch.setattr(
            pyspark.sql.readwriter.DataFrameReader, "load", mock_load)

        # Mock SQL DW writer, writing to a local Parquet file instead
        def mock_save(writer):
            monkeypatch.undo()
            writer.format("parquet")
            writer.save(out_dir)
        monkeypatch.setattr(
            pyspark.sql.readwriter.DataFrameWriter, "save", mock_save)

        # Run notebook
        dbrickstest.run_notebook(".", "sqldw_notebook")

        # Notebook produces a Parquet file (directory)
        resultDF = pd.read_parquet(out_dir)

        # Compare produced Parquet file and expected CSV file
        expectedDF = pd.read_csv("tests/sqldw_expected.csv")
        assert_frame_equal(expectedDF, resultDF, check_dtype=False)
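Note the monkeypatch.undo() at the start of mock_save: it removes the patches before writer.save(out_dir) runs, so that call reaches the real DataFrameWriter.save and actually writes the Parquet output instead of recursing into the mock.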
Issues
Please report issues at https://github.com/microsoft/DataOps/issues.