When Spark reads semi-structured data stored in Elasticsearch, fetching fields that are mapped as arrays can fail with the following error:

Field 'reqparams.header' not found; typically this occurs with arrays which are not mapped as single value

In this case, the array-typed fields can be declared via es.read.field.as.array.include, which prevents the error.
In addition, if documents were indexed with fields holding an empty object {} or an empty array [], reading them may also fail; such fields can be excluded via es.read.field.exclude. The code below demonstrates both options:

from pyspark.sql import SparkSession

if __name__ == '__main__':
    spark = SparkSession.builder \
        .appName("SparkOnEs") \
        .master("local") \
        .config('spark.jars.packages', 'org.elasticsearch:elasticsearch-spark-20_2.11:6.5.2') \
        .getOrCreate()

    # es.read.field.exclude: skip fields that may contain an empty object {}
    # or empty array [], which would otherwise cause a read error.
    # es.read.field.as.array.include: read the listed fields as array types.
    df = spark.read.format("org.elasticsearch.spark.sql") \
        .option("es.nodes", "localhost") \
        .option("es.port", "9200") \
        .option("es.read.field.exclude", "reqparams.header") \
        .option("es.read.field.as.array.include", "sons") \
        .load("test/info")

    df.printSchema()
    df.createOrReplaceTempView("tmp")  # registerTempTable is deprecated since Spark 2.0

    df2 = spark.sql("select * from tmp")
    df2.show()
    spark.stop()

For the full list of es-hadoop parameters, see https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
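Note that, per the es-hadoop configuration docs, these field options take comma-separated lists, so several fields can be handled at once (the extra field names below are hypothetical):

```python
# Hypothetical field names; each option accepts a comma-separated field list.
.option("es.read.field.exclude", "reqparams.header,reqparams.body") \
.option("es.read.field.as.array.include", "sons,tags") \
```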

Reference: https://blog.csdn.net/qq_33689414/article/details/88644931
