I'm currently working on a set of stress-testing tools for capability analysis, for which I'm using Gatling.
Part of this involves loading up Elasticsearch with scroll queries followed by update API calls.
Step 1: Run the scroll initiator and save the _scroll_id where it can be used by further scroll queries.
Step 2: Run a scroll query on repeat. As part of each scroll query, make a modification to each hit returned and index it back into Elasticsearch, effectively spawning up to 1000 actions from the one scroll query action, and have the results sampled.
Step 1 is easy. Step 2 not so much.
I'm currently trying to achieve this via a ResponseTransformer that parses the JSON-formatted results, makes modifications to each one, and fires off a thread for each one that attempts another exec(http(...).post(...)) etc. to index the changes back into Elasticsearch.
Basically, I think I'm going about it the wrong way. The indexing threads never get run, let alone sampled by Gatling.
Here's the main body of my scroll query action:
...
val pool = Executors.newFixedThreadPool(parallelism)
val query = exec(http("Scroll Query")
  .get(s"/_search/scroll")
  .body(ElFileBody("queries/scrollquery.json")).asJSON // Do the scroll query
  .check(jsonPath("$._scroll_id").saveAs("scroll_id")) // Get the scroll ID from the response
  .transformResponse { case response if response.isReceived =>
    new ResponseWrapper(response) {
      val responseJson = JSON.parseFull(response.body.string)
      // Get the hits and
      val hits = responseJson.get.asInstanceOf[Map[String, Any]]("hits").asInstanceOf[Map[String,Any]]("hits").asInstanceOf[List[Map[String, Any]]]
      for (hit <- hits) {
        val id = hit.get("_id").get.asInstanceOf[String]
        val immutableSource = hit.get("_source").get.asInstanceOf[Map[String, Any]]
        val source = collection.mutable.Map(immutableSource.toSeq: _*) // Make the map mutable
        source("newfield") = "testvalue" // Make a modification
        Thread.sleep(pause) // Pause to simulate topology throughput
        pool.execute(new DocumentIndexer(index, doctype, id, source)) // Create a new thread that executes the index request
      }
    }
  }) // Make some mods and re-index into elasticsearch
...
DocumentIndexer looks like this:
class DocumentIndexer(index: String, doctype: String, id: String, source: scala.collection.mutable.Map[String, Any]) extends Runnable {
  ...
  val httpConf = http
    .baseURL(s"http://$host:$port/${index}/${doctype}/${id}")
    .acceptHeader("application/json")
    .doNotTrackHeader("1")
    .disableWarmUp
  override def run(): Unit = {
    val json = new ObjectMapper().writeValueAsString(source)
    exec(http(s"Index ${id}")
      .post("/_update")
      .body(StringBody(json)).asJSON)
  }
}
Thanks for any help/suggestions!
It's possible to achieve this by using jsonPath to extract the JSON hit array and saving the elements into the session. Then, using a foreach in the action chain and exec-ing the index task in the loop, you can perform the indexing accordingly.
i.e.:
ScrollQuery
...
val query = exec(http("Scroll Query")
  .get(s"/_search/scroll")
  .body(ElFileBody("queries/scrollquery.json")).asJSON // Do the scroll query
  .check(jsonPath("$._scroll_id").saveAs("scroll_id")) // Get the scroll ID from the response
  .check(jsonPath("$.hits.hits[*]").ofType[Map[String,Any]].findAll.saveAs("hitsJson")) // Save a List of hit Maps into the session
)
...
Simulation
...
val scrollQueries = scenario("Enrichment Topologies").exec(
  ScrollQueryInitiator.query,
  repeat(numberOfPagesToScrollThrough, "scrollQueryCounter") {
    exec(
      ScrollQuery.query,
      pause(10 seconds).foreach("${hitsJson}", "hit") { exec(HitProcessor.query) }
    )
  }
)
...
HitProcessor
...
def getBody(session: Session): String = {
  val hit = session("hit").as[Map[String,Any]]
  // _source is cast to a java.util.LinkedHashMap, so wrap it for Scala-style access
  val source = mapAsScalaMap(hit("_source").asInstanceOf[java.util.LinkedHashMap[String,Any]])
  source.put("newfield", "testvalue") // Make a modification
  val sourceJson = new ObjectMapper().writeValueAsString(mapAsJavaMap(source))
  s"""{"doc":${sourceJson}}""" // Wrap the source in the "doc" envelope used by partial updates
}
// Despite its name, getId returns the full _update URI for the current hit
def getId(session: Session): String = {
  val hit = session("hit").as[Map[String,Any]]
  val id = URLEncoder.encode(hit("_id").asInstanceOf[String], "UTF-8") // Escape characters that are unsafe in a URI
  s"/${index}/${doctype}/${id}/_update"
}
val query = exec(http("Index Item")
  .post(session => getId(session))
  .body(StringBody(session => getBody(session))).asJSON)
...
Disclaimer: This code still needs optimising, and I haven't actually learnt much Scala yet. Feel free to comment with better solutions.
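As a possible first clean-up, the request-building logic in HitProcessor can be pulled out into pure functions that don't touch the Gatling session, which makes it checkable on its own. The following is only a minimal sketch using the Scala standard library: `Hit`, `UpdateRequestSketch`, `updatePath` and `updateBody` are hypothetical names that are not part of Gatling or of the code above, and the source document is assumed to be already serialized to a JSON string.

```scala
import java.net.URLEncoder

// Hypothetical stand-in for one Elasticsearch hit: just the id and the
// already-serialized _source JSON (an assumption made for this sketch).
final case class Hit(id: String, sourceJson: String)

object UpdateRequestSketch {
  // Build the _update path for a hit, encoding the id so that characters
  // such as '/' cannot change the structure of the URI.
  def updatePath(index: String, doctype: String, hit: Hit): String = {
    val encodedId = URLEncoder.encode(hit.id, "UTF-8")
    s"/$index/$doctype/$encodedId/_update"
  }

  // Wrap the serialized source in the {"doc": ...} envelope
  // used for partial updates.
  def updateBody(hit: Hit): String =
    s"""{"doc":${hit.sourceJson}}"""
}
```

Inside Gatling, functions like these would be called from the `session => ...` lambdas once the hit has been read from the session. Note that `URLEncoder` applies form encoding, so a space in an id becomes `+` rather than `%20`; whether that matters depends on the ids in the index.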
Having done this, what I really want to achieve now is to parallelise a given number of the indexing tasks, i.e. I get 1000 hits back and I want to execute an index task for each individual hit, but rather than just iterating over them one after another, I want to do 10 at a time concurrently.
However, I think this is a separate question, really, so I'll present it as such.
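To make the "10 at a time" behaviour concrete, here is a plain-Scala sketch of the batching semantics only. It is not a Gatling construct: work started this way runs outside Gatling's action chain, so it would not be sampled, and it is shown purely to pin down what is being asked for. `BatchSketch`, `indexOne` and `indexInBatches` are hypothetical names, and `indexOne` is a placeholder for the real index call.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BatchSketch {
  // Hypothetical placeholder for the per-hit index request.
  def indexOne(id: String): String = s"indexed:$id"

  // Split the ids into batches of `batchSize`. The items of one batch are
  // submitted together as Futures, and the next batch starts only after
  // every Future of the current batch has completed.
  def indexInBatches(ids: Seq[String], batchSize: Int): Seq[String] =
    ids.grouped(batchSize).flatMap { batch =>
      val futures = batch.map(id => Future(indexOne(id)))
      Await.result(Future.sequence(futures), 30.seconds)
    }.toList
}
```

With 1000 hits and a batch size of 10 this gives 100 batches run one after another, each with up to 10 tasks in flight. Results come back in the same order as the input ids, because `Future.sequence` preserves order.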