Action to spawn multiple further actions in Gatling scenario



I'm currently working on a set of stress-testing tools for capability analysis, for which I'm using Gatling.

Part of this involves loading Elasticsearch with scroll queries followed by update API calls.

What I want to achieve

Step 1: Run the scroll initiator and save the _scroll_id so that it can be used by subsequent scroll queries.

Step 2: Run a scroll query on repeat. For each scroll query, modify every hit returned and index it back into Elasticsearch. This effectively spawns up to 1000 actions from the one scroll query action, and I want the results of those actions sampled.

Step 1 is easy. Step 2 not so much.

What I've tried

I'm currently trying to achieve this via a ResponseTransformer that parses the JSON-formatted results, modifies each one, and fires off a thread per hit that attempts another exec(http(...).post(...) etc) to index the changes back into Elasticsearch.

Basically, I think I'm going about it the wrong way. The indexing threads never get run, let alone sampled by Gatling.

Here's the main body of my scroll query action:


  val pool = Executors.newFixedThreadPool(parallelism)

  val query = exec(http("Scroll Query")
    .body(ElFileBody("queries/scrollquery.json")).asJSON // Do the scroll query
    .check(jsonPath("$._scroll_id").saveAs("scroll_id")) // Get the scroll ID from the response
    .transformResponse { case response if response.isReceived =>
      new ResponseWrapper(response) {
        val responseJson = JSON.parseFull(response.body.string)
        // Get the hits and process each one
        val hits = responseJson.get.asInstanceOf[Map[String, Any]]("hits").asInstanceOf[Map[String,Any]]("hits").asInstanceOf[List[Map[String, Any]]]
        for (hit <- hits) {
          val id = hit.get("_id").get.asInstanceOf[String]
          val immutableSource = hit.get("_source").get.asInstanceOf[Map[String, Any]]
          val source = collection.mutable.Map(immutableSource.toSeq: _*) // Make the map mutable
          source("newfield") = "testvalue" // Make a modification
          Thread.sleep(pause) // Pause to simulate topology throughput
          pool.execute(new DocumentIndexer(index, doctype, id, source)) // Create a new thread that executes the index request
        } // Make some mods and re-index into elasticsearch
      }
    })


DocumentIndexer looks like this:

class DocumentIndexer(index: String, doctype: String, id: String, source: scala.collection.mutable.Map[String, Any]) extends Runnable {


  val httpConf = http

  override def run() {

    val json = new ObjectMapper().writeValueAsString(source)

    exec(http(s"Index ${id}")




  1. Is this even possible using gatling?
  2. How can I achieve what I want to achieve?

Thanks for any help/suggestions!


It's possible to achieve this by using a jsonPath check to extract the JSON hit array and save its elements into the session. A foreach in the action chain can then loop over those elements and exec the index task for each one.


  // ScrollQuery.query
  val query = exec(http("Scroll Query")
    .body(ElFileBody("queries/scrollquery.json")).asJSON // Do the scroll query
    .check(jsonPath("$._scroll_id").saveAs("scroll_id")) // Get the scroll ID from the response
    .check(jsonPath("$.hits.hits[*]").ofType[Map[String,Any]].findAll.saveAs("hitsJson"))) // Save a List of hit Maps into the session


  // Scenario: initiate the scroll, then for each page run the scroll query and process every hit
  val scrollQueries = scenario("Enrichment Topologies").exec(
    ScrollQueryInitiator.query,
    repeat(numberOfPagesToScrollThrough, "scrollQueryCounter") {
      exec(
        ScrollQuery.query,
        pause(10 seconds).foreach("${hitsJson}", "hit") { exec(HitProcessor.query) }
      )
    }
  )


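  // HitProcessor
  // Imports this snippet appears to rely on (assumed; JavaConversions matches older Scala versions)
  import java.net.URLEncoder
  import com.fasterxml.jackson.databind.ObjectMapper
  import scala.collection.JavaConversions.{mapAsJavaMap, mapAsScalaMap}

  // Build the partial-update body for the current hit
  def getBody(session: Session): String = {
    val hit = session("hit").as[Map[String,Any]]
    val source = mapAsScalaMap(hit("_source").asInstanceOf[java.util.LinkedHashMap[String,Any]])
    source.put("newfield", "testvalue") // Make a modification
    val sourceJson = new ObjectMapper().writeValueAsString(mapAsJavaMap(source))
    s"""{"doc":${sourceJson}}"""
  }

  // Build the update URI for the current hit (despite the name, this returns the whole path)
  def getId(session: Session): String = {
    val hit = session("hit").as[Map[String,Any]]
    val id = URLEncoder.encode(hit("_id").asInstanceOf[String], "UTF-8")
    s"/${index}/${doctype}/${id}/_update"
  }

  val query = exec(http("Index Item")
    .post(session => getId(session))
    .body(StringBody(session => getBody(session))).asJSON)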
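
The per-hit logic can also be pulled out of the Gatling session handling so it can be checked on its own. The following is only a sketch under assumptions: HitUpdate, enrich and updatePath are hypothetical names, the source is taken as a plain Scala Map rather than the java.util.LinkedHashMap that arrives via the session, and the index and type names in the usage are placeholders.

```scala
import java.net.URLEncoder

// Hypothetical helper object; not part of Gatling or the original code.
object HitUpdate {
  // Return a copy of the document source with the extra field added.
  def enrich(source: Map[String, Any]): Map[String, Any] =
    source.updated("newfield", "testvalue")

  // Build the update path for one document, URL-encoding the raw id.
  def updatePath(index: String, doctype: String, rawId: String): String = {
    val id = URLEncoder.encode(rawId, "UTF-8")
    s"/$index/$doctype/$id/_update"
  }
}
```

For example, HitUpdate.updatePath("myindex", "mytype", "a/b") yields "/myindex/mytype/a%2Fb/_update". getBody and getId would then only need to read the hit from the session and delegate to these functions.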

Disclaimer: This code still needs optimising, and I haven't actually learnt much Scala yet. Feel free to comment with better solutions.

Having done this, what I really want to achieve now is to parallelise a given number of the indexing tasks. That is, if I get 1000 hits back, I want to execute an index task for each individual hit, but rather than iterating over them one after another, I want to run 10 at a time concurrently.
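
The grouping half of that idea, splitting the hits into sets of 10, can be sketched in plain Scala. This is a minimal sketch only: HitBatching and batches are hypothetical names, and it does not show how Gatling would dispatch each batch concurrently, which is the part that remains open.

```scala
// Hypothetical helper; not part of Gatling.
object HitBatching {
  // Split the hits into consecutive batches of at most batchSize elements.
  def batches[A](hits: List[A], batchSize: Int): List[List[A]] =
    hits.grouped(batchSize).toList
}
```

With 25 hits and a batch size of 10, this gives batches of 10, 10 and 5 elements.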

However, I think this is a separate question, really, so I'll present it as such.

