
sabari dass

08/08/2023, 7:01 PM
Hi All, I am reading data from an Elasticsearch index using PySpark in Databricks, and when I try to select the ts field (which is actually a date field, but the values are in epoch_millis) I get a java.lang.RuntimeException error. Example: the ts value is 1234567890123, but when I try to read it I get the error. Is there any way to read the ES index with an explicit schema?
Can anyone please help with this?
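A quick sanity check, not from the thread: load the index with default options and print the inferred schema, to see whether the connector maps ts to a timestamp or a string. The connection details below are placeholders.
Code snippet:
# Placeholder connection details; adjust for your cluster.
df = (spark.read.format("es")
      .option("es.nodes", "<es_node>:<port>")
      .option("es.nodes.wan.only", "true")
      .load("<index_name>"))
df.printSchema()  # shows the type elasticsearch-hadoop inferred for ts and dt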

Chris

08/08/2023, 8:09 PM
What is the actual exception?

sabari dass

08/08/2023, 8:24 PM
Code snippet:
ELASTIC_OPTIONS = {
    'es.nodes': '<es_node>:<port>',      # Elasticsearch host and port
    'es.net.http.auth.user': '<uname>',
    'es.net.http.auth.pass': '<pwd>',
    'es.nodes.wan.only': 'true',
    'es.batch.size.bytes': '100mb',
    'es.mapping.id': '<mapping_id>'      # field used as the document id
}

df_elastic = spark.read.format('es').options(**ELASTIC_OPTIONS) \
    .option("es.read.metadata", "true") \
    .option("es.read.metadata.version", "true") \
    .option("es.mapping.date.rich", "false") \
    .load("<index_name>")
@Chris If I remove the .option("es.mapping.date.rich", "false") then I am able to see the value for ts as 2021-02-19T21:39:24.514+0000 instead of epoch_millis (1613770764514), but I am unable to read the other dt field, whose data is in yyyy-MM-dd HH:mm:ss format 😞 I need to get all the data exactly as I have it in the Elasticsearch index. Example of actual ES index data: ts --> 1613770764514 and dt --> 2020-01-02 23:03:46.
If I read the ES index without .option("es.mapping.date.rich", "false"), then I am able to read only the ts field, and it comes back as yyyy-MM-dd'T'HH:mm:ss.SSS when it is supposed to be 1613770764514, and I get an error while reading the dt field:
Error:
Caused by: EsHadoopIllegalArgumentException: Cannot invoke method public org.joda.time.DateTime org.joda.time.format.DateTimeFormatter.parseDateTime(java.lang.String)
Caused by: InvocationTargetException: 
Caused by: IllegalArgumentException: Invalid format: "2020-01-02 23:03:46" is malformed at " 23:03:46"
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 930819.0 failed 4 times, most recent failure: Lost task 0.3 in stage 930819.0 (TID 821846) (10.139.64.11 executor 0): org.elasticsearch.hadoop.rest.EsHadoopParsingException: Cannot parse value [2020-01-02 23:03:46] for field [dt]
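One workaround sketch, not from the thread: with es.mapping.date.rich set to false, dt arrives as the literal string "2020-01-02 23:03:46", so it can be parsed in Spark rather than by the connector's Joda parser, which is what fails here on the multi-format mapping. es.read.field.exclude is a standard elasticsearch-hadoop read option, used here to sidestep the ts encoding clash described next.
Code snippet:
from pyspark.sql import functions as F

# Read with raw (non-rich) dates, skip ts for now, and parse dt ourselves.
dt_df = (spark.read.format('es').options(**ELASTIC_OPTIONS)
         .option("es.mapping.date.rich", "false")
         .option("es.read.field.exclude", "ts")
         .load("<index_name>")
         .withColumn("dt", F.to_timestamp("dt", "yyyy-MM-dd HH:mm:ss")))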
If I read the ES index with .option("es.mapping.date.rich", "false"), then I am able to read the dt field (in yyyy-MM-dd HH:mm:ss) but get an error for the ts field:
Error:
RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.Long is not a valid external type for schema of string
Caused by: RuntimeException: java.lang.Long is not a valid external type for schema of string
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 930822.0 failed 4 times, most recent failure: Lost task 0.3 in stage 930822.0 (TID 821855) (10.139.64.11 executor 0): java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.Long is not a valid external type for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, ts), StringType, true), true, false) AS ts#1636160620
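For reference, this looks like the reverse mismatch: with rich dates off, the connector's inferred schema calls ts a string, but the _source value is a JSON number, so a java.lang.Long reaches the row encoder. If ts can be landed as a long column (the ts_long column below is hypothetical), turning epoch millis into a Spark timestamp is a one-liner:
Code snippet:
from pyspark.sql import functions as F

# Hypothetical ts_long column holding epoch millis, e.g. 1613770764514.
# Dividing by 1000 gives fractional seconds; the cast keeps millisecond precision.
df = df.withColumn("ts_time", (F.col("ts_long") / 1000).cast("timestamp"))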
I want to read both the ts and dt fields exactly as they are stored in Elasticsearch using PySpark. Elasticsearch schema mapping:
{
  "ts": {
    "type": "date",
    "format": "epoch_millis||yyyy-MM-dd||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSSSSS||yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'||yyyy-MM-dd'T'HH:mm:ss.SSSS'Z'||MM-dd-yyyy HH:mm:ss||MM-dd-yyyy HH:mm:ss.SSSSSS||yyyy-MM-dd'T'HH:mm:ss"
  },
  "dt": {
    "type": "date",
    "format": "epoch_millis||yyyy-MM-dd||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd HH:mm:ss.SSSSSS||yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'||yyyy-MM-dd'T'HH:mm:ss.SSSS'Z'||MM-dd-yyyy HH:mm:ss||MM-dd-yyyy HH:mm:ss.SSSSSS||yyyy-MM-dd'T'HH:mm:ss"
  }
}
Is there any other way to read both fields from Elasticsearch using PySpark?
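Not from the thread, but one way to get both fields that builds on the observations above: read the index twice with opposite es.mapping.date.rich settings, exclude the field that breaks each pass (es.read.field.exclude is a standard elasticsearch-hadoop option), and stitch the results back together on the document _id exposed by es.read.metadata. A sketch; whether a double read is acceptable depends on index size.
Code snippet:
from pyspark.sql import functions as F

def es_reader():
    # Fresh reader per pass; es.read.metadata adds a _metadata map with _id.
    return (spark.read.format('es').options(**ELASTIC_OPTIONS)
            .option("es.read.metadata", "true"))

# Pass 1: rich dates ON, so ts parses via epoch_millis; exclude dt, which
# the connector's Joda parser rejects.
ts_df = (es_reader().option("es.read.field.exclude", "dt")
         .load("<index_name>")
         .select(F.col("_metadata").getItem("_id").alias("doc_id"),
                 # timestamp -> epoch millis; casting to double keeps the fraction
                 (F.col("ts").cast("double") * 1000).cast("long").alias("ts")))

# Pass 2: rich dates OFF, so dt arrives as a plain string; exclude ts, which
# otherwise hits the Long-vs-string encoder error.
dt_df = (es_reader().option("es.mapping.date.rich", "false")
         .option("es.read.field.exclude", "ts")
         .load("<index_name>")
         .select(F.col("_metadata").getItem("_id").alias("doc_id"), "dt"))

result = ts_df.join(dt_df, "doc_id")  # both fields, as stored in the index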