Databricks cannot find a CSV file inside my installed wheel when running from a notebook
I am learning Spark and, as an assignment, we have to build a wheel file locally, install it on Databricks (I am using Azure Databricks) and test it by running it from a Databricks notebook. The program reads a CSV file (timezones.csv) that is packaged inside the wheel. I have confirmed that the file really is inside the wheel, and everything works when I install and run the package from a Jupyter notebook on my local PC. However, when I install it on Databricks and run it from a notebook, I get the following error:
[PATH_NOT_FOUND] Path does not exist: dbfs:/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/motor_ingesta/resources/timezones.csv. SQLSTATE: 42K03
File <command-3771510969632751>, line 7
3 from pyspark.sql import SparkSession
5 spark = SparkSession.builder.getOrCreate()
----> 7 flights_with_utc = aniade_hora_utc(spark, flights_df)
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/motor_ingesta/agregaciones.py:25, in aniade_hora_utc(spark, df)
23 path_timezones = str(Path(__file__).parent) + "/resources/timezones.csv"
24 #path_timezones = str(Path("resources") / "timezones.csv")
---> 25 timezones_df = spark.read.options(header="true", inferSchema="true").csv(path_timezones)
27 # Concateno los datos de las columnas del timezones_df ("iata_code","iana_tz","windows_tz"), a la derecha de
28 # las columnas del df original, copiando solo en las filas donde coincida el aeropuerto de origen (Origin) con
29 # el valor de la columna iata_code de timezones.df. Si algun aeropuerto de Origin no apareciera en timezones_df,
30 # las 3 columnas quedarán con valor nulo (NULL)
32 df_with_tz = df.join(timezones_df, df["Origin"] == timezones_df["iata_code"], "left_outer")
File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function.<locals>.wrapper(*args, **kwargs)
45 start = time.perf_counter()
46 try:
---> 47 res = func(*args, **kwargs)
48 logger.log_success(
49 module_name, class_name, function_name, time.perf_counter() - start, signature
50 )
51 return res
File /databricks/spark/python/pyspark/sql/readwriter.py:830, in DataFrameReader.csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue, locale, lineSep, pathGlobFilter, recursiveFileLookup, modifiedBefore, modifiedAfter, unescapedQuoteHandling)
828 if type(path) == list:
829 assert self._spark._sc._jvm is not None
--> 830 return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
831 elif isinstance(path, RDD):
833 def func(iterator):
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File /databricks/spark/python/pyspark/errors/exceptions/captured.py:230, in capture_sql_exception.<locals>.deco(*a, **kw)
226 converted = convert_exception(e.java_exception)
227 if not isinstance(converted, UnknownException):
228 # Hide where the exception came from that shows a non-Pythonic
229 # JVM exception message.
--> 230 raise converted from None
231 else:
232 raise
Has anyone run into this problem? Is there any way to solve it?
I have tried installing the wheel both with pip and as a cluster library, and I get the same error either way. I have also restarted the cluster several times.
Thanks in advance for your help.
I am using Python 3.11, PySpark 3.5 and Java 8, and I built the wheel locally in PyCharm. If you need more details to answer, please ask and I will provide them.
I have explained all the details above. I expected to be able to use the wheel I built locally from a Databricks notebook.
Apologies, English is not my native language and it is a bit rusty.
Edit (answering a comment):
Could you navigate to %sh ls /dbfs/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/motor_ingesta/resources and share a picture of the folder contents? – Samuel Demir 11 hours ago
I just did what you asked and this is the result (the file actually is there, even though Databricks says it cannot find it...):
Edit (answering Samuel Demir):
Maybe your setup initialization is missing package_data? This is how I add my config files to the res folder. The files can then be accessed exactly the same way your code does it.
setup(
    name="daprep",
    version=__version__,
    author="",
    author_email="samuel.demir@galliker.com",
    description="A short summary of the project",
    license="proprietary",
    url="",
    packages=find_packages("src"),
    package_dir={"": "src"},
    package_data={"daprep": ["res/**/*"]},
    long_description=read("README.md"),
    install_requires=read_requirements(Path("requirements.txt")),
    tests_require=[
        "pytest",
        "pytest-cov",
        "pre-commit",
    ],
    cmdclass={
        "dist": DistCommand,
        "test": TestCommand,
        "testcov": TestCovCommand,
    },
    platforms="any",
    python_requires=">=3.7",
    entry_points={
        "console_scripts": [
            "main_entrypoint = daprep.main:main_entrypoint",
        ]
    },
)
My setup.py also has the package_data line; here you can see a snapshot and the code. Do you notice any other detail that might be relevant? Thanks!
from setuptools import setup, find_packages

setup(
    name="motor-ingesta",
    version="0.1.0",
    author="Estrella Adriana Sicardi Segade",
    author_email="esicardi@ucm.es",
    description="Motor de ingesta para el curso de Spark",
    long_description="Motor de ingesta para el curso de Spark",
    long_description_content_type="text/markdown",
    url="https://github.com/esicardi",
    python_requires=">=3.8",
    packages=find_packages(),
    package_data={"motor_ingesta": ["resources/*.csv"]},
)
Edit (answering another comment):
Are you able to load the CSV file by simply reading it in a notebook cell? – Samuel Demir 17 hours ago
Yes, I can read the file outside the package code. For example, I can print its contents:
%sh cat /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/motor_ingesta/resources/timezones.csv
"iata_code","iana_tz","windows_tz"
"AAA","Pacific/Tahiti","Hawaiian Standard Time"
"AAB","Australia/Brisbane","E. Australia Standard Time"
"AAC","Africa/Cairo","Egypt Standard Time"
"AAD","Africa/Mogadishu","E. Africa Standard Time"
"AAE","Africa/Algiers","W. Central Africa Standard Time"
"AAF","America/New_York","Eastern Standard Time"
"AAG","America/Sao_Paulo","E. South America Standard Time"
"AAH","Europe/Berlin","W. Europe Standard Time"
"AAI","America/Araguaina","Tocantins Standard Time"
"AAJ","America/Paramaribo","SA Eastern Standard Time"
"AAK","Pacific/Tarawa","UTC+12"
"AAL","Europe/Copenhagen","Romance Standard Time"
"AAM","Africa/Johannesburg","South Africa Standard Time"
"AAN","Asia/Dubai","Arabian Standard Time"
"AAO","America/Caracas","Venezuela Standard Time"
"AAP","Asia/Makassar","Singapore Standard Time"
"AAQ","Europe/Moscow","Russian Standard Time"
"AAR","Europe/Copenhagen","Romance Standard Time"
"AAS","Asia/Jayapura","Tokyo Standard Time"
...
[file continues on until the end]
Could you share a part of your codebase to reproduce the problem? – Samuel Demir 17 hours ago
About the package (it is called motor_ingesta): it consists of three Python files, motor_ingesta.py, agregaciones.py and flujo_diario.py. The package processes JSON files with information about flights at US airports in January 2023. The function that reads timezones.csv is defined in agregaciones.py. It takes a DataFrame (previously obtained from the JSON files) and joins it with the data from timezones.csv, matching the origin airport (Origin) against the iata_code column; if an origin airport does not appear in timezones_df, the three joined columns stay NULL. It then uses that information to compute the UTC time from the departure time (DepTime) and the time zone, appending a FlightTime column with the UTC time to the right of the DataFrame, and finally drops the columns that were added from timezones.csv. This is the code of the function:
# Imports used by this snippet from agregaciones.py
from pathlib import Path

from pyspark.sql import SparkSession, DataFrame as DF
from pyspark.sql.functions import col, concat, lit, lpad, to_utc_timestamp


def aniade_hora_utc(spark: SparkSession, df: DF) -> DF:
    """
    Adds a FlightTime column (in UTC) to the flights DataFrame.
    :param spark: Spark session.
    :param df: Flights DataFrame.
    :return: Flights DataFrame with the FlightTime column in UTC.
    """
    path_timezones = str(Path(__file__).parent) + "/resources/timezones.csv"
    timezones_df = spark.read.options(header="true", inferSchema="true").csv(path_timezones)
    df_with_tz = df.join(timezones_df, df["Origin"] == timezones_df["iata_code"], "left_outer")
    df_with_flight_time = df_with_tz.withColumn("FlightTime", to_utc_timestamp(concat(
        col("FlightDate"), lit(" "),
        lpad(col("DepTime").cast("string"), 4, "0").substr(1, 2), lit(":"),
        col("DepTime").cast("string").substr(-2, 2)
    ), col("iana_tz")))
    df_with_flight_time = df_with_flight_time.drop("iata_code", "iana_tz", "windows_tz")
    return df_with_flight_time
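One detail that stands out in the error message is that the path is reported as dbfs:/local_disk0/..., i.e. Spark resolves the bare path against DBFS rather than against the local filesystem where the cluster library is installed. Just as a sketch (I have not verified that this is the fix), the read could be pointed explicitly at the local filesystem with a file: scheme:

# Untested sketch: add an explicit file: scheme so Spark reads from the local
# filesystem instead of resolving the bare path against DBFS.
# (spark here is the SparkSession passed into aniade_hora_utc)
from pathlib import Path

path_timezones = "file:" + str(Path(__file__).parent / "resources" / "timezones.csv")
timezones_df = spark.read.options(header="true", inferSchema="true").csv(path_timezones)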
1 Answer
Maybe the package_data entry is missing from your setup initialization?!
This is how I usually add my config files to the res folder. Once packaged this way, the files can be accessed exactly the same way your code does it.
setup(
    name="daprep",
    version=__version__,
    author="",
    author_email="samuel.demir@galliker.com",
    description="A short summary of the project",
    license="proprietary",
    url="",
    packages=find_packages("src"),
    package_dir={"": "src"},
    package_data={"daprep": ["res/**/*"]},
    long_description=read("README.md"),
    install_requires=read_requirements(Path("requirements.txt")),
    tests_require=[
        "pytest",
        "pytest-cov",
        "pre-commit",
    ],
    cmdclass={
        "dist": DistCommand,
        "test": TestCommand,
        "testcov": TestCovCommand,
    },
    platforms="any",
    python_requires=">=3.7",
    entry_points={
        "console_scripts": [
            "main_entrypoint = daprep.main:main_entrypoint",
        ]
    },
)
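For illustration, a minimal notebook check to see where such packaged resource files end up after the wheel is installed (package and folder names follow the setup() example above, not your project):

# Hypothetical notebook cell: locate the installed package and list its res/ folder.
# "daprep" and "res" are the names from the setup() example above.
from pathlib import Path
import daprep  # assumes the wheel built from the setup() above is installed

res_dir = Path(daprep.__file__).parent / "res"
print(sorted(str(p) for p in res_dir.rglob("*")))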