Databricks在Notebook中运行时无法找到我安装的wheel内的csv文件

1 投票
1 回答
66 浏览
提问于 2025-04-12 05:05

我正在学习Spark,作为一个任务,我们需要在本地创建一个wheel文件,然后在Databricks上安装它(我使用的是Azure Databricks),并通过Databricks Notebook运行来测试它。这个程序涉及到读取一个CSV文件(timezones.csv),这个文件包含在wheel文件里。我确认这个文件确实在wheel里面,而且在我从本地PC的Jupyter Notebook安装并运行时也能正常工作。然而,当我在Databricks Notebook中安装它时,出现了错误,下面是截图:

[PATH_NOT_FOUND] Path does not exist: dbfs:/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/motor_ingesta/resources/timezones.csv. SQLSTATE: 42K03
File <command-3771510969632751>, line 7
      3 from pyspark.sql import SparkSession
      5 spark = SparkSession.builder.getOrCreate()
----> 7 flights_with_utc = aniade_hora_utc(spark, flights_df)
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/motor_ingesta/agregaciones.py:25, in aniade_hora_utc(spark, df)
     23 path_timezones = str(Path(__file__).parent) + "/resources/timezones.csv"
     24 #path_timezones = str(Path("resources") / "timezones.csv")
---> 25 timezones_df = spark.read.options(header="true", inferSchema="true").csv(path_timezones)
     27 # Concateno los datos de las columnas del timezones_df ("iata_code","iana_tz","windows_tz"), a la derecha de
     28 # las columnas del df original, copiando solo en las filas donde coincida el aeropuerto de origen (Origin) con
     29 # el valor de la columna iata_code de timezones.df. Si algun aeropuerto de Origin no apareciera en timezones_df,
     30 # las 3 columnas quedarán con valor nulo (NULL)
     32 df_with_tz = df.join(timezones_df, df["Origin"] == timezones_df["iata_code"], "left_outer")
File /databricks/spark/python/pyspark/instrumentation_utils.py:47, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     45 start = time.perf_counter()
     46 try:
---> 47     res = func(*args, **kwargs)
     48     logger.log_success(
     49         module_name, class_name, function_name, time.perf_counter() - start, signature
     50     )
     51     return res
File /databricks/spark/python/pyspark/sql/readwriter.py:830, in DataFrameReader.csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue, locale, lineSep, pathGlobFilter, recursiveFileLookup, modifiedBefore, modifiedAfter, unescapedQuoteHandling)
    828 if type(path) == list:
    829     assert self._spark._sc._jvm is not None
--> 830     return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    831 elif isinstance(path, RDD):
    833     def func(iterator):
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):
File /databricks/spark/python/pyspark/errors/exceptions/captured.py:230, in capture_sql_exception.<locals>.deco(*a, **kw)
    226 converted = convert_exception(e.java_exception)
    227 if not isinstance(converted, UnknownException):
    228     # Hide where the exception came from that shows a non-Pythonic
    229     # JVM exception message.
--> 230     raise converted from None
    231 else:
    232     raise

Databricks错误截图1: Databricks错误截图1

Databricks错误截图2: Databricks错误截图2

有没有人遇到过这个问题?有什么解决办法吗?

我尝试过用pip和从库安装这个文件,但都得到了相同的错误,还重启了集群好几次。

谢谢大家的帮助。

我使用的是Python 3.11,Pyspark 3.5和Java 8,并且是在PyCharm中本地创建的wheel。如果你需要更多细节来回答,请问我,我会提供。

我已经解释了所有细节。我原本希望能够从Databricks Notebook使用我本地创建的wheel。

抱歉,我的英语不是母语,有点生疏。

编辑以回答评论:

你能导航到 %sh ls /dbfs/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/motor_ingesta/resources 并分享文件夹内容的图片吗? – Samuel Demir 11小时前

我刚刚按照你要求的做了,得到了这个结果(文件实际上在那里,即使Databricks说找不到它……)

建议结果的截图

编辑以回答Samuel Demir:

也许你的设置初始化中缺少 package_data

这是我将配置文件添加到res文件夹中的方法。然后这些文件可以通过与你的代码完全相同的方式访问。

setup(
    name="daprep",
    version=__version__,
    author="",
    author_email="samuel.demir@galliker.com",
    description="A short summary of the project",
    license="proprietary",
    url="",
    packages=find_packages("src"),
    package_dir={"": "src"},
    package_data={"daprep": ["res/**/*"]},
    long_description=read("README.md"),
    install_requires=read_requirements(Path("requirements.txt")),
    tests_require=[
        "pytest",
        "pytest-cov",
        "pre-commit",
    ],
    cmdclass={
        "dist": DistCommand,
        "test": TestCommand,
        "testcov": TestCovCommand,
    },
    platforms="any",
    python_requires=">=3.7",
    entry_points={
        "console_scripts": [
            "main_entrypoint = daprep.main:main_entrypoint",
        ]
    }, )

我的 setup.py 文件中也有package _data这一行,这里你可以看到一个快照和代码。你发现有什么其他可能相关的细节吗?谢谢!

我的setup.py文件的截图

from setuptools import setup, find_packages

setup(
    name="motor-ingesta",
    version="0.1.0",
    author="Estrella Adriana Sicardi Segade",
    author_email="esicardi@ucm.es",
    description="Motor de ingesta para el curso de Spark",
    long_description="Motor de ingesta para el curso de Spark",
    long_description_content_type="text/markdown",
    url="https://github.com/esicardi",
    python_requires=">=3.8",
    packages=find_packages(),
    package_data={"motor_ingesta": ["resources/*.csv"]})

编辑以回答问题:

你能否通过简单地在Notebook单元中读取CSV文件来加载它? – Samuel Demir 17小时前

是的,我能够从代码外部读取这个文件。例如,我可以打印文件的内容:

%sh cat /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/motor_ingesta/resources/timezones.csv

"iata_code","iana_tz","windows_tz"
"AAA","Pacific/Tahiti","Hawaiian Standard Time"
"AAB","Australia/Brisbane","E. Australia Standard Time"
"AAC","Africa/Cairo","Egypt Standard Time"
"AAD","Africa/Mogadishu","E. Africa Standard Time"
"AAE","Africa/Algiers","W. Central Africa Standard Time"
"AAF","America/New_York","Eastern Standard Time"
"AAG","America/Sao_Paulo","E. South America Standard Time"
"AAH","Europe/Berlin","W. Europe Standard Time"
"AAI","America/Araguaina","Tocantins Standard Time"
"AAJ","America/Paramaribo","SA Eastern Standard Time"
"AAK","Pacific/Tarawa","UTC+12"
"AAL","Europe/Copenhagen","Romance Standard Time"
"AAM","Africa/Johannesburg","South Africa Standard Time"
"AAN","Asia/Dubai","Arabian Standard Time"
"AAO","America/Caracas","Venezuela Standard Time"
"AAP","Asia/Makassar","Singapore Standard Time"
"AAQ","Europe/Moscow","Russian Standard Time"
"AAR","Europe/Copenhagen","Romance Standard Time"
"AAS","Asia/Jayapura","Tokyo Standard Time"

...
[file continues on until the end]

你能分享一部分你的代码库以重现这个问题吗? – Samuel Demir 17小时前

关于这个包(叫motor_ingesta),它由三个py文件组成,分别是motor_ingesta.py、agregaciones.py和flujo_diario.py。这个包用于处理一些关于2023年1月美国机场航班信息的JSON文件。读取timezones.csv的函数定义在agregaciones.py中。这个函数接收一个DataFrame(这个函数用于处理之前从JSON文件中获取的DataFrame),并通过匹配机场代码将其与timezones.csv中的数据连接起来。然后它使用这些信息计算出从出发时间(DepTime)和UTC时区的UTC时间。它在DataFrame的右侧添加了一列“FlightTime”,包含UTC时间,之后再删除从timezones.csv中添加的列。以下是这个函数的代码:

def aniade_hora_utc(spark: SparkSession, df: DF) -> DF:
    """
    Añade la columna FlightTime en formato UTC al DataFrame de vuelos.

    :param spark: Sesión de Spark.
    :param df: DataFrame de vuelos.
    :return: DataFrame de vuelos con la columna FlightTime en formato UTC.
    """

        
    path_timezones = str(Path(__file__).parent) + "/resources/timezones.csv"
    timezones_df = spark.read.options(header="true", inferSchema="true").csv(path_timezones)
    df_with_tz = df.join(timezones_df, df["Origin"] == timezones_df["iata_code"], "left_outer")   
    df_with_flight_time = df_with_tz.withColumn("FlightTime", to_utc_timestamp(concat(
        col("FlightDate"), lit(" "),
        lpad(col("DepTime").cast("string"), 4, "0").substr(1, 2), lit(":"),
        col("DepTime").cast("string").substr(-2, 2)
    ), col("iana_tz")))

    df_with_flight_time = df_with_flight_time.drop("iata_code", "iana_tz", "windows_tz")
    return df_with_flight_time

1 个回答

0

可能是你的设置初始化中缺少了 package_data 这个部分吧?!

我通常是这样把我的配置文件添加到 res 文件夹里的,这样打包后,文件就可以通过你那段代码一样访问了。


setup(
    name="daprep",
    version=__version__,
    author="",
    author_email="samuel.demir@galliker.com",
    description="A short summary of the project",
    license="proprietary",
    url="",
    packages=find_packages("src"),
    package_dir={"": "src"},
    package_data={"daprep": ["res/**/*"]},
    long_description=read("README.md"),
    install_requires=read_requirements(Path("requirements.txt")),
    tests_require=[
        "pytest",
        "pytest-cov",
        "pre-commit",
    ],
    cmdclass={
        "dist": DistCommand,
        "test": TestCommand,
        "testcov": TestCovCommand,
    },
    platforms="any",
    python_requires=">=3.7",
    entry_points={
        "console_scripts": [
            "main_entrypoint = daprep.main:main_entrypoint",
        ]
    },
)

撰写回答