In my Elasticsearch database, I have the following document:
{
"titre": "Formation ElasticSearch",
"sous-titre": "Mon sous titre",
"formateurs": [
{
"prenom": "Martin",
"nom": "Legros"
}
],
"jours": 3,
"url": "http://test.fr"
}
formateurs is an array of people; here it contains only one person.
I defined this mapping in PySpark:
from pyspark.sql.types import (ArrayType, LongType, StringType,
                               StructField, StructType)

person = StructType([
    StructField("nom", StringType()),
    StructField("prenom", StringType()),
])
schema = StructType([
    StructField("titre", StringType()),
    StructField("sous-titre", StringType()),
    StructField("jours", LongType()),
    StructField("url", StringType()),
    StructField("formateurs", ArrayType(person)),
])
parcel = sqlContext.read.format("org.elasticsearch.spark.sql").schema(schema).load("zenika")
parcel.printSchema()
parcel.show(1)
I get this schema:
|-- titre: string (nullable = true)
|-- sous-titre: string (nullable = true)
|-- jours: long (nullable = true)
|-- url: string (nullable = true)
|-- formateurs: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- nom: string (nullable = true)
| | |-- prenom: string (nullable = true)
With this document there is no error.
But as soon as I add a second formateur, I get an error. Example:
{
"titre": "Formation ElasticSearch",
"sous-titre": "Mon sous titre",
"formateurs": [
{
"prenom": "Martin",
"nom": "Legros"
},
{
"prenom": "Marc",
"nom": "Duchien"
}
],
"jours": 3,
"url": "http://test.fr"
}
I get this error:
Caused by: org.elasticsearch.hadoop.EsHadoopIllegalStateException: Field 'formateurs.nom' not found; typically this occurs with arrays which are not mapped as single value
at org.elasticsearch.spark.sql.RowValueReader$class.rowColumns(RowValueReader.scala:51)
at org.elasticsearch.spark.sql.ScalaRowValueReader.rowColumns(ScalaEsRowValueReader.scala:32)
at org.elasticsearch.spark.sql.ScalaRowValueReader.createMap(ScalaEsRowValueReader.scala:69)
at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:968)
at org.elasticsearch.hadoop.serialization.ScrollReader.readListItem(ScrollReader.java:875)
at org.elasticsearch.hadoop.serialization.ScrollReader.list(ScrollReader.java:927)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:833)
at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:1004)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:846)
at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:602)
at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:426)
... 27 more
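The root cause is that an Elasticsearch mapping does not distinguish a single value from an array: any field may hold one or more values, and the mapping looks the same either way, so the connector assumes single values unless told otherwise. A quick plain-Python illustration of this shape ambiguity, using the two documents from the question:

import json

one_formateur = '{"formateurs": [{"prenom": "Martin", "nom": "Legros"}]}'
two_formateurs = ('{"formateurs": [{"prenom": "Martin", "nom": "Legros"},'
                  ' {"prenom": "Marc", "nom": "Duchien"}]}')

for raw in (one_formateur, two_formateurs):
    formateurs = json.loads(raw)["formateurs"]
    # Both documents produce the identical Elasticsearch mapping; only
    # the number of values differs, and the mapping does not record it.
    print(len(formateurs), [f["nom"] for f in formateurs])
# 1 ['Legros']
# 2 ['Legros', 'Duchien']

This is why the first document reads fine while the second one fails: the connector only discovers the array shape at read time, when it is too late.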
Can you explain how to make the ArrayType work here? I haven't found a tutorial covering complex schemas.
Many thanks.
EDIT --------------------
After some research, I found:
from pyspark import SparkConf, SparkContext

conf = SparkConf() \
    .set("es.read.field.as.array.include", "formateurs.nom, ...") \
    .set("es.nodes", "localhost") \
    .set("es.port", "9200") \
    .set("es.input.json", "yes")
sc = SparkContext(conf=conf)
Just add es.read.field.as.array.include to the SparkContext configuration. Nested fields can be listed, separated by commas.
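Since the setting's value is a plain comma-separated string, building it from a list keeps it readable when several fields are involved. A minimal sketch; "formateurs.nom" comes from the question, while "formateurs.prenom" is an assumed sibling field added the same way:

# "formateurs.prenom" is hypothetical, shown only to illustrate listing
# several nested fields in es.read.field.as.array.include.
array_fields = ["formateurs.nom", "formateurs.prenom"]

es_settings = {
    "es.read.field.as.array.include": ",".join(array_fields),
    "es.nodes": "localhost",
    "es.port": "9200",
}

print(es_settings["es.read.field.as.array.include"])
# formateurs.nom,formateurs.prenom

Each (key, value) pair can then be passed to SparkConf.set() as in the snippet above.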