Detect anomalies on one million unique entities with Amazon OpenSearch Service

Amazon OpenSearch Service supports a highly performant, integrated anomaly detection engine that enables the real-time identification of anomalies in streaming data. Last year, we released high-cardinality anomaly detection (HCAD) to detect anomalies for individual entities. With the 1.1 release, you can monitor a million entities with steady, predictable performance.

HCAD is easiest to describe in contrast to the non-HCAD single-stream solution. A single-stream detector detects anomalies for one aggregate entity. For example, we can use a single-stream detector to sift through aggregated traffic across all IP addresses so that users are notified when unusual spikes occur. However, we often need to identify anomalies in individual entities, such as hosts or IP addresses. Each entity may have a different baseline, which means the distributions of its time series (characterized by magnitude, trend, and seasonality, to name a few) differ. The differing baselines make it inaccurate to detect anomalies with a single monolithic model. HCAD distinguishes itself from single-stream detection by customizing anomaly detection models to each entity.

Example use cases of HCAD include the following:

Making HCAD real time and performant while achieving completeness and scalability is a formidable challenge:

The first release of HCAD in Amazon OpenSearch Service traded completeness and scalability for performance: the anomaly detector limited the number of monitored entities to 1,000. You can change the setting plugins.anomaly_detection.max_entities_per_query to increase the number of monitored entities per interval. However, such a change incurs a non-negligible cost, which opens the door to cluster instability. Each entity uses memory to host models, disk I/O to read and write model checkpoints and anomaly results, CPU cycles for metadata maintenance and model training and inference, and garbage collection for deleted models and metadata. The more entities, the more resource usage. Furthermore, HCAD could suffer a combinatorial explosion of entities when supporting multiple categorical fields (a feature released in Amazon OpenSearch Service 1.1). Imagine a detector with a single categorical field geolocation that has 1,000 possible values. Adding another categorical field product with 1,000 allowed values gives the detector 1,000 × 1,000 = 1 million entities.

For the next version of HCAD, we devoted much effort to improving completeness and scalability. Our approach combines sizing a cluster correctly with a mix of in-memory model hosting and on-disk model loading. Performance metrics show that HCAD doesn't saturate the cluster with substantial cost and still leaves plenty of room for other tasks. As a result, HCAD can analyze one million entities in 10 minutes and flag anomalies with different patterns. In this post, we explore how HCAD can analyze one million entities and the technical implementation behind the improvements.

How to size domains

Model management is a trade-off: disk-based solutions that reload-use-stop-store models on every interval save memory but suffer high overhead and are hard to scale. Memory-based solutions offer lower overhead and higher throughput but typically increase memory requirements. We exploit the trade-off with an adaptive mechanism that hosts as many models in memory as allowed (capped via the cluster setting plugins.anomaly_detection.model_max_size_percent) for best performance. When models don't fit in memory, we process the extra model requests by loading models from disk.

The use of memory whenever possible is what makes HCAD scalable. Therefore, it is crucial to size a cluster so that it offers enough memory for HCAD. The main factors to consider when sizing a cluster are the heap size of your instances, the percentage of the heap that AD may use, each detector's entity model size, and each detector's entity count. For example, the following request raises the AD memory cap to half of the JVM heap:

PUT /_cluster/settings
{
    "persistent": {
        "plugins.anomaly_detection.model_max_size_percent": "0.5"
    }
}

To get the entity model size of a running detector, enter the following profile request:

GET /_plugins/_anomaly_detection/detectors/fkzfBX0BHok1ZbMqLMdu/_profile/models

We get the following response:

{
    ...{
        "model_id": "fkzfBX0BHok1ZbMqLMdu_entity_GOIubzeHCXV-k6y_AA4K3Q",
        "entity": [{
                "name": "host",
                "value": "host141"
            },
            {
                "name": "process",
                "value": "process54"
            }
        ],
        "model_size_in_bytes": 470491,
        "node_id": "OcxBDJKYRYKwCLDtWUKItQ"
    }
    ...
}

In order to derive the required number of data nodes D, we must first derive an expression for the number of entity models N that a node can host in memory. We define $S_i$ to be the entity model size of detector i. If we use an instance type with heap size $H$ where the maximum AD memory percentage is $P$, then N equals the AD memory allowance divided by the maximum entity model size among all detectors:

$$N = \left\lfloor \frac{H \times P}{\max_i S_i} \right\rfloor$$
We consider the required number of data nodes D as a function of N. Let $C_i$ denote the total entity count of detector i. Given n detectors, it follows that:

$$D = \left\lceil \frac{1.2 \times \sum_{i=1}^{n} C_i}{N} \right\rceil$$

The factor 1.2 expresses the extra 20% memory overhead that AD needs, and the ceiling function returns the smallest integer greater than or equal to its argument.

For example, an r5.2xlarge Amazon Elastic Compute Cloud (Amazon EC2) instance has 64 GB RAM, so the heap size is 32 GB. We configure AD to use at most half of the allowed heap size ($P = 0.5$). We have two HCAD detectors, whose model sizes are 471 KB and 403 KB, respectively. To host 500,000 entities for each detector, we need a 36-data-node cluster according to the following calculation:

$$N = \left\lfloor \frac{32\ \text{GB} \times 0.5}{471\ \text{KB}} \right\rfloor = 33{,}970$$

$$D = \left\lceil \frac{1.2 \times (500{,}000 + 500{,}000)}{33{,}970} \right\rceil = \lceil 35.33 \rceil = 36$$
We also need to ensure there is enough disk space. In the end, we used a 39-node r5.2xlarge cluster (3 primary and 36 data nodes) with 4 TB Amazon Elastic Block Store (EBS) storage on each node.
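To double-check the arithmetic, here is a small Python helper that encodes the two formulas above. The decimal units (1 KB = 10^3 bytes, 1 GB = 10^9 bytes) are an assumption of this sketch:

import math

def required_data_nodes(heap_bytes, ad_percent, model_sizes, entity_counts):
    # N: models one node can host = AD memory allowance / largest model size
    n = math.floor(heap_bytes * ad_percent / max(model_sizes))
    # D: data nodes needed, including the 20% memory overhead buffer
    return math.ceil(1.2 * sum(entity_counts) / n)

# Two detectors with 471 KB and 403 KB models and 500,000 entities each,
# on r5.2xlarge nodes (32 GB heap) with AD capped at half of the heap
print(required_data_nodes(32e9, 0.5, [471e3, 403e3], [500_000, 500_000]))  # 36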

What if a detector’s entity count is unknown?

Sometimes, it's hard to know a detector's entity count. We can check historical data to estimate the cardinality, but it's impossible to predict the future accurately. A general guideline is to allocate buffer memory during planning. Appropriately used, buffer memory provides room for small changes. If the changes are significant, you can adjust the number of data nodes because HCAD can scale in and out horizontally.
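For instance, you can estimate the historical entity count with a cardinality aggregation. The following is a minimal sketch, assuming the opensearch-py client, an unauthenticated local endpoint, and the host-cloudwatch index used later in this post; note that the cardinality aggregation returns an approximate count:

from opensearchpy import OpenSearch

client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])
response = client.search(
    index="host-cloudwatch",
    body={
        "size": 0,
        "aggs": {"distinct_hosts": {"cardinality": {"field": "host"}}},
    },
)
print(response["aggregations"]["distinct_hosts"]["value"])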

What if the number of active entities is changing?

The total number of entities created can be higher than the number of active entities, as evident from the following two figures. The HTTP logs dataset contains 2 million entities within 2 months, but each entity appears only seven times on average, so the number of active entities within a time-boxed interval is much less than 2 million. The following figure presents an example time series of the number of distinct IP addresses over time in the HTTP logs dataset.

http log data distribution

The KPI dataset shows similar behavior, where an entity often appears only within short bursts of activity.

kpi data distribution

AD requires large sample sizes to create a comprehensive picture of the data patterns, which makes it most suitable for dense time series that can be uniformly sampled. AD can still train models and produce predictions if the bursty behavior shown earlier lasts long enough to provide at least 400 points. However, training becomes more difficult and prediction accuracy drops as the data gets sparser.

It's wasteful in this case to preallocate memory according to the total number of entities. Instead, we should consider the maximal number of active entities within an interval. You can get an approximate number by combining a date_histogram aggregation, a cardinality sub-aggregation, and a bucket_sort pipeline over a representative period. If you're indexing host-cloudwatch and want to find the maximal number of active hosts within a 10-minute interval throughout 10 days, you can run the following query:

GET /host-cloudwatch/_search?size=0
{
    "query": {
        "range": {
            "@timestamp": {
                "gte": "2021-11-17T22:21:48",
                "lte": "2021-11-27T22:22:48"
            }
        }
    },
    "aggs": {
        "by_10m": {
            "date_histogram": {
                "field": "@timestamp",
                "fixed_interval": "10m"
            },
            "aggs": {
                "dimension": {
                    "cardinality": {
                        "field": "host"
                    }
                },
                "multi_buckets_sort": {
                    "bucket_sort": {
                        "sort": [{
                            "dimension": {
                                "order": "desc"
                            }
                        }],
                        "size": 1
                    }
                }
            }
        }
    }
}

The query result shows that at most about 1,000 hosts are active during the busiest 10-minute interval:

{
    ...
    "aggregations": {
        "by_10m": {
            "buckets": [{
                "key_as_string": "2021-11-17T22:30:00.000Z",
                "key": 1637188200000,
                "doc_count": 1000000,
                "dimension": {
                    "value": 1000
                }
            }]
        }
    }
    ...
}

HCAD has a cache that stores models and maintains a timestamp of each model's last access. An hourly maintenance job checks each model's time of inactivity and invalidates the model if it has been inactive for longer than 1 hour. Depending on the timing of the hourly check and the cache capacity, the elapsed time a model stays cached varies. If the cache capacity isn't large enough to hold all non-expired models, an adapted least frequently used (LFU) cache policy evicts models (more on this in a later section), and those evicted models are cached for less than 1 hour. If a model's last access time is reset immediately after an hourly check, the model doesn't expire at the next check and can take until the check after that to expire. The maximum cache time is therefore close to 2 hours.
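The following is a minimal sketch, not the plugin's actual code, of why the worst case approaches 2 hours: a model accessed just after one hourly check survives the next check and is only evicted by the one after that.

HOUR = 3600

def hourly_maintenance(cache, now):
    # invalidate models inactive for longer than 1 hour
    for model_id, last_access in list(cache.items()):
        if now - last_access > HOUR:
            del cache[model_id]

cache = {"model_a": 1}               # last accessed just after the t=0 check
hourly_maintenance(cache, HOUR)      # t=3600: inactive 3,599s, model kept
hourly_maintenance(cache, 2 * HOUR)  # t=7200: inactive 7,199s, model evicted
# The model stayed cached from t=1 until t=7200: just under 2 hours.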

Because a model can stay cached for at most 2 hours, the upper bound of active entities whose models detector i can keep cached is:

$$B_i = A_i \times \frac{120}{I_i}$$

This equation has the following parameters:

- $A_i$ is the maximal number of active entities of detector i within an interval, as estimated by the preceding query.
- $I_i$ is the interval of detector i in minutes, so $120 / I_i$ is the number of intervals within the 2-hour maximum cache time.
Accordingly, we should account for $B_i$ in the sizing formula by replacing each detector's total entity count $C_i$ with $\min(C_i, B_i)$:

$$D = \left\lceil \frac{1.2 \times \sum_{i=1}^{n} \min(C_i, B_i)}{N} \right\rceil$$

With the definitions for calculating the number of data nodes in place, we can use the following flow chart to make decisions under different scenarios.

sizing flowchart

What if the cluster is underscaled?

If the cluster is underscaled, AD prioritizes more frequent and recent entities. It makes a best effort to accommodate extra entities by loading their models on demand from disk without hosting them in the in-memory cache. Loading models on demand means reloading, using, stopping, and storing models at every interval, whose overhead is quite high and mostly consists of network and disk I/O rather than model inference. HCAD therefore does this in a steady, controlled manner. If system resource usage isn't heavy and there is enough time, HCAD may finish processing the extra entities; otherwise, it doesn't necessarily find all the anomalies it could otherwise find.

Example: Analysis of 1 million entities

In the following example, you will learn how to set up a detector to analyze one million entities.

Ingest data

To evaluate the scalability and completeness improvements, we generated 10 billion documents for 1 million entities. Each entity has a cosine wave time series with randomly injected anomalies. With help from the tips in this post, we created the index host-cloudwatch and ingested the documents into the cluster. host-cloudwatch records the CPU time and JVM garbage collection (GC) time used by a process on a host. The index mapping is as follows:

{
    ...
    "mappings": {
        "properties": {
            "@timestamp": {
                "type": "date"
            },
            "cpuTime": {
                "type": "double"
            },
            "jvmGcTime": {
                "type": "double"
            },
            "host": {
                "type": "keyword"
            },
            "process": {
                "type": "keyword"
            }
        }
    }
    ...
}
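For reference, here is a hedged sketch of how such a dataset can be generated and ingested; our actual tooling differs, and the opensearch-py client, the endpoint, and the wave parameters are assumptions of this example:

import math
import random
from datetime import datetime, timedelta, timezone
from opensearchpy import OpenSearch, helpers

client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])

def docs(num_hosts=10, num_processes=10, points=500, interval_minutes=10):
    start = datetime(2021, 11, 17, tzinfo=timezone.utc)
    for h in range(num_hosts):
        for p in range(num_processes):
            for t in range(points):
                # cosine wave with a daily cycle (144 ten-minute intervals)
                value = 100 + 50 * math.cos(2 * math.pi * t / 144)
                if random.random() < 0.005:  # randomly inject an anomaly
                    value += 500
                yield {
                    "_index": "host-cloudwatch",
                    "@timestamp": start + timedelta(minutes=interval_minutes * t),
                    "cpuTime": value,
                    "jvmGcTime": value,
                    "host": f"host{h}",
                    "process": f"process{p}",
                }

helpers.bulk(client, docs())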

Create a detector

Consider the following factors before you create a detector:

Our detector aggregates the average garbage collection processing time every 10 minutes and analyzes that average at the granularity of processes on different hosts. The API request to create such a detector is as follows. You can also use our streamlined UI to create and start a detector.

POST _plugins/_anomaly_detection/detectors
{
    "name": "detect_gc_time",
    "description": "detect gc processing time anomaly",
    "time_field": "@timestamp",
    "indices": [
        "host-cloudwatch"
    ],
    "category_field": ["host", "process"],
    "feature_attributes": [{
        "feature_name": "jvmGcTime average",
        "feature_enabled": true,
        "importance": 1,
        "aggregation_query": {
            "gc_time_average": {
                "avg": {
                    "field": "jvmGcTime"
                }
            }
        }
    }],
    "detection_interval": {
        "period": {
            "interval": 10,
            "unit": "MINUTES"
        }
    },
    "schema_version": 2
}
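The create request returns a detector ID (fkzfBX0BHok1ZbMqLMdu in our case). Real-time analysis begins once the detector is started, which the UI does for you; programmatically, a minimal sketch with the requests library against an unauthenticated local endpoint looks like the following:

import requests

detector_id = "fkzfBX0BHok1ZbMqLMdu"
response = requests.post(
    f"http://localhost:9200/_plugins/_anomaly_detection/detectors/{detector_id}/_start"
)
print(response.json())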

After the initial training completes in a few hours, the models of all 1 million entities are up in memory, and 1 million results are generated every detector interval. To verify the number of active models in the cache, you can run the profile API:

GET /_plugins/_anomaly_detection/detectors/fkzfBX0BHok1ZbMqLMdu/_profile/models

We get the following response:

{
    ...
    "model_count": 1000000
}

You can observe how many results are generated every detector interval (in our case 10 minutes) by invoking the result search API:

GET /_plugins/_anomaly_detection/detectors/results/_search
{
    "query": {
        "range": {
            "execution_start_time": {
                "gte": 1636501467000,
                "lte": 1636502067000
            }
        }
    },
    "track_total_hits": true
}

We get the following response:

{
    ...
    "hits": {
        "total": {
            "value": 1000000,
            "relation": "eq"
        },
        ...
    }
    ...
}

OpenSearch Dashboards shows an overview of the entities producing the most severe or the most numerous anomalies.

anomaly overview

You can choose a colored cell to review the details of anomalies occurring within that given period.

press anomaly

You can view anomaly grade, confidence, and the corresponding features in a shaded area.

feature graph

Create a monitor

You can create an alerting monitor to notify you of anomalies based on the defined anomaly detector, as shown in the following screenshot.

create monitor

We use anomaly grade and confidence to define a trigger. Both anomaly grade and confidence are values between 0 and 1.

Anomaly grade represents the severity of an anomaly. The closer the grade is to 1, the higher the severity. A grade of 0 means the corresponding prediction isn't an anomaly.

Confidence measures whether a model has observed enough unique, real-world data points. If one model's confidence value is higher than another's, then the first model has observed more data.

Because we want to receive high fidelity alerts, we configured the grade threshold to be 0 and the confidence threshold to be 0.99.

edit trigger

The final step of creating a monitor is to add an action specifying what to include in the notification. Our example detector finds anomalies at a particular process on a host, so the notification message should contain the entity identity. In this example, we use ctx.results.0.hits.hits.0._source.entity to grab the entity identity.

edit action

A monitor based on a detector extracts the maximum grade anomaly and triggers an alert based on the configured grade and confidence threshold. The following is an example alert message:

Attention

Monitor detect_cpu_gc_time2-Monitor just entered alert status. Please investigate the issue.
- Trigger: detect_cpu_gc_time2-trigger
- Severity: 1
- Period start: 2021-12-08T01:01:15.919Z
- Period end: 2021-12-08T01:21:15.919Z
- Entity: {0={name=host, value=host107}, 1={name=process, value=process622}}

You can customize the extraction query and trigger condition by changing the monitor definition method to Extraction query monitor and modifying the corresponding query and condition. For an explanation of all the anomaly result index fields you can query, see the anomaly detection documentation.

edit monitor

Evaluation

In this section, we evaluate HCAD’s precision, recall, and overall performance.

Precision and recall

We evaluated precision and recall on the cosine wave data mentioned earlier. Such evaluations aren't easy in the context of real-time processing, because only one point per entity is available during each detector interval (10 minutes in the example), and processing all the points takes a long time. Instead, we simulated real-time processing by fast-forwarding the processing in a script. The results are an average of 100 runs, with a standard deviation of around 0.12.

The overall average precision, including the effects of cold start using linear interpolation, is 0.57 on the synthetic data; the recall is 0.61. We note that no transformations were applied; it's possible and likely that transformations would improve these numbers. Precision is 0.09 and recall is 0.34 for the first 300 points because the model trains on interpolated cold-start data. The numbers pick up as the model observes more real data: after another 5,000 real data points, precision and recall improve to 0.57 and 0.63, respectively. We reiterate that the exact numbers vary with the data characteristics; a different benchmark or detection configuration would produce other numbers. Further, if there is no missing data, the fidelity of the HCAD model is the same as that of a single-stream detector.
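For reference, precision and recall follow their standard definitions over the injected anomaly labels. The following is a minimal sketch; our evaluation script additionally simulates real-time processing:

def precision_recall(predicted, actual):
    # predicted, actual: sets of (entity, timestamp) pairs flagged anomalous
    true_positives = len(predicted & actual)
    precision = true_positives / len(predicted) if predicted else 0.0
    recall = true_positives / len(actual) if actual else 0.0
    return precision, recall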

Performance

We ran HCAD on an idle cluster without ingestion or search traffic. Metrics such as JVM memory pressure and CPU usage on each node stayed well within the safe zone, as shown in the following screenshots. JVM memory pressure varies between 23–39%. CPU is mostly around 1%, with hourly spikes up to 65%. The spikes are accounted for by an internal hourly maintenance job that saves hundreds of thousands of model checkpoints, clears unused models, and performs bookkeeping for internal states. Smoothing out this spike is a possible future improvement.

jvm memory pressure

cpu

Implementation

We next discuss the specifics of the technical work that is germane to HCAD’s completeness and scalability.

RCF 2.0

In Amazon OpenSearch Service 1.1, we integrated the Random Cut Forest (RCF) library 2.0. RCF partitions data into different bounding boxes. The previous RCF version maintained bounding boxes in memory, yet a real-time detector only uses the bounding boxes when processing a new data point and leaves them dormant most of the time. RCF 2.0 recreates those bounding boxes on demand, so they are only present in memory while the corresponding input is being processed. The on-demand recreation reduces memory overhead nine-fold and can therefore support hosting nine times as many models on a node. In addition, RCF 2.0 revamps the serialization module: the new module serializes and deserializes a model 26 times faster while using 20 times less disk space.

Pagination

For feature aggregation, we switched from getting top hits via terms aggregation to pagination via composite aggregation. We evaluated multiple pagination implementations on a generated dataset with 1 million entities, each with two documents, varying the number of data nodes, primary shards, and categorical fields across experiments. We believe composite queries are the right choice: even though they may not be the fastest in all cases, they're the most stable on average (40 seconds).

Amortize expensive operations

HCAD can face thundering herd traffic, in which many entities make requests such as reading checkpoints from disk at approximately the same time. Therefore, we created various queues to buffer the pent-up requests. The queues amortize expensive costs by performing a small, bounded amount of work steadily, allowing HCAD to offer predictable performance and availability at scale.
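The following sketch illustrates the pattern, not the plugin's implementation: bursty requests are buffered, and each tick drains only a small, bounded batch.

from collections import deque

class BoundedWorkQueue:
    def __init__(self, batch_size=32):
        self.pending = deque()
        self.batch_size = batch_size

    def submit(self, request):
        self.pending.append(request)  # buffer pent-up requests

    def tick(self, handler):
        # perform a small, bounded amount of work per tick
        for _ in range(min(self.batch_size, len(self.pending))):
            handler(self.pending.popleft())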

In-memory cache

HCAD uses caching to process entities whose total memory requirement is larger than the configured memory size. At first, we tried a least recently used (LRU) cache but experienced thrashing when running the HTTP logs workload: with 100 detectors at 1-minute intervals and millions of entities per detector, we saw only a few hundred cache hits within 7 hours, wasting CPU cycles on constantly swapping models in and out of memory. As a general rule, a hit-to-miss ratio worse than 3:1 doesn't justify caching for quick model access.

Instead, we turned to a modified LFU cache, augmented with heavy-hitter approximation. A decayed access count is maintained for each model in the cache and incremented when the model is accessed. The model with the smallest decayed count is the least frequently used. When the cache reaches capacity, it invalidates and removes the least frequently used model, but only if the new entity's frequency is no smaller than that of the least frequently used entity. This combination of heavy-hitter approximation and traditional LFU keeps the more frequent and recent models sticky in memory and phases out models with lower cache-hit probabilities.
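The following simplified sketch conveys the admission rule; the decay schedule and the heavy-hitter sketch of the real implementation are omitted, and a newcomer is assumed to have an observed frequency of 1 by default:

class LFUModelCache:
    def __init__(self, capacity):
        self.capacity = capacity
        self.counts = {}  # model_id -> decayed access count

    def access(self, model_id, newcomer_frequency=1):
        if model_id in self.counts:
            self.counts[model_id] += 1  # cache hit: bump the decayed count
            return True
        if len(self.counts) >= self.capacity:
            victim = min(self.counts, key=self.counts.get)
            if newcomer_frequency < self.counts[victim]:
                return False  # newcomer not frequent enough to evict anyone
            del self.counts[victim]  # evict the least frequently used model
        self.counts[model_id] = newcomer_frequency
        return True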

Fault tolerance

Only a limited amount of state lives in unrecoverable memory; enough model information is stored on disk for crash resilience, so models can be recovered on a different host after a crash is detected.

High performance

HCAD builds on asynchronous I/O: all I/O requests, such as network calls or disk accesses, are non-blocking. In addition, model distribution is balanced across the cluster using a consistent hash ring.
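A minimal consistent-hash sketch follows; MD5 hashing and 100 virtual nodes per physical node are assumptions of this example, not the plugin's actual parameters. Each model ID maps to the first node clockwise on the ring, so adding or removing a node only moves a small fraction of the models:

import bisect
import hashlib

class HashRing:
    def __init__(self, nodes, vnodes=100):
        # place vnodes points on the ring for each physical node
        self.ring = sorted(
            (self._hash(f"{node}-{v}"), node) for node in nodes for v in range(vnodes)
        )
        self.keys = [h for h, _ in self.ring]

    @staticmethod
    def _hash(key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def node_for(self, model_id):
        # first ring position at or after the model's hash, wrapping around
        i = bisect.bisect(self.keys, self._hash(model_id)) % len(self.ring)
        return self.ring[i][1]

ring = HashRing(["node1", "node2", "node3"])
print(ring.node_for("fkzfBX0BHok1ZbMqLMdu_entity_GOIubzeHCXV-k6y_AA4K3Q"))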

Summary

We enhanced HCAD to improve its scalability and completeness without altering the fidelity of the computation. As a result of these improvements, we showed you how to size an OpenSearch Service domain and use HCAD to monitor 1 million entities in 10 minutes. To learn more about HCAD, see the anomaly detection documentation.

If you have feedback about this post, submit comments in the comments section below. If you have questions about this post, start a new thread on the Machine Learning forum.


About the Author


Kaituo Li is an engineer in Amazon OpenSearch Service. He has worked on distributed systems, applied machine learning, monitoring, and database storage in Amazon. Before Amazon, Kaituo was a PhD student in Computer Science at University of Massachusetts, Amherst. He likes reading and sports.