SPL: Access Elasticsearch

 

Elasticsearch is a distributed search server that exposes a RESTful API for updating and searching data. Both the submitted data and the search results are in JSON format. SPL, the calculation engine language, can parse JSON strings with the json() function, which converts them into SPL table sequences for easy calculation. SPL also provides the es_rest() function, which calls the Elasticsearch API to access the data.

The function takes the form es_rest(url, method, content; httpHeader1, httpHeader2, …). The first parameter is the URL, which may use http or https and whose trailing path varies with the operation. The second parameter is the HTTP method: GET, PUT, POST or DELETE. The third parameter is the content submitted with the HTTP request; it can be omitted for operations that submit no content. The parameters after the semicolon are HTTP headers, which carry information such as the API key used for authentication and the content format. For details such as which URL and method each REST operation uses, what content to submit and which HTTP headers to set, please refer to the official Elasticsearch website.
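To make the parameter layout concrete, here is a minimal Python sketch of how es_rest-style arguments might map onto a plain HTTP request. It is not SPL and not how es_rest() is implemented; build_request is a hypothetical helper, the API key is a placeholder, and the request is only built, never sent.

```python
import json
import urllib.request


def build_request(url, method, content=None, *headers):
    """Build (but do not send) an HTTP request from es_rest-style arguments.

    Hypothetical helper for illustration only; it is not part of SPL.
    """
    header_map = {}
    for line in headers:
        # Each header is one "Name: value" string; split on the first colon only.
        name, _, value = line.partition(":")
        header_map[name.strip()] = value.strip()
    # The content is optional, mirroring operations that submit nothing.
    body = content.encode("utf-8") if content is not None else None
    return urllib.request.Request(url, data=body, headers=header_map, method=method)


doc = json.dumps({"counter": 1, "tags": ["red"]})
put_req = build_request(
    "https://localhost:9200/index1/_doc/1", "PUT", doc,
    "Content-Type: application/json",
    "Authorization:ApiKey <your-api-key>",  # placeholder, not a real key
)
# A DELETE carries headers but no content.
delete_req = build_request(
    "https://localhost:9200/index1/_doc/1", "DELETE", None,
    "Authorization:ApiKey <your-api-key>",
)
print(put_req.get_method(), put_req.full_url)
print(delete_req.get_method(), delete_req.data)
```

The headers are passed as whole "Name: value" strings, just as they appear after the semicolon in es_rest(), so the same string can be defined once and reused in every call.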

Code sample:


      A
1     >apikey="Authorization:ApiKey a2x6aEF……KZ29rT2hoQQ=="
2     '{
        "counter" : 1,
        "tags" : ["red"],
        "beginTime" : "2022-01-03",
        "endTime" : "2022-02-15"
      }
3     =es_rest("https://localhost:9200/index1/_doc/1", "PUT",A2;"Content-Type: application/x-ndjson",apikey)
4     =json(A3.Content)
5     '{
        "counter" : 2,
        "tags" : ["gray"],
        "beginTime" : "2022-08-03",
        "endTime" : "2022-12-15"
      }
6     =es_rest("https://localhost:9200/index2/_doc/1", "PUT",A5;"Content-Type: application/x-ndjson",apikey)
7     '{
        "script" : {
          "source" : "ctx._source.counter += params.count",
          "lang" : "painless",
          "params" : {
            "count" : 4
          }
        }
      }
8     =es_rest("https://localhost:9200/index1/_update/1", "POST",A7;"Content-Type: application/x-ndjson",apikey)
9     '{
        "docs" : [
          {
            "_index" : "index1",
            "_id" : 1
          },
          {
            "_index" : "index2",
            "_id" : 1
          }
        ]
      }
10    =es_rest("https://localhost:9200/_mget", "GET",A9;apikey,"Content-Type: application/json")
11    =json(A10.Content)
12    '{
        "query" : {
          "bool" : {
            "filter" : {
              "range" : {
                "beginTime" : {"gte": "2022-01-01", "lte": "2022-07-01"}
              }
            },
            "boost" : 1.0
          }
        }
      }
13    =es_rest("https://localhost:9200/_search","GET",A12;apikey,"Content-Type: application/json")
14    =json(A13.Content)
15    =es_rest("https://localhost:9200/index1/_doc/1", "DELETE";apikey)
16    =json(A15.Content)

A1 defines the HTTP header that carries the API key; it is used in every subsequent request to Elasticsearch.

A2 defines the first piece of data to be written.

A3 uses the es_rest() function to write the data. After execution, A3 holds all the information from the HTTP response.

The Content field in A3 holds the execution result as a JSON string. A4 uses the json() function to convert it into an SPL table sequence for easy viewing.

A5 defines the second piece of data, and A6 writes it to index2.

A7 defines a JSON string that updates data: its script adds 4 to the counter of document 1 in index1, raising the value from 1 to 5. A8 executes the update.
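The arithmetic behind the update can be checked locally. The following Python sketch is a stand-in for the Painless script in A7, not a call to Elasticsearch; apply_counter_script is a hypothetical name, and the document is the one defined in A2.

```python
def apply_counter_script(source, params):
    """Local stand-in for the script "ctx._source.counter += params.count"."""
    updated = dict(source)  # copy so the original document is left untouched
    updated["counter"] += params["count"]
    return updated


doc = {"counter": 1, "tags": ["red"], "beginTime": "2022-01-03", "endTime": "2022-02-15"}
updated = apply_counter_script(doc, {"count": 4})
print(updated["counter"])  # 5
```

Only the counter changes; the other fields of the document are carried over as they were.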

A9 defines a JSON string that fetches multiple documents in one request, and A10 sends it. A11 converts the nested JSON string in the result into a multi-layer table sequence for inspection.
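To show what a multi-layer result looks like once it is flattened, here is a small Python sketch that parses an _mget-style response body into one row per document. The response body is illustrative: only the docs, _index, _id, found and _source field names follow the Elasticsearch multi-get response, and the values assume that index1 already holds the updated counter and index2 holds the second piece of data. flatten_docs is a hypothetical helper.

```python
import json

# Illustrative response body; real responses carry more metadata fields.
content = json.dumps({
    "docs": [
        {"_index": "index1", "_id": "1", "found": True,
         "_source": {"counter": 5, "tags": ["red"],
                     "beginTime": "2022-01-03", "endTime": "2022-02-15"}},
        {"_index": "index2", "_id": "1", "found": True,
         "_source": {"counter": 2, "tags": ["gray"],
                     "beginTime": "2022-08-03", "endTime": "2022-12-15"}},
    ]
})


def flatten_docs(body):
    """Turn the nested "docs" array into one flat row per found document."""
    rows = []
    for doc in json.loads(body)["docs"]:
        if not doc.get("found"):
            continue  # skip IDs that were requested but do not exist
        row = {"_index": doc["_index"], "_id": doc["_id"]}
        row.update(doc["_source"])  # lift the stored fields to the top level
        rows.append(row)
    return rows


rows = flatten_docs(content)
for row in rows:
    print(row["_index"], row["counter"], row["beginTime"])
```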

A12 defines a JSON string that searches by time, selecting the data whose beginTime falls between 2022-01-01 and 2022-07-01 inclusive, which is roughly the first half of 2022. A13 executes the search, and A14 converts the result into a table sequence.
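The effect of the range filter can be reproduced on the two sample documents without a server. This Python sketch applies the same inclusive bounds (gte and lte) locally; in_range is a hypothetical helper, and the rows assume the two pieces of data written earlier.

```python
from datetime import date

docs = [
    {"_index": "index1", "beginTime": "2022-01-03", "endTime": "2022-02-15"},
    {"_index": "index2", "beginTime": "2022-08-03", "endTime": "2022-12-15"},
]


def in_range(value, gte, lte):
    """Inclusive date check mirroring the "gte"/"lte" bounds of the range filter."""
    return date.fromisoformat(gte) <= date.fromisoformat(value) <= date.fromisoformat(lte)


hits = [d for d in docs if in_range(d["beginTime"], "2022-01-01", "2022-07-01")]
print([d["_index"] for d in hits])  # ['index1']
```

Only the document in index1 qualifies, because the beginTime of the document in index2 (2022-08-03) lies after the upper bound.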

A15 deletes the document with ID 1 from index1, and A16 converts the response into a table sequence.