使用Sp加载CSV文件

2024-04-26 06:10:08 发布

您现在位置:Python中文网/ 问答频道 /正文

我是Spark的新手,我正在尝试从Spark文件中读取CSV数据。 我正在做的是:

sc.textFile('file.csv')
    .map(lambda line: (line.split(',')[0], line.split(',')[1]))
    .collect()

我希望这个调用能给我一个文件前两列的列表,但是我得到了这个错误:

File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range

尽管我的CSV文件不止一列。


Tags: 文件csv数据lambdamap列表linespark
3条回答

火花2.0.0+

您可以直接使用内置csv数据源:

spark.read.csv(
    "some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema
)

或者

(spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv"))

不包括任何外部依赖项。

火花<;2.0.0

与手动解析(在一般情况下这远不是微不足道的)不同,我建议^{}

确保Spark CSV包含在路径中(--packages--jars--driver-class-path

并按如下方式加载数据:

(df = sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))

它可以处理加载、模式推断、删除格式错误的行,并且不需要将数据从Python传递到JVM。

注意:

如果您知道模式,最好避免模式推断并将其传递给DataFrameReader。假设您有三列-integer、double和string:

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", DoubleType()),
    StructField("C", StringType())
])

(sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.csv("/home/stp/test1.csv",header=True,sep="|");

print(df.collect())

您确定所有行至少有两列吗?你能试试这样的东西吗,只是检查一下吗?以下内容:

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)>1) \
    .map(lambda line: (line[0],line[1])) \
    .collect()

或者,您可以打印罪犯(如果有的话):

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)<=1) \
    .collect()

相关问题 更多 >