Analytical Benchmarking for a 1B Event per Day Pipeline

Venkat Krishnamurthy
May 4, 2022

Sneller is a serverless vectorized query engine designed specifically for analytics on semi-structured JSON event data, a space where Elastic/OpenSearch is a popular choice today. Sneller speaks SQL natively, but also functions as a drop-in replacement for Elasticsearch via a soon-to-be-open-sourced Elastic query language adapter.

This post is a follow-up to our earlier post on the results of a head-to-head benchmark of Sneller vs. Elastic Cloud. Here, we’ll dive deeper into why and how we built bespoke benchmarking infrastructure for event data.

Why a custom benchmark harness?

There’s no shortage of benchmarks for data infrastructure, especially for analytics. The TPC suite has long included benchmarks for many different kinds of workloads: TPC-H and TPC-DS are well known, and more recent ones like TPCx-IoT tackle specific use cases. In addition, ClickHouse’s ClickBench is a recent comparative benchmark focused on analytic data platforms/stores.

Naturally, we started off asking - ‘Why not use an existing benchmark?’ After all, TPC-DS/TPC-H in particular are well established in the analytics community, and Clickhouse has also built a comprehensive database of how different systems compare under a set of common queries against a static dataset of 100 million records.

In the end, we built our own harness because we felt that none of these existing benchmarks accurately captured the key characteristics of real-world event data pipelines. We set out to capture two themes we heard from our prospects/customers:

Concurrent query + ingestion: Our primary prospects today use Elasticsearch (or OpenSearch) for their event data analytics. Their pipelines handle ‘always-on’ event data, often ingested at rates of 10,000 events (or more) per second per Elastic endpoint, from observability, security or user event data sources. These infrastructures also typically handle high query throughput (100s of queries/minute) from Kibana or other dashboarding tools, in addition to continuous data ingest.

From a benchmark perspective, this meant two things:

First, we needed a scalable way to generate real-world events at realistic data rates to model event data sources. We chose 10,000 events/sec as our baseline data rate.

Second, we wanted the data itself to be realistic. Since we’re targeting JSON-encoded event payloads, a great proxy for such a workload turned out to be AWS Cloudtrail events. A typical Cloudtrail event is encoded in JSON by default, and is a complex object with nested and optional fields. Here’s an example of a Cloudtrail event.

{"eventVersion": "1.05",
   "userIdentity": {
       "type": "IAMUser",
       "principalId": "AIDAJDPLRKLG7UEXAMPLE",
       "arn": "arn:aws:iam::123456789012:user/Mary_Major",
       "accountId": "123456789012",
       "accessKeyId": "AKIAIOSFODNN7EXAMPLE",
       "userName": "Mary_Major",
       "sessionContext": {
           "sessionIssuer": {},
           "webIdFederationData": {},
           "attributes": {
               "mfaAuthenticated": "false",
               "creationDate": "2019-06-18T22:28:31Z"
           }
       },
       "invokedBy": "signin.amazonaws.com"
   },
   "eventTime": "2019-06-19T00:18:31Z",
   "eventSource": "cloudtrail.amazonaws.com",
   "eventName": "StartLogging",
   "awsRegion": "us-east-2",
   "sourceIPAddress": "203.0.113.64",
   "userAgent": "signin.amazonaws.com",
   "requestParameters": {
       "name": "arn:aws:cloudtrail:us-east-2:123456789012:trail/My-First-Trail"
   },
   "responseElements": null,
   "requestID": "ddf5140f-EXAMPLE",
   "eventID": "7116c6a1-EXAMPLE",
   "readOnly": false,
   "eventType": "AwsApiCall",
   "recipientAccountId": "123456789012"
}

We built a custom data generator in golang using the jsonnet templating language, making it extensible to other JSON event types.
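As an illustration (not our production generator), here is a minimal sketch of the approach: it assumes the google/go-jsonnet library and a hypothetical cloudtrail.jsonnet template, renders one event per tick, and targets the 10,000 events/sec baseline rate.

package main

import (
	"fmt"
	"os"
	"time"

	"github.com/google/go-jsonnet"
)

func main() {
	// cloudtrail.jsonnet is a hypothetical template that renders one
	// CloudTrail-like JSON event; the event time is passed in as an
	// external variable so every rendered event is unique.
	tmpl, err := os.ReadFile("cloudtrail.jsonnet")
	if err != nil {
		panic(err)
	}

	vm := jsonnet.MakeVM()

	const targetRate = 10000 // baseline: 10,000 events/sec
	ticker := time.NewTicker(time.Second / targetRate)
	defer ticker.Stop()

	for range ticker.C {
		vm.ExtVar("eventTime", time.Now().UTC().Format(time.RFC3339))
		event, err := vm.EvaluateAnonymousSnippet("cloudtrail.jsonnet", string(tmpl))
		if err != nil {
			panic(err)
		}
		// In the benchmark, rendered events are handed to vector.dev for delivery.
		fmt.Print(event)
	}
}

A 100µs ticker is only a sketch; a generator that has to hit the target rate reliably would batch events per interval rather than emit them one tick at a time.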

Query workload: Different patterns of usage

Benchmarking of analytics systems often follows a process like this:


1. Generate synthetic data at a specific scale factor (e.g. TPC-H has scale factors from 10-10000)
2. Run a query workload a few times at these different scale factors
3. Compute and report an average of the relevant performance/timing measures.

While this is useful, we felt it was too simplistic for event data pipelines, so we chose to build our own infrastructure to benchmark two distinct real-world event data use cases we saw repeatedly across all our design partners: Monitoring and Investigation.

Monitoring:

A typical event data pipeline in production using Elasticsearch (for example) involves:

- Constantly arriving data at O(1000) events per second or more
- Different users querying the data, often via monitoring dashboards like the one below



Monitoring dashboards result in query patterns that are quite different from typical benchmark query workloads. They exhibit two primary parameters that are typical of event data monitoring:


‘Lookback’ windows: Monitoring dashboards are often configured over specific time windows relative to the current time (hence the term ‘lookback’). A production deployment likely has many users running an arbitrary mix of different lookback windows.

Refresh intervals: A natural expectation with constantly arriving event data is that monitoring dashboards refresh frequently to keep the analytics up to date. We found that the shorter the lookback window, the more frequent the refresh interval - e.g. a dashboard looking at 30 minutes of data may refresh every 30 seconds, compared to every 5 minutes for a 6h lookback window.

The query arrival pattern resulting from monitoring dashboards is: the dashboard fires off a batch of N concurrent queries (one for each of its N charts), sleeps for S seconds (the refresh interval), and repeats this cycle (a minimal sketch of this loop follows the query below). Here’s an example of an Elastic query in the benchmark, extracted from a Kibana dashboard and subsequently parametrized for different lookback windows.

{  "elasticQuery": {
   "aggs": {
     "2": {
       "terms": {
         "field": "awsRegion.keyword",
         "order": {
           "_count": "desc"
         },
         "size": 10
       },        "aggs": {
         "3": {
           "terms": {
             "field": "sourceIPAddress.geoip.country_code.keyword",
             "order": {
               "_count": "desc"
             },
             "size": 10
           },
           "aggs": {
             "4": {
               "terms": {
                 "field": "userAgent.keyword",
                 "order":
                   "_count": "desc"
                 },
                 "size": 10
               }
             }

         }
       }
     }
   },
   "size": 0,
   "fields": [
     {
       "field": "eventTime",

     },
     {

       "format": "date_time"
     }
   "query": {
     "bool": {
       "filter": [
         {
           "range": {
             "eventTime": {
               "format": "strict_date_optional_time",
               "gte": queryParam.elastic.begin,
               "lte": queryParam.elastic.end
             }
           }
         }
       ]
     }
 }
}
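To model this arrival pattern, each simulated dashboard is essentially a loop of concurrent queries followed by a sleep. Here is a minimal sketch of such a loop (not the actual harness code); the dashboard shape and the runQuery helper are assumptions for illustration.

package main

import (
	"log"
	"sync"
	"time"
)

// dashboard describes one simulated monitoring dashboard: N panel queries,
// a lookback window, and a refresh interval (shorter lookbacks refresh more often).
type dashboard struct {
	queries  []string      // parametrized Elastic DSL query bodies, one per chart
	lookback time.Duration // e.g. 30 * time.Minute
	refresh  time.Duration // e.g. 30 * time.Second
}

// runQuery is a stand-in for issuing one Elastic DSL query against an endpoint
// and returning its wall-clock latency.
func runQuery(endpoint, body string, begin, end time.Time) time.Duration {
	start := time.Now()
	// ... HTTP POST of `body` with the [begin, end) time range filled in ...
	return time.Since(start)
}

func runDashboard(endpoint string, d dashboard) {
	for {
		end := time.Now()
		begin := end.Add(-d.lookback)

		// Fire all N panel queries concurrently, as a real dashboard refresh would.
		cycleStart := time.Now()
		var wg sync.WaitGroup
		for _, q := range d.queries {
			wg.Add(1)
			go func(q string) {
				defer wg.Done()
				_ = runQuery(endpoint, q, begin, end)
			}(q)
		}
		wg.Wait()
		log.Printf("lookback=%s dashboard refresh took %s", d.lookback, time.Since(cycleStart))

		// Sleep for the refresh interval, then repeat the cycle.
		time.Sleep(d.refresh)
	}
}

In the benchmark harness, one such loop ran per lookback window, against both Elastic and Sneller simultaneously.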

We also felt it would be appropriate to run the benchmark for a minimum of 24 hours, combining data ingest and queries throughout this period. For this benchmark, we didn’t simulate peaks or quiet periods, although that’s easy to do with this setup.

Investigation:

Besides the monitoring usage pattern, the other common analytical use case for event data is investigation (aka ‘needle in a haystack’), particularly in security and observability settings. In short, investigations look back weeks/months or longer to find a specific entity of interest that may have been uncovered from a monitoring dashboard. For example, a typical investigation query is something like ‘Show me all traffic in/out of IP address xxx.xxx.xxx.xxx for this specific week from 6 months ago’.

The challenge with investigations is that they are infrequent but still time-sensitive. This is difficult in Elastic in particular (as well as in other systems that do not strictly separate compute from storage) because you need to restore archived index or data snapshots from cheaper storage in order to run queries on data from, say, a year ago.

The advantage with Sneller for such use cases is that you can ‘burst’ capacity temporarily (e.g. for a day or two) to meet the SLA needs of an investigation, without having to size the entire infrastructure up-front for a long retention period.

We have a different benchmark for this use case, which we will cover in a separate post.

Benchmark Workload summary and Architecture

Here’s a quick summary of the benchmark workload itself. Some key points:


1. We ran the benchmark for a full 24 hours, generating upwards of 1TB of data and 1B JSON events. The events were a random mix of AWS Cloudtrail events, with each event weighing in at ~1KiB on average (hence the ~1TB/day).
2. We restricted ourselves to a single-node benchmark to evaluate price + performance at the baseline. This would not be recommended in practice for Elastic, where production deployments require redundancy, and autoscaling based on free disk % is recommended.
3. We didn’t need to do ETL on the JSON data because both Elastic and Sneller can handle semi-structured data without first having to transform it into a structured representation. Sneller works directly against a compressed binary form of the JSON data.

Input Data
Benchmark Run
  • Run length: 24h
  • Ingest rate: ~12,000 events/sec
  • Total ingested events: ~1.036B events over 24h
  • Total (raw) data size: ~1TiB
Event Object
  • Raw data size: ~1 KiB per event (average)
  • Encoding: JSON
  • Compression: gzip
  • Prototype: AWS Cloudtrail events
Event Source Pipeline
  • Custom golang+jsonnet template-based JSON data generator
  • vector (vector.dev) for routing event data to AWS S3 and Elastic simultaneously

Query Workload
  • Simulated usage pattern: Monitoring Dashboard (e.g using Kibana/Grafana)
  • Number of queries: 9 queries x 7 lookback windows - 5min, 15min, 30min, 1h, 3h, 6h, 12h (note - these window sizes are borrowed from Kibana/Grafana)
  • Refresh interval: Depending on lookback window size, between 30s (for 5 min lookback) and 30 mins (for the 12h lookback window)
  • Query syntax: Elastic DSL (e.g. as generated by Kibana/Grafana)
  • Total queries over 24h period: >100,000 across all lookback windows

Infrastructure details
Elastic
  • Environment: Elastic Cloud on AWS
  • Instance specs: CPU-Optimized (ARM)
  • Instance type: c6gd.8xlarge
  • Storage: 1.9TiB NVMe (minimum capacity for >1TiB of data)
  • 32 vCPUs, 64GiB memory
  • Autoscaling: Off
  • Availability zones (AZ): 1

Note: We chose this CPU-optimized profile since it has the best possible performance per Elastic’s own recommendations. We also turned off autoscaling and used a single AZ, so that we did not disadvantage Elastic on costs - for production installations, Elastic strongly recommends autoscaling and more than one AZ.


Sneller
  • 1 x r6i.8xlarge
  • 16 Intel vCPUs with AVX-512, 128GiB DRAM, up to 12.5Gbps network
  • Autoscaling: Off
  • Availability zones: 1

Note: Sneller does not require multiple AZs for HA, since all persistent state (data + config) is in S3.

Here’s the benchmark architecture. A few salient points:


  1. We built a custom data generator and used vector.dev to deliver the events to both Elastic and Sneller simultaneously.
  2. We used a custom golang-based script that ran each dashboard refresh cycle simultaneously against both Elastic and Sneller and reported timings, errors, and infrastructure metrics:
    - We measured both wall-clock (per-dashboard) and per-query times for each cycle to compute statistics.
    - We also captured system metrics (CPU utilization and network traffic for Sneller, disk free % for Elastic). This allowed us to track system behavior as the 24h benchmark progressed, and also surfaced an edge-case issue in Sneller that we corrected along the way.
  3. The benchmark results were themselves captured as event data. We pushed these events as their own stream into Sneller itself, via AWS Kinesis Firehose, and used Streamlit to build a custom UI that allowed us to track the progress of a benchmark.
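As an illustration of the last point, here is a minimal sketch of pushing one benchmark result event to a Kinesis Firehose delivery stream with the AWS SDK for Go v2; the stream name and the result record’s fields are assumptions for illustration.

package main

import (
	"context"
	"encoding/json"
	"log"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/firehose"
	"github.com/aws/aws-sdk-go-v2/service/firehose/types"
)

// benchResult is a hypothetical shape for one per-query timing record.
type benchResult struct {
	Engine    string        `json:"engine"`   // "elastic" or "sneller"
	Lookback  string        `json:"lookback"` // e.g. "30m"
	QueryID   int           `json:"queryId"`
	Elapsed   time.Duration `json:"elapsedNs"`
	Timestamp time.Time     `json:"timestamp"`
}

func pushResult(ctx context.Context, fh *firehose.Client, stream string, r benchResult) error {
	data, err := json.Marshal(r)
	if err != nil {
		return err
	}
	// Each result becomes one record on the delivery stream, which eventually
	// lands in S3 and is queryable like any other event data.
	_, err = fh.PutRecord(ctx, &firehose.PutRecordInput{
		DeliveryStreamName: aws.String(stream),
		Record:             &types.Record{Data: append(data, '\n')},
	})
	return err
}

func main() {
	ctx := context.Background()
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}
	fh := firehose.NewFromConfig(cfg)

	// "benchmark-results" is a placeholder delivery stream name.
	r := benchResult{Engine: "sneller", Lookback: "30m", QueryID: 3,
		Elapsed: 742 * time.Millisecond, Timestamp: time.Now().UTC()}
	if err := pushResult(ctx, fh, "benchmark-results", r); err != nil {
		log.Fatal(err)
	}
}

Treating the results as event data means the same pipeline being benchmarked can also be used to monitor the benchmark itself.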

Takeaways


Some key takeaways from this benchmarking exercise:

Event data is always on, and standard industry benchmarks like TPC-H/DS and ClickBench are less suited to this workload:

  1. The query workload is concurrent with the ingest workload, which the TPC-H/DS benchmark suites do not consider.
  2. Event data workloads show two distinct usage patterns that need different benchmarks:
    - Monitoring: This requires concurrent querying and ingestion, and models multiple users using dashboards to analyze incoming data (e.g. for observability/security). Monitoring dashboards often do not span more than a few days of data.
    - Investigation: This requires point lookups of specific entities over a much longer retention window, and such queries are termed ‘needle-in-a-haystack’ for this reason.

Ready to speed up and simplify your event data analytics?

Sneller is also available as an open source project on GitHub.