HTTP Analytics for 6M requests per second using ClickHouse (original) (raw)

2018-03-06

15 min read

One of our large scale data infrastructure challenges here at Cloudflare is around providing HTTP traffic analytics to our customers. HTTP Analytics is available to all our customers via two options:

In this blog post I'm going to talk about the exciting evolution of the Cloudflare analytics pipeline over the last year. I'll start with a description of the old pipeline and the challenges that we experienced with it. Then, I'll describe how we leveraged ClickHouse to form the basis of a new and improved pipeline. In the process, I'll share details about how we went about schema design and performance tuning for ClickHouse. Finally, I'll look forward to what the Data team is thinking of providing in the future.

Let's start with the old data pipeline.

Old data pipeline

The previous pipeline was built in 2014. It has been mentioned previously in Scaling out PostgreSQL for CloudFlare Analytics using CitusDB and More data, more data blog posts from the Data team.

Old-system-architecture

It had following components:

Cloudflare has grown tremendously since this pipeline was originally designed in 2014. It started off processing under 1M requests per second and grew to current levels of 6M requests per second. The pipeline had served us and our customers well over the years, but began to split at the seams. Any system should be re-engineered after some time, when requirements change.

Some specific disadvantages of the original pipeline were:

Over time, as our request volume grew, the challenges of operating this pipeline became more apparent, and we realized that this system was being pushed to its limits. This realization inspired us to think about which components would be ideal candidates for replacement, and led us to build new data pipeline.

Our first design of an improved analytics pipeline centred around the use of the Apache Flink stream processing system. We had previously used Flink for other data pipelines, so it was a natural choice for us. However, these pipelines had been at a much lower rate than the 6M requests per second we needed to process for HTTP Analytics, and we struggled to get Flink to scale to this volume - it just couldn't keep up with ingestion rate per partition on all 6M HTTP requests per second.

Our colleagues on our DNS team had already built and productionized DNS analytics pipeline atop ClickHouse. They wrote about it in "How Cloudflare analyzes 1M DNS queries per second" blog post. So, we decided to take a deeper look at ClickHouse.

ClickHouse

"ClickHouse не тормозит." Translation from Russian: ClickHouse doesn't have brakes (or isn't slow) © ClickHouse core developers

When exploring additional candidates for replacing some of the key infrastructure of our old pipeline, we realized that using a column oriented database might be well suited to our analytics workloads. We wanted to identify a column oriented database that was horizontally scalable and fault tolerant to help us deliver good uptime guarantees, and extremely performant and space efficient such that it could handle our scale. We quickly realized that ClickHouse could satisfy these criteria, and then some.

ClickHouse is an open source column-oriented database management system capable of real time generation of analytical data reports using SQL queries. It is blazing fast, linearly scalable, hardware efficient, fault tolerant, feature rich, highly reliable, simple and handy. ClickHouse core developers provide great help on solving issues, merging and maintaining our PRs into ClickHouse. For example, engineers from Cloudflare have contributed a whole bunch of code back upstream:

ClickHouse-uniq-functions

Along with filing many bug reports, we also report about every issue we face in our cluster, which we hope will help to improve ClickHouse in future.

Even though DNS analytics on ClickHouse had been a great success, we were still skeptical that we would be able to scale ClickHouse to the needs of the HTTP pipeline:

After unsuccessful attempts with Flink, we were skeptical of ClickHouse being able to keep up with the high ingestion rate. Luckily, early prototype showed promising performance and we decided to proceed with old pipeline replacement. The first step in replacing the old pipeline was to design a schema for the new ClickHouse tables.

ClickHouse schema design

Once we identified ClickHouse as a potential candidate, we began exploring how we could port our existing Postgres/Citus schemas to make them compatible with ClickHouse.

For our Zone Analytics API we need to produce many different aggregations for each zone (domain) and time period (minutely / hourly / daily / monthly). For deeper dive about specifics of aggregates please follow Zone Analytics API documentation or this handy spreadsheet.

These aggregations should be available for any time range for the last 365 days. While ClickHouse is a really great tool to work with non-aggregated data, with our volume of 6M requests per second we just cannot afford yet to store non-aggregated data for that long.

To give you an idea of how much data is that, here is some "napkin-math" capacity planning. I'm going to use an average insertion rate of 6M requests per second and $100 as a cost estimate of 1 TiB to calculate storage cost for 1 year in different message formats:

Metric Cap'n Proto Cap'n Proto (zstd) ClickHouse
Avg message/record size 1630 B 360 B 36.74 B
Storage requirements for 1 year 273.93 PiB 60.5 PiB 18.52 PiB (RF x3)
Storage cost for 1 year 28M∣28M 28M6.2M $1.9M

And that is if we assume that requests per second will stay the same, but in fact it's growing fast all the time.

Even though storage requirements are quite scary, we're still considering to store raw (non-aggregated) requests logs in ClickHouse for 1 month+. See "Future of Data APIs" section below.

Non-aggregated requests table

We store over 100+ columns, collecting lots of different kinds of metrics about each request passed through Cloudflare. Some of these columns are also available in our Enterprise Log Share product, however ClickHouse non-aggregated requests table has more fields.

With so many columns to store and huge storage requirements we've decided to proceed with the aggregated-data approach, which worked well for us before in old pipeline and which will provide us with backward compatibility.

Aggregations schema design #1

According to the API documentation, we need to provide lots of different requests breakdowns and to satisfy these requirements we decided to test the following approach:

  1. Create Cickhouse materialized views with ReplicatedAggregatingMergeTree engine pointing to non-aggregated requests table and containing minutely aggregates data for each of the breakdowns:
    • Requests totals - containing numbers like total requests, bytes, threats, uniques, etc.
    • Requests by colo - containing requests, bytes, etc. breakdown by edgeColoId - each of 120+ Cloudflare datacenters
    • Requests by http status - containing breakdown by HTTP status code, e.g. 200, 404, 500, etc.
    • Requests by content type - containing breakdown by response content type, e.g. HTML, JS, CSS, etc.
    • Requests by country - containing breakdown by client country (based on IP)
    • Requests by threat type - containing breakdown by threat type
    • Requests by browser - containing breakdown by browser family, extracted from user agent
    • Requests by ip class - containing breakdown by client IP class
  2. Write the code gathering data from all 8 materialized views, using two approaches:
    • Querying all 8 materialized views at once using JOIN
    • Querying each one of 8 materialized views separately in parallel
  3. Run performance testing benchmark against common Zone Analytics API queries

Schema-design--1-1

Schema design #1 didn't work out well. ClickHouse JOIN syntax forces to write monstrous query over 300 lines of SQL, repeating the selected columns many times because you can do only pairwise joins in ClickHouse.

As for querying each of materialized views separately in parallel, benchmark showed prominent, but moderate results - query throughput would be a little bit better than using our Citus based old pipeline.

Aggregations schema design #2

In our second iteration of the schema design, we strove to keep a similar structure to our existing Citus tables. To do this, we experimented with the SummingMergeTree engine, which is described in detail by the excellent ClickHouse documentation:

In addition, a table can have nested data structures that are processed in a special way. If the name of a nested table ends in 'Map' and it contains at least two columns that meet the following criteria... then this nested table is interpreted as a mapping of key => (values...), and when merging its rows, the elements of two data sets are merged by 'key' with a summation of the corresponding (values...).

We were pleased to find this feature, because the SummingMergeTree engine allowed us to significantly reduce the number of tables required as compared to our initial approach. At the same time, it allowed us to match the structure of our existing Citus tables. The reason was that the ClickHouse Nested structure ending in 'Map' was similar to the Postgres hstore data type, which we used extensively in the old pipeline.

However, there were two existing issues with ClickHouse maps:

To resolve problem #1, we had to create a new aggregation function sumMap. Luckily, ClickHouse source code is of excellent quality and its core developers are very helpful with reviewing and merging requested changes.

As for problem #2, we had to put uniques into separate materialized view, which uses the ReplicatedAggregatingMergeTree Engine and supports merge of AggregateFunction states for records with the same primary keys. We're considering adding the same functionality into SummingMergeTree, so it will simplify our schema even more.

We also created a separate materialized view for the Colo endpoint because it has much lower usage (5% for Colo endpoint queries, 95% for Zone dashboard queries), so its more dispersed primary key will not affect performance of Zone dashboard queries.

Schema-design--2

Once schema design was acceptable, we proceeded to performance testing.

ClickHouse performance tuning

We explored a number of avenues for performance improvement in ClickHouse. These included tuning index granularity, and improving the merge performance of the SummingMergeTree engine.

By default ClickHouse recommends to use 8192 index granularity. There is nice article explaining ClickHouse primary keys and index granularity in depth.

While default index granularity might be excellent choice for most of use cases, in our case we decided to choose the following index granularities:

Not relevant to performance, but we also disabled the min_execution_speed setting, so queries scanning just a few rows won't return exception because of "slow speed" of scanning rows per second.

On the aggregation/merge side, we've made some ClickHouse optimizations as well, like increasing SummingMergeTree maps merge speed by x7 times, which we contributed back into ClickHouse for everyone's benefit.

Once we had completed the performance tuning for ClickHouse, we could bring it all together into a new data pipeline. Next, we describe the architecture for our new, ClickHouse-based data pipeline.

New data pipeline

The new pipeline architecture re-uses some of the components from old pipeline, however it replaces its most weak components.

New-system-architecture

New components include:

As you can see the architecture of new pipeline is much simpler and fault-tolerant. It provides Analytics for all our 7M+ customers' domains totalling more than 2.5 billion monthly unique visitors and over 1.5 trillion monthly page views.

On average we process 6M HTTP requests per second, with peaks of upto 8M requests per second.

HTTP-Logfwdr-throughput

Average log message size in Cap’n Proto format used to be ~1630B, but thanks to amazing job on Kafka compression by our Platform Operations Team, it decreased significantly. Please see "Squeezing the firehose: getting the most from Kafka compression" blog post with deeper dive into those optimisations.

Benefits of new pipeline

Recently, we've improved the throughput and latency of the new pipeline even further with better hardware. I'll provide details about this cluster below.

Our ClickHouse cluster

In total we have 36 ClickHouse nodes. The new hardware is a big upgrade for us:

Our Platform Operations team noticed that ClickHouse is not great at running heterogeneous clusters yet, so we need to gradually replace all nodes in the existing cluster with new hardware, all 36 of them. The process is fairly straightforward, it's no different than replacing a failed node. The problem is that ClickHouse doesn't throttle recovery.

Here is more information about our cluster:

In order to make the switch to the new pipeline as seamless as possible, we performed a transfer of historical data from the old pipeline. Next, I discuss the process of this data transfer.

Historical data transfer

As we have 1 year storage requirements, we had to do one-time ETL (Extract Transfer Load) from the old Citus cluster into ClickHouse.

At Cloudflare we love Go and its goroutines, so it was quite straightforward to write a simple ETL job, which:

The whole process took couple of days and over 60+ billions rows of data were transferred successfully with consistency checks. The completion of this process finally led to the shutdown of old pipeline. However, our work does not end there, and we are constantly looking to the future. In the next section, I'll share some details about what we are planning.

Future of Data APIs

Log Push

We're currently working on something called "Log Push". Log push allows you to specify a desired data endpoint and have your HTTP request logs sent there automatically at regular intervals. At the moment, it's in private beta and going to support sending logs to:

It's expected to be generally available soon, but if you are interested in this new product and you want to try it out please contact our Customer Support team.

Logs SQL API

We're also evaluating possibility of building new product called Logs SQL API. The idea is to provide customers access to their logs via flexible API which supports standard SQL syntax and JSON/CSV/TSV/XML format response.

Queries can extract:

Google BigQuery provides similar SQL API and Amazon has product callled Kinesis Data analytics with SQL API support as well.

Another option we're exploring is to provide syntax similar to DNS Analytics API with filters and dimensions.

We're excited to hear your feedback and know more about your analytics use case. It can help us a lot to build new products!

Conclusion

All this could not be possible without hard work across multiple teams! First of all thanks to other Data team engineers for their tremendous efforts to make this all happen. Platform Operations Team made significant contributions to this project, especially Ivan Babrou and Daniel Dao. Contributions from Marek Vavruša in DNS Team were also very helpful.

Finally, Data team at Cloudflare is a small team, so if you're interested in building and operating distributed services, you stand to have some great problems to work on. Check out the Distributed Systems Engineer - Data and Data Infrastructure Engineer roles in London, UK and San Francisco, US, and let us know what you think.

Cloudflare's connectivity cloud protects entire corporate networks, helps customers build Internet-scale applications efficiently, accelerates any website or Internet application, wards off DDoS attacks, keeps hackers at bay, and can help you on your journey to Zero Trust.

Visit 1.1.1.1 from any device to get started with our free app that makes your Internet faster and safer.

To learn more about our mission to help build a better Internet, start here. If you're looking for a new career direction, check out our open positions.

AnalyticsDataSpeed & ReliabilityKafkaCap'n ProtoAPIphpLoad BalancingNGINX