
Segment Compaction for Upsert Enabled Tables in Apache Pinot

· 4 min read
Robert Zych
Software Engineer

I’m happy to share that my first feature contribution to the Apache Pinot project (segment compaction for upsert-enabled real-time tables) was merged recently! In this post, I will briefly discuss the problem segment compaction addresses, how to configure it, and what it looks like in action. If you’re unfamiliar with Pinot’s upsert features, I recommend reviewing Full Upserts in Pinot to get started and Stream Ingestion with Upsert for more information.

Context and Configuration#

Because Pinot’s upsert feature stores every version of an ingested record in immutable segments on disk, older record versions unnecessarily consume valuable storage space once they are no longer used in query results. Segment compaction reclaims this space with a periodic process that replaces completed segments with compacted segments containing only the latest version of each record. I recommend reviewing the Minion documentation if you’re unfamiliar with Pinot’s ability to run periodic processes.

With task scheduling enabled and an available Minion, you can configure segment compaction by adding the following to your table’s config.

"task": {  "taskTypeConfigsMap": {    "UpsertCompactionTask": {      "schedule": "0 */5 * ? * *",      "bufferTimePeriod": "7d",      "invalidRecordsThresholdPercent": "30",      "invalidRecordsThresholdCount": "100000"    }  }}

All the configs above (excluding schedule) determine which completed segments are selected for compaction.

bufferTimePeriod is the amount of time that must have elapsed since a segment finished consuming before it becomes eligible for compaction. In the example above, it is set to "7d", which means any segment that was completed over 7 days ago may be eligible for compaction. If you want segments to be compacted without any additional delay, set this config to "0d".

invalidRecordsThresholdPercent limits the number of older (invalid) records allowed in a completed segment, expressed as a percentage of the total number of records in the segment (i.e. old records / total records). In the example above, it is set to "30", which means that if more than 30% of the records in a completed segment are old, the segment may be selected for compaction. Because segment compaction is an expensive operation, it is not recommended to set this threshold (or invalidRecordsThresholdCount) too low. This config is optional as long as invalidRecordsThresholdCount has been set, and the two can be used together.

invalidRecordsThresholdCount is a similar limit to invalidRecordsThresholdPercent, but it lets you express the threshold as a record count. In the example above, it is set to "100000", which means that if a completed segment contains more than 100K old records it may be selected for compaction.
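
For example, a minimal variant of the config above (illustrative only, not taken from the original post) that compacts segments as soon as they hold at least 100K invalid records, with no buffer delay and no percentage check, would look like this:

"task": {
  "taskTypeConfigsMap": {
    "UpsertCompactionTask": {
      "schedule": "0 */5 * ? * *",
      "bufferTimePeriod": "0d",
      "invalidRecordsThresholdCount": "100000"
    }
  }
}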

Example Use Case#

I’ve created a data set that includes 24M records. The data set contains 240K unique keys that have each been duplicated 100 times.


After ingesting the data set there are 6 segments (5 completed segments + 1 consuming segment) with a total estimated size of 22.8MB. Submitting the query “set skipUpsert=true; select count(*) from transcript_upsert” before compaction produces the following query result.
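
Spelled out, the pre-compaction query is:

set skipUpsert=true;
select count(*) from transcript_upsert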

Query result before segment compaction

After the compaction tasks are complete, the Minion Task Manager UI reports the following.

Minion Task Manager UI after the compaction tasks complete

Segment compaction generates a task for each segment to be compacted. Five tasks were generated in this case because roughly 90% of the records in each of the 5 completed segments (3.6–4.5M records per segment) are old, exceeding the configured thresholds. If a completed segment contains only old records, it is deleted immediately and no compaction task is generated for it.


Submitting the query again, we now see that the count matches the 240K unique keys.

Query result after segment compaction

Once compaction has completed and the original segments have been replaced with their compacted counterparts, we see that the total number of segments remained the same, but the total estimated size dropped to only 2.77MB! Since compaction can yield very small segments, one future improvement would be to merge smaller segments into larger ones, which would improve query latency.

Conclusion#

In this brief overview of Segment Compaction I covered the problem it addresses, how you can configure it, and demonstrated its ability to reclaim storage space. I’d like to thank Ankit Sultana, Seunghyun Lee, and especially Jackie Jiang for their feedback and support throughout the design and development stages. If you have any questions or feedback, I’m available on the Apache Pinot Slack.

Star-Tree Index in Apache Pinot - Part 3 - Understanding the Impact in Real Customer Scenarios

· 8 min read

In part 1 of this blog series, we looked at how a star-tree index brought standalone query latency on a sizable dataset of ~633M records down from 1,513 ms to 4 ms, nearly 380x faster!

In part 2 of this blog series, we imitated a real production scenario by firing hundreds of concurrent queries using JMeter and showcased how a star-tree index helped achieve a >95% drop in p90/p95/p99 latencies and a 126x increase in throughput.

In this part, we will cover some real customer stories that have seen 95% to 99% improvement in query performance using Star-Tree Index.

AdTech Use Case#

This was for a leading AdTech platform and a somewhat typical use case; users of the platform (advertisers, publishers, and influencers) wanted to see fresh metrics on how their activities (such as online content, ad, and email campaigns) were performing in real-time so they could tweak things as needed. The application team wanted to provide a rich analytical interface to these users so that not only can they see the current performance but also do custom slicing and dicing of data over a period of time. For example, compare their current campaign performance to one they ran two weeks back, do cohort analysis, and so on.

Why was the existing system not working?#

Their existing tech stack was a mix of OSS and custom-built in-house code, which was both operationally difficult to manage and costly to maintain. Yet more importantly, it wasn’t able to meet the basic throughput and latency requirements required by the platform to sustain user growth as well as provide richer analytic capabilities in the product.

The Problem and Challenges?#

When the StarTree Sales Engineering team was engaged, the requirements were very clear:

  • Throughput: Support 50+ QPS during the POC and 200+ in production
  • Latency: P95 latency of 2s, including a query that needed to aggregate ~2 billion rows
  • Scalability: Ability to scale efficiently with future growth in QPS in a non-linear manner

The biggest challenge was the size of data — 20+ TB and growing — and on top of that, a complex aggregation query driving the summary view for users so they can drill further in to get more details. 

This particular query needed to aggregate close to 2 billion records at read time and would be fired for every active user interacting with the platform (hence the high concurrent QPS). Despite applying all relevant indexes available in their existing system, out-of-the-box query performance was still in the 6-8 second range, which is expected given that the bulk of the work happens in the aggregation phase rather than the filtering phase (which is what indexes help with).

In the other OLAP systems they explored, the only option available to handle this use case was ingestion-time rollups, i.e. rolling the data up to a coarser granularity. However, this obviously means losing access to the raw data, and potentially re-bootstrapping if new use cases come down the road that need raw data access.

This is exactly the type of scenario the star-tree index, unique to Apache Pinot, is designed to address: handling large aggregation queries at scale that need sub-second performance. The best part is that you can apply it at any time without reprocessing the data or planning any downtime (the segment reload that applies table config changes runs as a background task in Apache Pinot). In this specific case, the same query latency with the star-tree index applied went down to 15 ms. This meant that with the same infrastructure footprint, StarTree was able to support ~70 QPS (queries per second) vs. <1 QPS for this most complex query, while keeping the raw data intact.
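
For reference, enabling a star-tree index is purely a table config change. The sketch below shows the general shape of such a config; the dimension and metric column names are hypothetical stand-ins, not the customer's actual schema:

"tableIndexConfig": {
  "starTreeIndexConfigs": [
    {
      "dimensionsSplitOrder": ["campaignId", "country", "deviceType"],
      "skipStarNodeCreationForDimensions": [],
      "functionColumnPairs": ["COUNT__*", "SUM__impressions", "SUM__clicks"],
      "maxLeafRecords": 10000
    }
  ]
}

Once the table config is updated, a segment reload applies the index in the background, as noted above.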

Data Size and Infra Footprint for the Pilot: #

  • Total # of records: ~2 Trillion
  • Data Size: ~20 TB
  • Capacity: 72 vCPUs across 9 Pinot servers (8 vCPU, 64GB per node). 

Impact Summary:#

  • 99.76% reduction in latency vs. no Star-Tree Index (6.3 seconds to 15 ms)
  • 99.99999% reduction in amount of data scanned/aggregated per query (>1.8B docs to <2,400)

Visualization of the impact of star-tree index for an AdTech use case with Apache Pinot

CyberSecurity Use Case:#

A cybersecurity company provides its customers with an AI-powered real-time threat detection platform that lets them analyze network flow logs in real time through a sophisticated reporting/analytical UI. The initial landing page inside the customer portal is a summary view of everything the platform is monitoring in the user's environment, with the ability to drill down into the specifics of each item, for example filtering requests by a specific application or IP address.

Why was the existing system not working?#

Their existing tech stack was a mix of Athena and Presto, which couldn’t meet the throughput and latency requirements as data volume grew across their customers. Additionally, the operational overhead of managing some of these systems in-house drove up cost.

The Problem and Challenges?#

Some of the key requirements that StarTree Cloud cluster had to meet:

  • Throughput: Up to 200 QPS (200 projected by end of year)
  • Latency: <1 second P99
  • High ingestion rate: 300k events/sec
  • ROI: Provide better cost efficiencies

Similar to use case #1, the customer wanted to retain data at the lowest granularity (so no ingestion roll-ups), and given the fine granularity of the time column they faced a similar challenge running the complex aggregation query that powers the summary view. Additionally, the requirement to reach double-digit throughput (QPS) for the POC on the most efficient compute footprint made it quite challenging.

Given the overhead of the complex aggregations, efficient filtering (indexes) wasn’t enough: in this case, with 3 x 4-core/32GB nodes, the query took more than 15 seconds. We added a star-tree index to the table config and did a segment reload, and the results were phenomenal: query latency dropped to 10 ms.

Data Size and Infra Footprint for the Pilot: #

  • Total # of records: ~8 Billion
  • Data Size: 500+ GB
  • Capacity: 12 vCPUs across 3 Pinot servers (4-core/32GB) 

Impact Summary:#

  • 99.94% reduction in query latency (achieving 100 QPS for the same query with no extra hardware)
  • 99.9998% reduction in data scanned/aggregated per query
  • Happy Customer 😃

Visualization of the impact of star-tree index for a Cybersecurity use case with Apache Pinot

Multiplayer Game Leaderboard Use Case#

A global leader in the interactive entertainment field has an A/B testing / experimentation use case that tracks player activity to measure engagement with new features as they are rolled out.

The Problem and Challenges?#

Some of the key requirements that StarTree Cloud cluster had to meet:

  • Throughput: 200 QPS
  • Latencies: <1 second P99
  • Ingestion rate: 50K events/sec

Given the overhead of the complex aggregations, efficient filtering (indexes) wasn’t enough: in this case, with a single 4-core/32GB node, the query took 163 milliseconds. After switching to a star-tree index, query latency was reduced to 7 ms (a reduction of 95.7%).

Data Size and Infra Footprint for the Pilot: #

  • Total # of records: ~34 Million
  • Data Size: 500+ GB
  • Capacity: 4 vCPUs - 1 Pinot server (4-cores / 32 GB) 

Impact Summary:#

  • 95.70% improvement in query performance as a result of a 99.9962% reduction in the number of documents and entries scanned.

Visualization of the impact of star-tree index for a Gaming use case with Apache Pinot

Quick Recap: Star-Tree Index Performance Improvements#

Recap Table of the Impact that star-tree index had on three real-world customers using Apache Pinot™

  • 99.99% reduction in data scanned/aggregated per query
  • 95 to 99% improvement in query performance

Disk I/O is the most expensive operation in query processing, and the star-tree index reduces it significantly. Instead of scanning raw documents from disk and computing aggregates on the fly, the star-tree index scans pre-aggregated documents for the combination of dimensions specified in the query.
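
As a hypothetical illustration (reusing the stand-in column names from the config sketch above), a query of this shape, filtering on star-tree dimensions and aggregating a metric covered by functionColumnPairs, is answered from pre-aggregated star-tree documents rather than raw rows:

select country, sum(impressions) as total_impressions
from ad_events
where campaignId = 'spring_sale' and deviceType = 'mobile'
group by country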

In part 1 of the series, we saw that the star-tree index reduced the disk reads by 99.999% from 584 Million entries (in case of an inverted index) to 2,045. Query latency came down 99.67% from 1,513 ms to 4 ms! This, in itself, was a HUGE benefit. 

In addition to the drastic improvement in query latency, memory and CPU usage decreased significantly, freeing up resources to take on more concurrent workloads. The cumulative effect was the 126x increase in QPS on this small 4 vCPU Pinot server that we saw in part 2 of this series.

And finally, in this part 3 of the blog series, we covered three real production use cases that have seen 95% to 99% improvement in query performance using Star-Tree Index.

Intrigued by What You’ve Read?#

The next step is to load your data into an open-source Apache Pinot cluster or, if you prefer, a fully-managed database-as-a-service (DBaaS). Sign up today for a StarTree Cloud account, free for 30 days. If you have more questions, sign up for the StarTree Community Slack.


Real-Time Mastodon Usage with Apache Kafka, Apache Pinot, and Streamlit

· 7 min read
Mark Needham
Developer Advocate

I recently came across a fascinating blog post written by Simon Aubury that shows how to analyze user activity, server popularity, and language usage on Mastodon, a decentralized social networking platform that has become quite popular in the last six months. 

The Existing Solution: Kafka Connect, Parquet, Seaborn and DuckDB #

To start, Simon wrote a listener to collect the messages, which he then published into Apache Kafka®. He then wrote a Kafka Connect configuration that consumes messages from Kafka and flushes them after every 1,000 messages into Apache Parquet files stored in an Amazon S3 bucket. 

Finally, he queried those Parquet files using DuckDB and created some charts using the Seaborn library, as reflected in the architecture diagram below:

Flowchart of data collection to data processing

Fig: Data Collection Architecture

The awesome visualizations that Simon created made me wonder whether we could change what happens downstream of Kafka to make our queries even more real-time. Let’s find out!

Going Real-Time with Apache Pinot™#

Now Apache Pinot comes into the picture. Instead of using Kafka Connect to batch Mastodon toots into groups of 1,000 messages to generate Parquet files, we can stream the data immediately and directly, toot by toot, into Pinot and then build a real-time dashboard using Streamlit:

Data collection in Mastodon, followed by processing in Apache Kafka, Apache Pinot, and Streamlit

Setup#

To follow along, first clone my fork of Simon’s GitHub repository:

git clone git@github.com:mneedham/mastodon-stream.git
cd mastodon-stream

Then launch all of the components using Docker Compose:

docker-compose up

Pinot Schema and Table#

Similar to what Simon did with DuckDB, we’ll ingest the Mastodon events into a table. Pinot tables have a schema that’s defined in a schema file. 

To come up with a schema file, we need to know the structure of the ingested events. For example:

{  "m_id": 110146691030544274,  "created_at": 1680705124,  "created_at_str": "2023 04 05 15:32:04",  "app": "",  "url": "https://mastodon.social/@Xingcat/110146690810165414",  "base_url": "https://techhub.social",  "language": "en",  "favourites": 0,  "username": "Xingcat",  "bot": false,  "tags": 0,  "characters": 196,  "words": 36,  "mastodon_text": "Another, “I don’t know what this is yet,” paintings. Many, many layers that look like distressed metal or some sort of rock crosscut. Liking it so far, need to figure out what it’ll wind up being."}

Mapping these fields directly to columns is easiest and will result in a schema file that looks like this:

{  "schemaName":"mastodon",  "dimensionFieldSpecs":[    {"name":"m_id","dataType":"LONG"},    {"name":"created_at_str","dataType":"STRING"},    {"name":"app","dataType":"STRING"},    {"name":"url","dataType":"STRING"},    {"name":"base_url","dataType":"STRING"},    {"name":"language","dataType":"STRING"},    {"name":"username","dataType":"STRING"},    {"name":"bot","dataType":"BOOLEAN"},        {"name":"mastodon_text","dataType":"STRING"}  ],  "metricFieldSpecs":[    {"name":"favourites","dataType":"INT"},    {"name":"words","dataType":"INT"},    {"name":"characters","dataType":"INT"},    {"name":"tags","dataType":"INT"}  ],  "dateTimeFieldSpecs":[    {      "name":"created_at",      "dataType":"LONG",      "format":"1:MILLISECONDS:EPOCH",      "granularity":"1:MILLISECONDS"    }  ]}

Next up: our table config, shown below:

{    "tableName": "mastodon",    "tableType": "REALTIME",    "segmentsConfig": {      "timeColumnName": "created_at",      "timeType": "MILLISECONDS",      "schemaName": "mastodon",      "replicasPerPartition": "1"    },    "tenants": {},    "tableIndexConfig": {      "loadMode": "MMAP",      "streamConfigs": {        "streamType": "kafka",        "stream.kafka.consumer.type": "lowLevel",        "stream.kafka.topic.name": "mastodon-topic",        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",        "stream.kafka.decoder.prop.format": "AVRO",        "stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081",        "stream.kafka.decoder.prop.schema.registry.schema.name": "mastodon-topic-value",        "stream.kafka.broker.list": "broker:9093",        "stream.kafka.consumer.prop.auto.offset.reset": "smallest"      }    },    "metadata": {      "customConfigs": {}    },    "routing": {      "instanceSelectorType": "strictReplicaGroup"    }}

The following configs represent the most important ones for ingesting Apache Avro™ messages into Pinot:

"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder","stream.kafka.decoder.prop.format": "AVRO","stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081","stream.kafka.decoder.prop.schema.registry.schema.name": "mastodon-topic-value",

The KafkaConfluentSchemaRegistryAvroMessageDecoder decoder calls the Schema Registry with the schema name to get back the schema that it will use to decode messages.

We can create the Pinot table by running the following command:

docker run \
  --network mastodon \
  -v $PWD/pinot:/config \
  apachepinot/pinot:0.12.0-arm64 AddTable \
    -schemaFile /config/schema.json \
    -tableConfigFile /config/table.json \
    -controllerHost "pinot-controller" \
    -exec

We can then navigate to the table page of the Pinot UI: 

http://localhost:9000/#/tenants/table/mastodon_REALTIME

Here, we’ll see the following:

Apache Pinot table config and schema

Ingest Data into Kafka#

Now, we need to start ingesting data into Kafka. Simon created a script that accomplishes this for us, so we just need to indicate which Mastodon servers to query.

python mastodonlisten.py --baseURL https://data-folks.masto.host \
  --public --enableKafka --quiet
python mastodonlisten.py --baseURL https://fosstodon.org/ \
  --public --enableKafka --quiet
python mastodonlisten.py --baseURL https://mstdn.social/ \
  --public --enableKafka --quiet

We can then check the ingestion of messages with the kcat command line tool:

kcat -C -b localhost:9092 -t mastodon-topic \  -s value=avro -r http://localhost:8081 -e

Query Pinot#

Now, let’s go to the Pinot UI to see what data we’ve got to play with:

http://localhost:9000

We’ll see the following preview of the data in the mastodon table:

SQL Editor, query response stats, and query result in Apache Pinot

We can then write a query to find the number of messages, users, and URLs seen in the last minute:

select count(*) as "Num toots", count(distinct(username)) as "Num users", count(distinct(url)) as "Num urls"from mastodonwhere created_at*1000 > ago('PT1M')order by 1 DESC;

Query results for toots, users, and urls

We can also query Pinot via the Python client, which we can install by running the following:

pip install pinotdb

Once we’ve done that, let’s open the Python REPL and run the following code:

from pinotdb import connect
import pandas as pd

# Connect to the Pinot broker and create a cursor
conn = connect(host='localhost', port=8099, path='/query/sql', scheme='http')
curs = conn.cursor()

query = """
select count(*) as "Num toots",
       count(distinct(username)) as "Num users",
       count(distinct(url)) as "Num urls"
from mastodon
where created_at*1000 > ago('PT1M')
order by 1 DESC;
"""
curs.execute(query)
df = pd.DataFrame(curs, columns=[item[0] for item in curs.description])

This produces the resulting DataFrame:

   Num toots  Num users  Num urls
0        552        173       192

Streamlit#

Next, we’ll create a Streamlit dashboard to package up these queries. We’ll visualize the results using Plotly, which you can install using:

pip install streamlit plotly

I’ve created a Streamlit app in the file app.py, which you can find in the GitHub repository. Let’s have a look at the kinds of visualizations that we can generate. 

First, we’ll create metrics to show the number of toots, users, and URLs in the last n minutes. n will be configurable from the app as shown in the screenshot below:

Chart of real-time Mastodon usage

From the screenshot, we can identify mastodon.cloud as the most active server, though it produces only 1,800 messages in 10 minutes or three messages per second. The values in green indicate the change in values compared to the previous 10 minutes.
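
Under the hood, the metrics section boils down to a few lines of Streamlit. The snippet below is a simplified sketch rather than the exact contents of app.py; the slider, column layout, and column aliases are assumptions:

import pandas as pd
import streamlit as st
from pinotdb import connect

# Same connection settings as the REPL example above
conn = connect(host='localhost', port=8099, path='/query/sql', scheme='http')

st.header("Real-time Mastodon usage")

# Let the user pick the lookback window (the "last n minutes" mentioned above)
minutes = st.slider("Lookback window (minutes)", min_value=1, max_value=60, value=10)

query = f"""
select count(*) as num_toots,
       count(distinct(username)) as num_users,
       count(distinct(url)) as num_urls
from mastodon
where created_at*1000 > ago('PT{minutes}M')
"""

curs = conn.cursor()
curs.execute(query)
df = pd.DataFrame(curs, columns=[item[0] for item in curs.description])

# One headline number per column; the green change indicators in the screenshot
# are likely st.metric's delta argument, omitted here for brevity.
toots, users, urls = st.columns(3)
toots.metric("Num toots", int(df["num_toots"][0]))
users.metric("Num users", int(df["num_users"][0]))
urls.metric("Num urls", int(df["num_urls"][0]))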

We can also create a chart showing the number of messages per minute for the last 10 minutes:

Time of day Mastodon usage

Based on this chart, we can see that we’re creating anywhere from 200–900 messages per minute. Part of the reason for the variation is that the Mastodon servers sometimes disconnect our listener, and at the moment, I have to reconnect manually.

Finally, we can look at the toot length by language:

Chart of toot length by language usage

We see much bigger ranges here than Simon saw in his analysis. He saw a maximum length of 200 characters, whereas we see some messages of up to 4,200 characters. 

Summary#

We hope you enjoyed following along as we explored this fun use case for real-time analytics. As you can see, even though we’re pulling the data from many of the popular Mastodon servers, it’s still not all that much data!

Give the code a try and let us know how it goes. If you have any questions, feel free to join us on Slack, where we’ll gladly do our best to help you out.