
Apache Pinot™ 0.12 - Configurable Time Boundary

· 4 min read
Mark Needham

The Apache Pinot community recently released version 0.12.0, which has lots of goodies for you to play with. This is the first in a series of blog posts showing off some of the new features in this release.

This post will explore the ability to configure the time boundary when working with hybrid tables.

What is a hybrid table?#

A hybrid table is the term used to describe a situation where we have an offline and a real-time table with the same name. The offline table stores historical data, while the real-time table continuously ingests data from a streaming data platform.

How do you query a hybrid table?#

When you write a query against a hybrid table, the Pinot query engine needs to work out which records to read from the offline table and which to read from the real-time table.

It does this by computing the time boundary, determined by looking at the maximum end time of segments in the offline table and the segment ingestion frequency specified for the offline table.

timeBoundary = <Maximum end time of offline segments> - <Ingestion Frequency>

The ingestion frequency can be either 1 hour or 1 day, so one of those values will be used.
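
As a rough sketch of that calculation (a hypothetical Python helper, not Pinot's actual implementation):

from datetime import datetime, timedelta

def time_boundary(max_offline_end_time: datetime, ingestion_frequency: timedelta) -> datetime:
    # timeBoundary = <maximum end time of offline segments> - <ingestion frequency>
    return max_offline_end_time - ingestion_frequency

# With an hourly ingestion frequency and offline segments ending at 18:41:17,
# the boundary falls at 17:41:17.
print(time_boundary(datetime(2023, 1, 9, 18, 41, 17), timedelta(hours=1)))
# 2023-01-09 17:41:17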

When a Pinot Broker receives a query for a hybrid table, it sends a time-boundary-annotated version of the query to the offline and real-time tables. Any records with a timestamp on or before the time boundary are read from the offline table; anything after the boundary comes from the real-time table.

Apache Pinot computing the time boundary

For example, if we executed the following query:

SELECT count(*)
FROM events

The broker would send the following query to the offline table:

SELECT count(*)
FROM events_OFFLINE
WHERE timeColumn <= $timeBoundary

And the following query to the real-time table:

SELECT count(*)
FROM events_REALTIME
WHERE timeColumn > $timeBoundary

The results of the two queries are merged by the broker before being returned to the client.

So, what’s the problem?#

If we have some overlap in the data in our offline and real-time tables, this approach works well, but if we have no overlap, we will end up with unexpected results.

For example, let’s say that the most recent timestamp in the events offline table is 2023-01-09T18:41:17, our ingestion frequency is 1 hour, and the real-time table has data starting from 2023-01-09T18:41:18.

This will result in a boundary time of 2023-01-09T17:41:17, which means that any records with timestamps between 17:41 and 18:41 will be excluded from query results.

And the solution?#

The 0.12 release sees the addition of the tables/{tableName}/timeBoundary API, which lets us set the time boundary to the maximum end time of all offline segments.

curl -X POST \
  "http://localhost:9000/tables/{tableName}/timeBoundary" \
  -H "accept: application/json"

In this case, that will result in a new boundary time of 2023-01-09T18:41:17, which is exactly what we need.

We’ll then be able to query the events table and have it read the offline table to get all records on or before 2023-01-09T18:41:17 and the real-time table for everything else.

Neat, anything else I should know?#

Something to keep in mind when updating the time boundary is that it’s a one-off operation. It won’t be automatically updated if you add a new, more recent segment to the offline table.

In this scenario, you need to call the tables/{tableName}/timeBoundary API again.

And if you want to revert to the previous behavior where the time boundary is computed by subtracting the ingestion frequency from the latest end time, you can do that too:

curl -X DELETE \
  "http://localhost:9000/tables/{tableName}/timeBoundary" \
  -H "accept: application/json"

Summary#

I love this feature, and it solves a problem I’ve struggled with when using my datasets. I hope you’ll find it just as useful.

Give it a try, and let us know how you get on. If you have any questions about this feature, feel free to join us on Slack, where we’ll be happy to help you out.

Apache Pinot™ 0.11 - Deduplication on Real-Time Tables

· 8 min read
Mark Needham

Last fall, the Apache Pinot community released version 0.11.0, which has lots of goodies for you to play with.

In this post, we’re going to learn about the deduplication feature for real-time tables.

Why do we need deduplication on real-time tables?#

This feature was built to deal with duplicate data in the streaming platform. 

Users have previously used the upsert feature to de-duplicate data, but this has the following limitations:

  • It forces us to keep redundant records that we don’t need, which increases overall storage costs.
  • We can’t yet use the StarTree index with upserts, so the speed benefits we get from using that indexing technique are lost.

How does dedup differ from upserts?#

Both upserts and dedup keep track of multiple documents that have the same primary key. They differ as follows:

  • Upserts are used when we want to get the latest copy of a document for a given primary key. It’s likely that some or all of the other fields will be different. Pinot stores all documents it receives, but when querying it will only return the latest document for each primary key.
  • Dedup is used when we know that multiple documents with the same primary key are identical. Only the first event received for a given primary key is stored in Pinot—any future events with the same primary key are thrown away.
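
To make the difference concrete, here is a minimal Python sketch of the two behaviours (purely illustrative; this is not how Pinot stores data internally):

# Events arriving in order from the stream, keyed on "uuid".
events = [
    {"uuid": "a04", "count": 1},
    {"uuid": "a04", "count": 7},
    {"uuid": "f94", "count": 3},
]

# Dedup: the first document per primary key wins; later ones are dropped at ingestion time.
dedup = {}
for e in events:
    dedup.setdefault(e["uuid"], e)

# Upsert: every document is stored, but queries return the latest document per primary key.
upsert = {}
for e in events:
    upsert[e["uuid"]] = e

print(dedup["a04"]["count"])   # 1
print(upsert["a04"]["count"])  # 7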

Let’s see how to use this functionality with help from a worked example.

Setting up Apache Kafka and Apache Pinot#

We’re going to spin up Kafka and Pinot using the following Docker Compose config:

version: "3"services:  zookeeper:    image: zookeeper:3.8.0    hostname: zookeeper    container_name: zookeeper-dedup-blog    ports:      - "2181:2181"    environment:      ZOOKEEPER_CLIENT_PORT: 2181      ZOOKEEPER_TICK_TIME: 2000    networks:       - dedup_blog  kafka:    image: wurstmeister/kafka:latest    restart: unless-stopped    container_name: "kafka-dedup-blog"    ports:      - "9092:9092"    expose:      - "9093"    depends_on:     - zookeeper    environment:      KAFKA_ZOOKEEPER_CONNECT: zookeeper-dedup-blog:2181/kafka      KAFKA_BROKER_ID: 0      KAFKA_ADVERTISED_HOST_NAME: kafka-dedup-blog      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-dedup-blog:9093,OUTSIDE://localhost:9092      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT    networks:       - dedup_blog  pinot-controller:    image: apachepinot/pinot:0.11.0-arm64    command: "QuickStart -type EMPTY"    container_name: "pinot-controller-dedup-blog"    volumes:      - ./config:/config    restart: unless-stopped    ports:      - "9000:9000"    networks:       - dedup_blognetworks:  dedup_blog:    name: dedup_blog

We can spin up our infrastructure using the following command:

docker-compose up

Data Generation#

Let’s imagine that we want to ingest events generated by the following Python script:

import datetime
import uuid
import random
import json

while True:
    ts = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    id = str(uuid.uuid4())
    count = random.randint(0, 1000)
    print(
        json.dumps({"tsString": ts, "uuid": id[:3], "count": count})
    )

We can view the data generated by this script by pasting the above code into a file called datagen.py and then running the following command:

python datagen.py 2>/dev/null | head -n3 | jq

We’ll see the following output:

{  "tsString": "2023-01-03T10:59:17.355081Z",  "uuid": "f94",  "count": 541}{  "tsString": "2023-01-03T10:59:17.355125Z",  "uuid": "057",  "count": 96}{  "tsString": "2023-01-03T10:59:17.355141Z",  "uuid": "d7b",  "count": 288}

If we generate only 25,000 events, we’ll get some duplicates, which we can see by running the following command:

python datagen.py 2>/dev/null  | jq -r '.uuid' | head -n25000 | uniq -cd

The results of running that command are shown below:

   2 3a2
   2 a04
   2 433
   2 291
   2 d73

We’re going to pipe this data into a Kafka stream called events, like this:

python datagen.py 2>/dev/null | jq -cr --arg sep 😊 '[.uuid, tostring] | join($sep)' | kcat -P -b localhost:9092 -t events -K😊

The construction of the key/value structure comes from Robin Moffatt’s excellent blog post. Since that blog post was written, kcat has started supporting multi-byte separators, which is why we can use a smiley face to separate our key and value.

Pinot Schema/Table Config#

Next, we’re going to create a Pinot table and schema with the same name. Let’s first define a schema:

{  "schemaName": "events",  "dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],  "metricFieldSpecs": [{"name": "count", "dataType": "INT"}],  "dateTimeFieldSpecs": [    {      "name": "ts",      "dataType": "TIMESTAMP",      "format": "1:MILLISECONDS:EPOCH",      "granularity": "1:MILLISECONDS"    }  ]}

Note that the timestamp field is called ts and not tsString, as it is in the Kafka stream. We’re going to transform the DateTime string value held in that field into a proper timestamp using a transformation function. 

Our table config is described below:

{  "tableName": "events",  "tableType": "REALTIME",  "segmentsConfig": {    "timeColumnName": "ts",    "schemaName": "events",    "replication": "1",    "replicasPerPartition": "1"  },  "tableIndexConfig": {    "loadMode": "MMAP",    "streamConfigs": {      "streamType": "kafka",      "stream.kafka.topic.name": "events",      "stream.kafka.broker.list": "kafka-dedup-blog:9093",      "stream.kafka.consumer.type": "lowlevel",      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"    }  },  "ingestionConfig": {    "transformConfigs": [      {        "columnName": "ts",        "transformFunction": "FromDateTime(tsString, 'YYYY-MM-dd''T''HH:mm:ss.SSSSSS''Z''')"      }    ]  },  "tenants": {},  "metadata": {}}

Let’s create the table using the following command:

docker run \
  --network dedup_blog \
  -v $PWD/pinot/config:/config \
  apachepinot/pinot:0.11.0-arm64 AddTable \
  -schemaFile /config/schema.json \
  -tableConfigFile /config/table.json \
  -controllerHost "pinot-controller-dedup-blog" \
  -exec

Now we can navigate to http://localhost:9000 and run a query that returns a count of how many times each uuid appears:

select uuid, count(*)
from events
group by uuid
order by count(*)
limit 10

The results of this query are shown below:

Sample Apache Pinot real-time query response stats including duplicates

We can see loads of duplicates! 

Now let’s add a table and schema that uses the de-duplicate feature, starting with the schema:

{  "schemaName": "events_dedup",  "primaryKeyColumns": ["uuid"],  "dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],  "metricFieldSpecs": [{"name": "count", "dataType": "INT"}],  "dateTimeFieldSpecs": [    {      "name": "ts",      "dataType": "TIMESTAMP",      "format": "1:MILLISECONDS:EPOCH",      "granularity": "1:MILLISECONDS"    }  ]}

The main difference between this schema and the events schema is that we need to specify a primary key. This key can be any number of fields, but in this case, we’re only using the uuid field.

Next, the table config:

{  "tableName": "events_dedup",  "tableType": "REALTIME",  "segmentsConfig": {    "timeColumnName": "ts",    "schemaName": "events_dedup",    "replication": "1",    "replicasPerPartition": "1"  },  "tableIndexConfig": {    "loadMode": "MMAP",    "streamConfigs": {      "streamType": "kafka",      "stream.kafka.topic.name": "events",      "stream.kafka.broker.list": "kafka-dedup-blog:9093",      "stream.kafka.consumer.type": "lowlevel",      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder"    }  },  "routing": {"instanceSelectorType": "strictReplicaGroup"},  "dedupConfig": {"dedupEnabled": true, "hashFunction": "NONE"},  "ingestionConfig": {    "transformConfigs": [      {        "columnName": "ts",        "transformFunction": "FromDateTime(tsString, 'YYYY-MM-dd''T''HH:mm:ss.SSSSSS''Z''')"      }    ]  },  "tenants": {},  "metadata": {}}

The changes to notice here are:

  • "dedupConfig": {"dedupEnabled": true, "hashFunction": "NONE"} - This enables the feature and indicates that we won’t use a hash function on our primary key.
  • "routing": {"instanceSelectorType": "strictReplicaGroup"} - This makes sure that all segments of the same partition are served from the same server to ensure data consistency across the segments. 
We can create the table using the following command:

docker run \
  --network dedup_blog \
  -v $PWD/pinot/config:/config \
  apachepinot/pinot:0.11.0-arm64 AddTable \
  -schemaFile /config/schema-dedup.json \
  -tableConfigFile /config/table-dedup.json \
  -controllerHost "pinot-controller-dedup-blog" \
  -exec
Once the table is created, we can run the same query against it:

select uuid, count(*)
from events_dedup
group by uuid
order by count(*)
limit 10

Sample Apache Pinot real-time query response stats deduplicated

We have every combination of hex values (16^3=4096) and no duplicates! Pinot’s de-duplication feature has done its job.

How does it work? #

When we’re not using the deduplication feature, events are ingested from our streaming platform into Pinot, as shown in the diagram below:

Events ingested from a streaming platform into Apache Pinot without using the deduplication feature

When de-dup is enabled, we have to check whether records can be ingested, as shown in the diagram below:

Events ingested from a streaming platform into Apache Pinot using the deduplication feature

De-dup works out whether a primary key has already been ingested by using an in-memory map from primary key to the corresponding segment reference.
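
Conceptually, that map behaves something like the following Python sketch (illustrative only; the function and segment names are made up):

# Illustrative dedup metadata map: primary key -> segment reference.
seen = {}

def should_ingest(primary_key: str, segment: str) -> bool:
    # Only the first document for a primary key is ingested; later ones are dropped.
    if primary_key in seen:
        return False
    seen[primary_key] = segment
    return True

print(should_ingest("f94", "events_dedup__0__0"))  # True  - first time we see this key
print(should_ingest("f94", "events_dedup__0__1"))  # False - duplicate, thrown away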

We need to take that map’s memory usage into account when using this feature; otherwise, we’ll end up using all the available memory on the Pinot Server. Below are some tips for using this feature:

  • Try to use a simple primary key type and avoid composite keys. If you don’t have a simple primary key, consider using one of the available hash functions to reduce the space taken up.
  • Create more partitions in the streaming platform than you might otherwise create. The number of partitions in the stream determines the number of partitions in the Pinot table. The more partitions you have in the streaming platform, the more Pinot servers you can distribute the Pinot table across, and the more horizontally scalable the table will be.

Summary#

This feature makes it easier to ensure that we don’t end up with duplicate data in our Apache Pinot estate. 

So give it a try and let us know how you get on. If you have any questions about this feature, feel free to join us on Slack, where we’ll be happy to help you out.

And if you’re interested in how this feature was implemented, you can look at the pull request on GitHub.

Apache Pinot™ 0.11 - Pausing Real-Time Ingestion

· 7 min read
Mark Needham

The Apache Pinot community recently released version 0.11.0, which has lots of goodies for you to play with.

In this post, we will learn about a feature that lets you pause and resume real-time data ingestion. Sajjad Moradi has also written a blog post about this feature, so you can treat this post as a complement to that one.

How does real-time ingestion work?#

Before we get into this feature, let’s first recap how real-time ingestion works.

This only applies to tables that have the REALTIME type. These tables ingest data that comes in from a streaming platform (e.g., Kafka). 

Pinot servers ingest rows into consuming segments that reside in volatile memory. 

Once a segment reaches the segment threshold, it will be persisted to disk as a completed segment, and a new consuming segment will be created. This new segment takes over the ingestion of new events from the streaming platform.

The diagram below shows what things might look like when we’re ingesting data from a Kafka topic that has 3 partitions:

Apache pinot 0.11 Real Time Data Ingestion

A table has one consuming segment per partition but would have many completed segments.

Why do we need to pause and resume ingestion?#

There are many reasons why you might want to pause and resume ingestion of a stream. Some of the common ones are described below:

  • There’s a problem with the underlying stream, and we need to restart the server, reset offsets, or recreate a topic
  • We want to ingest data from different streams into the same table.
  • We made a mistake in our ingestion config in Pinot, and it’s now throwing exceptions and isn’t able to ingest any more data.

The 0.11 release adds the following REST API endpoints:

  • /tables/{tableName}/pauseConsumption
  • /tables/{tableName}/resumeConsumption

As the names suggest, these endpoints can be used to pause and resume streaming ingestion for a specific table. This release also adds the /tables/{tableName}/pauseStatus endpoint, which returns the pause status for a table.

Let’s see how to use this functionality with help from a worked example.

Data Generation#

Let’s imagine that we want to ingest events generated by the following Python script:

import datetime
import uuid
import random
import json

while True:
    ts = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    id = str(uuid.uuid4())
    count = random.randint(0, 1000)
    print(
        json.dumps({"tsString": ts, "uuid": id, "count": count})
    )

We can view the data generated by this script by pasting the above code into a file called datagen.py and then running the following command:

python datagen.py 2>/dev/null | head -n3 | jq

We’ll see the following output:

{  "tsString": "2022-11-23T12:08:44.127481Z",  "uuid": "e1c58795-a009-4e21-ae76-cdd66e090797",  "count": 203}{  "tsString": "2022-11-23T12:08:44.127531Z",  "uuid": "4eedce04-d995-4e99-82ab-6f836b35c580",  "count": 216}{  "tsString": "2022-11-23T12:08:44.127550Z",  "uuid": "6d72411b-55f5-4f9f-84e4-7c8c5c4581ff",  "count": 721}

We’re going to pipe this data into a Kafka stream called ‘events’ like this:

python datagen.py | kcat -P -b localhost:9092 -t events

We’re not setting a key for these messages in Kafka for simplicity’s sake, but Robin Moffatt has an excellent blog post that explains how to do it.

Pinot Schema/Table Config#

We want to ingest this data into a Pinot table with the same name. Let’s first define a schema:


{  "schemaName": "events",  "dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],  "metricFieldSpecs": [{"name": "count", "dataType": "INT"}],  "dateTimeFieldSpecs": [    {      "name": "ts",      "dataType": "TIMESTAMP",      "format": "1:MILLISECONDS:EPOCH",      "granularity": "1:MILLISECONDS"    }  ]}

Note that the timestamp field is called ts and not tsString, as it is in the Kafka stream. We will transform the DateTime string value held in that field into a proper timestamp using a transformation function. 

Our table config is described below:

{    "tableName":"events",    "tableType":"REALTIME",    "segmentsConfig":{      "timeColumnName":"ts",      "schemaName":"events",      "replication":"1",      "replicasPerPartition":"1"    },    "tableIndexConfig":{      "loadMode":"MMAP",      "streamConfigs":{        "streamType":"kafka",        "stream.kafka.topic.name":"events",        "stream.kafka.broker.list":"kafka-pause-resume:9093",        "stream.kafka.consumer.type":"lowlevel",        "stream.kafka.consumer.prop.auto.offset.reset":"smallest",        "stream.kafka.consumer.factory.class.name":"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",        "stream.kafka.decoder.class.name":"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",      }    },    "ingestionConfig":{      "transformConfigs": [        {            "columnName": "ts",            "transformFunction": "FromDateTime(tsString, 'YYYY-MM-dd''T''HH:mm:ss.SS''Z''')"        }      ]    },    "tenants":{},    "metadata":{}  }

Our transformation has a subtle error. The second parameter passed to the FromDateTime function describes the format of the DateTime string, which we defined as:

YYYY-MM-dd''T''HH:mm:ss.SS''Z''

But tsString has values in the following format:

2022-11-23T12:08:44.127550Z

i.e., we don’t have enough S values - there should be six rather than two.

If we create the table using the following command:

docker run \
  --network pause-resume \
  -v $PWD/pinot/config:/config \
  apachepinot/pinot:0.11.0-arm64 AddTable \
  -schemaFile /config/schema.json \
  -tableConfigFile /config/table.json \
  -controllerHost pinot-controller-pause-resume \
  -exec

Pinot will immediately start trying to ingest data from Kafka, and it will throw a lot of exceptions that look like this:

java.lang.RuntimeException: Caught exception while executing function: fromDateTime(tsString,'YYYY-MM-dd'T'HH:mm:ss.SS'Z'')
Caused by: java.lang.IllegalStateException: Caught exception while invoking method: public static long org.apache.pinot.common.function.scalar.DateTimeFunctions.fromDateTime(java.lang.String,java.lang.String) with arguments: [2022-11-23T11:12:34.682504Z, YYYY-MM-dd'T'HH:mm:ss.SS'Z']

At this point, we’d usually be stuck and would need to fix the transformation function and then restart the Pinot server. With the pause/resume feature, we can fix this problem without resorting to such drastic measures. 

The Pause/Resume Flow#

Instead, we can follow these steps:

  • Pause ingestion for the table
  • Fix the transformation function
  • Resume ingestion
  • Profit $$$

We can pause ingestion by running the following command:

curl -X POST \
  "http://localhost:9000/tables/events/pauseConsumption" \
  -H "accept: application/json"

The response should be something like this:

{  "pauseFlag": true,  "consumingSegments": [    "events__0__0__20221123T1106Z"  ],  "description": "Pause flag is set. Consuming segments are being committed. Use /pauseStatus endpoint in a few moments to check if all consuming segments have been committed."}

Let’s follow the response’s advice and check the consuming segments status:

curl -X GET \
  "http://localhost:9000/tables/events/pauseStatus" \
  -H "accept: application/json"

We’ll see the following response:

{  "pauseFlag": true,  "consumingSegments": []}

So far, so good. Now we need to fix the table. We have a config, table-fixed.json, that contains a working transformation config. These are the lines of interest:

{    "ingestionConfig":{      "transformConfigs": [        {            "columnName": "ts",            "transformFunction": "FromDateTime(tsString, 'YYYY-MM-dd''T''HH:mm:ss.SSSSSS''Z''')"        }      ]    }}

We now have six S values rather than two, which should sort out our ingestion.
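
As a rough illustration of what the corrected transform does, here is a Python sketch that parses one of the generated timestamps and converts it to epoch milliseconds (for intuition only; Pinot evaluates FromDateTime itself):

from datetime import datetime, timezone

# A value produced by datagen.py, with six fractional-second digits.
ts_string = "2022-11-23T12:08:44.127550Z"

# Parse the string and convert it to the epoch-millisecond value stored in ts.
dt = datetime.strptime(ts_string, "%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=timezone.utc)
print(int(dt.timestamp() * 1000))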

Update the table config:

curl -X PUT "http://localhost:9000/tables/events" \
  -H "accept: application/json" \
  -H "Content-Type: application/json" \
  -d @pinot/config/table-fixed.json

And then resume ingestion. You can pass in the query string parameter consumeFrom, which takes a value of smallest or largest. We’ll pass in smallest since no data has been consumed yet:

curl -X POST \
  "http://localhost:9000/tables/events/resumeConsumption?consumeFrom=smallest" \
  -H "accept: application/json"

The response will be like this:

{  "pauseFlag": false,  "consumingSegments": [],  "description": "Pause flag is cleared. Consuming segments are being created. Use /pauseStatus endpoint in a few moments to double check."}

Again, let’s check the consuming segments status:

curl -X GET \
  "http://localhost:9000/tables/events/pauseStatus" \
  -H "accept: application/json"

This time we will see some consuming segments:

{  "pauseFlag": false,  "consumingSegments": [    "events__0__22__20221123T1124Z"  ]}

Navigate to http://localhost:9000/#/query and click on the events table. You should see the following:

Sample events table containing records

We have records! We can also run our data generator again, and more events will be ingested.

Summary#

This feature makes real-time data ingestion a bit more forgiving when things go wrong, which has got to be a good thing in my book.

When you look at the name of this feature, it can seem a bit esoteric and perhaps not something that you’d want to use, but I think you’ll find it to be extremely useful.

So give it a try and let us know how you get on. If you have any questions about this feature, feel free to join us on Slack, where we’ll be happy to help you out.

Apache Pinot™ 0.11 - Timestamp Indexes

· 8 min read
Mark Needham

The recent Apache Pinot™ 0.11.0 release has lots of goodies for you to play with. This is the third in a series of blog posts showing off some of the new features in this release.

Pinot introduced the TIMESTAMP data type in the 0.8 release, which stores the time in millisecond epoch long format internally. Community feedback has been that the queries people run against timestamp columns don’t need this level of granularity.

Instead, users write queries that use the datetrunc function to filter at a coarser granularity. Unfortunately, this approach results in data scanning and time-value conversion work that takes a long time at large data volumes.

The timestamp index solves that problem! In this blog post, we’ll use it to get an almost 5x query speed improvement on a relatively small dataset of only 7m rows.

Time in milliseconds with and without timestamp indexes bar chart

Spinning up Pinot#

We’re going to be using the Pinot Docker container, but first, we’re going to create a network, as we’ll need that later on:

docker network create timestamp_blog

We’re going to spin up the empty QuickStart in a container named pinot-timestamp-blog:

docker run \
  -p 8000:8000 \
  -p 9000:9000 \
  --name pinot-timestamp-blog \
  --network timestamp_blog \
  apachepinot/pinot:0.11.0 \
  QuickStart -type EMPTY

Or if you’re on a Mac M1, change the name of the image to have the arm64 suffix, like this:

docker run \
  -p 8000:8000 \
  -p 9000:9000 \
  --network timestamp_blog \
  --name pinot-timestamp-blog \
  apachepinot/pinot:0.11.0-arm64 \
  QuickStart -type EMPTY

Once that’s up and running, we’ll be able to access the Pinot Data Explorer at http://localhost:9000, but at the moment, we don’t have any data to play with.

Importing Chicago Crime Dataset#

The Chicago Crime dataset is a small to medium-sized dataset with 7 million records representing reported crimes in the City of Chicago from 2001 until today.

It contains details of the type of crime, where it was committed, whether an arrest was recorded, which beat it occurred on, and more.

Each of the crimes has an associated timestamp, which makes it a good dataset to demonstrate timestamp indexes.

You can find the code used in this blog post in the Analyzing Chicago Crimes recipe section of the Pinot Recipes GitHub repository. From here on, I’m assuming that you’ve downloaded this repository and are in the recipes/analyzing-chicago-crimes directory.

We’re going to create a schema and table named crimes by running the following command:  

docker run \
  --network timestamp_blog \
  -v $PWD/config:/config \
  apachepinot/pinot:0.11.0-arm64 AddTable \
  -schemaFile /config/schema.json \
  -tableConfigFile /config/table.json \
  -controllerHost pinot-timestamp-blog \
  -exec

We should see the following output: 

2022/11/03 13:07:57.169 INFO [AddTableCommand] [main] {"unrecognizedProperties":{},"status":"TableConfigs crimes successfully added"}

A screenshot of the schema is shown below:

Chicago crime dataset table schema

We won’t go through the table config and schema files in this blog post because we did that in the last post, but you can find them in the config directory on GitHub. 

Now, let’s import the dataset. 

docker run \
  --network timestamp_blog \
  -v $PWD/config:/config \
  -v $PWD/data:/data \
  apachepinot/pinot:0.11.0-arm64 LaunchDataIngestionJob \
  -jobSpecFile /config/job-spec.yml \
  -values controllerHost=pinot-timestamp-blog

It will take a few minutes to load, but once that command has finished, we’re ready to query the crimes table.

Querying crimes by date#

The following query finds the number of crimes that happened after 16th January 2017, grouped by week of the year, with the most crime-filled weeks shown first:

select datetrunc('WEEK', DateEpoch) as tsWeek, count(*)
from crimes
WHERE tsWeek > fromDateTime('2017-01-16', 'yyyy-MM-dd')
group by tsWeek
order by count(*) DESC
limit 10

If we run that query, we’ll see the following results:

Chicago crime dataset query result

And, if we look above the query result, there’s metadata about the query, including the time that it took to run.

Chicago crime dataset metadata about the query, including the time that it took to run

The query took 141 ms to execute, so that’s our baseline.

Adding the timestamp index#

We could add a timestamp index directly to this table and then compare query performance, but to make it easier to do comparisons, we’re going to create an identical table with the timestamp index applied. 

The full table config is available in the config/table-index.json file, and the main change is that we’ve added the following section to add a timestamp index on the DateEpoch column:

"fieldConfigList": [  {    "name": "DateEpoch",    "encodingType": "DICTIONARY",    "indexTypes": ["TIMESTAMP"],    "timestampConfig": {      "granularities": [        "DAY",        "WEEK",        "MONTH"      ]    }  }],

encodingType will always be ‘DICTIONARY’ and indexTypes must contain ‘TIMESTAMP’. We should specify granularities based on our query patterns.

As a rule of thumb, work out which values you most commonly pass as the first argument to the datetrunc function in your queries and include those values.

The full list of valid granularities is: millisecond, second, minute, hour, day, week, month, quarter, and year.

Our new table is called crimes_indexed, and we’re also going to create a new schema with all the same columns called crimes_indexed, as Pinot requires the table and schema names to match.

We can create the schema and table by running the following command:

docker run \
  --network timestamp_blog \
  -v $PWD/config:/config \
  apachepinot/pinot:0.11.0-arm64 AddTable \
  -schemaFile /config/schema-index.json \
  -tableConfigFile /config/table-index.json \
  -controllerHost pinot-timestamp-blog \
  -exec

We’ll populate that table by copying the segment that we created earlier for the crimes table. 

docker run \
  --network timestamp_blog \
  -v $PWD/config:/config \
  -v $PWD/data:/data \
  apachepinot/pinot:0.11.0-arm64 LaunchDataIngestionJob \
  -jobSpecFile /config/job-spec-download.yml \
  -values controllerHost=pinot-timestamp-blog

If you’re curious how that job spec works, I wrote a blog post explaining it in a bit more detail.

Once the Pinot Server has downloaded this segment, it will apply the timestamp index to the DateEpoch column. 

For the curious, we can see this happening in the log files by connecting to the Pinot container and running the following grep command:

docker exec -iti pinot-timestamp-blog \
  grep -rni -A10 "Successfully downloaded segment:.*crimes_indexed_OFFLINE.*" \
  logs/pinot-all.log

We’ll see something like the following (tidied up for brevity):

[BaseTableDataManager] Successfully downloaded segment: crimes_OFFLINE_0 of table: crimes_indexed_OFFLINE to index dir: /tmp/1667490598253/quickstart/PinotServerDataDir0/crimes_indexed_OFFLINE/crimes_OFFLINE_0
[V3DefaultColumnHandler] Starting default column action: ADD_DATE_TIME on column: $DateEpoch$DAY
[SegmentDictionaryCreator] Created dictionary for LONG column: $DateEpoch$DAY with cardinality: 7969, range: 978307200000 to 1666742400000
[V3DefaultColumnHandler] Starting default column action: ADD_DATE_TIME on column: $DateEpoch$WEEK
[SegmentDictionaryCreator] Created dictionary for LONG column: $DateEpoch$WEEK with cardinality: 1139, range: 978307200000 to 1666569600000
[V3DefaultColumnHandler] Starting default column action: ADD_DATE_TIME on column: $DateEpoch$MONTH
[SegmentDictionaryCreator] Created dictionary for LONG column: $DateEpoch$MONTH with cardinality: 262, range: 978307200000 to 1664582400000
[RangeIndexHandler] Creating new range index for segment: crimes_OFFLINE_0, column: $DateEpoch$DAY
[RangeIndexHandler] Created range index for segment: crimes_OFFLINE_0, column: $DateEpoch$DAY
[RangeIndexHandler] Creating new range index for segment: crimes_OFFLINE_0, column: $DateEpoch$WEEK
[RangeIndexHandler] Created range index for segment: crimes_OFFLINE_0, column: $DateEpoch$WEEK

What does a timestamp index do?#

So, the timestamp index has now been created, but what does it actually do?

When we add a timestamp index on a column, Pinot creates a derived column for each granularity and adds a range index for each new column.

In our case, that means we’ll have these extra columns: $DateEpoch$DAY, $DateEpoch$WEEK, and $DateEpoch$MONTH. 

We can check if the extra columns and indexes have been added by navigating to the segment page and typing $DateEpoch in the search box. You should see the following:

Apache Pinot timestamp index on a column

These columns will be assigned the following values:

  • $DateEpoch$DAY = dateTrunc(‘DAY’, DateEpoch)
  • $DateEpoch$WEEK = dateTrunc(‘WEEK’, DateEpoch)
  • $DateEpoch$MONTH = dateTrunc(‘MONTH’, DateEpoch)
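
As a rough illustration of the values these derived columns hold, here is a Python sketch of truncating an epoch-millisecond timestamp to day granularity (illustrative only; Pinot computes the derived columns itself when the segment is loaded):

from datetime import datetime, timezone

def date_trunc_day(epoch_millis: int) -> int:
    # Truncate an epoch-millisecond timestamp to the start of its UTC day,
    # which is the kind of value a $DateEpoch$DAY entry contains.
    dt = datetime.fromtimestamp(epoch_millis / 1000, tz=timezone.utc)
    start_of_day = dt.replace(hour=0, minute=0, second=0, microsecond=0)
    return int(start_of_day.timestamp() * 1000)

print(date_trunc_day(1666742400123))  # 1666742400000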

Pinot will also rewrite any queries that use the dateTrunc function with DAY, WEEK, or MONTH and the DateEpoch field to use those new columns.

This means that this query:

select datetrunc('WEEK', DateEpoch) as tsWeek, count(*)
from crimes_indexed
GROUP BY tsWeek
limit 10

Would be rewritten as:

select $DateEpoch$WEEK as tsWeek, count(*)
from crimes_indexed
GROUP BY tsWeek
limit 10

And our query:

select datetrunc('WEEK', DateEpoch) as tsWeek, count(*)
from crimes
WHERE tsWeek > fromDateTime('2017-01-16', 'yyyy-MM-dd')
group by tsWeek
order by count(*) DESC
limit 10

Would be rewritten as:

select $DateEpoch$WEEK as tsWeek, count(*)
from crimes
WHERE tsWeek > fromDateTime('2017-01-16', 'yyyy-MM-dd')
group by tsWeek
order by count(*) DESC
limit 10

Re-running the query#

Let’s now run our initial query against the crimes_indexed table. We’ll get exactly the same results as before, but let’s take a look at the query stats:

Chicago crime dataset updated query stats

This time the query takes 36 milliseconds rather than 140 milliseconds. That’s an almost 5x improvement, thanks to the timestamp index.

Summary#

Hopefully, you’ll agree that timestamp indexes are pretty cool, and achieving a 5x query improvement without much work is always welcome!

If you’re using timestamps in your Pinot tables, be sure to try out this index and let us know how it goes on the StarTree Community Slack. We’re always happy to help out with any questions or problems you encounter.

Apache Pinot™ 0.11 - Inserts from SQL

· 4 min read
Mark Needham

The Apache Pinot community recently released version 0.11.0, which has lots of goodies for you to play with. This is the second in a series of blog posts showing off some of the new features in this release.

In this post, we’re going to explore the INSERT INTO clause, which makes ingesting batch data into Pinot as easy as writing a SQL query.

Batch importing: The Job Specification#

The power of this new clause is only fully appreciated if we look at what we had to do before it existed. 

In the Batch Import JSON from Amazon S3 into Apache Pinot | StarTree Recipes video (and accompanying developer guide), we showed how to ingest data into Pinot from an S3 bucket.

The contents of that bucket are shown in the screenshot below:

Sample data ingested into Apache Pinot from an S3 bucket

Let’s quickly recap the steps that we had to do to import those files into Pinot. We have a table called events, which has the following schema:

Events schema table

We first created a job specification file, which contains a description of our import job. The job file is shown below:

executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: 's3://marks-st-cloud-bucket/events/'
includeFileNamePattern: 'glob:**/*.json'
outputDirURI: '/data'
overwriteOutput: true
pinotFSSpecs:
  - scheme: s3
    className: org.apache.pinot.plugin.filesystem.S3PinotFS
    configs:
      region: 'eu-west-2'
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'json'
  className: 'org.apache.pinot.plugin.inputformat.json.JSONRecordReader'
tableSpec:
  tableName: 'events'
pinotClusterSpecs:
  - controllerURI: 'http://${PINOT_CONTROLLER}:9000'

At a high level, this file describes a batch import job that will ingest files from the S3 bucket at s3://marks-st-cloud-bucket/events/ where the files match the glob:**/*.json pattern.

We can import the data by running the following command from the terminal:

docker run \
  --network ingest-json-files-s3 \
  -v $PWD/config:/config \
  -e AWS_ACCESS_KEY_ID=AKIARCOCT6DWLUB7F77Z \
  -e AWS_SECRET_ACCESS_KEY=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 \
  apachepinot/pinot:0.11.0 LaunchDataIngestionJob \
  -jobSpecFile /config/job-spec.yml \
  -values PINOT_CONTROLLER=pinot-controller

And don’t worry, those credentials have already been deleted; I find it easier to understand what values go where if we use real values. 

Once we’ve run this command, if we go to the Pinot UI at http://localhost:9000 and click through to the events table from the Query Console menu, we’ll see that the records have been imported, as shown in the screenshot below:

Sample imported records shown in the Apache Pinot Query Console menu

This approach works, and we may still prefer to use it when we need fine-grained control over the ingestion parameters, but it is a bit heavyweight for your everyday data import!

Batch Importing with SQL#

Now let’s do the same thing in SQL.

There are some prerequisites to using the SQL approach, so let’s go through those now, so you don’t end up with a bunch of exceptions when you try this out! 

First of all, you must have a Minion in the Pinot cluster, as this is the component that will do the data import.

You’ll also need to include the following in your table config:

"task": {  "taskTypeConfigsMap": { "SegmentGenerationAndPushTask": {} }}

As long as you’ve done those two things, we’re ready to write our import query! A query that imports JSON files from my S3 bucket is shown below:

INSERT INTO events
FROM FILE 's3://marks-st-cloud-bucket/events/'
OPTION(
  taskName=events-task,
  includeFileNamePattern=glob:**/*.json,
  input.fs.className=org.apache.pinot.plugin.filesystem.S3PinotFS,
  input.fs.prop.accessKey=AKIARCOCT6DWLUB7F77Z,
  input.fs.prop.secretKey=gfz71RX+Tj4udve43YePCBqMsIeN1PvHXrVFyxJS,
  input.fs.prop.region=eu-west-2
);

If we run this query, we’ll see the following output:

Sample events_OFFLINE query result

We can check on the state of the ingestion job via the Swagger REST API. If we navigate to http://localhost:9000/help#/Task/getTaskState, paste Task_SegmentGenerationAndPushTask_events-task as our task name, and then click Execute, we’ll see the following:

Checking the state of an ingestion job screen

If we see the state COMPLETED, this means the data has been ingested, which we can check by going back to the Query console and clicking on the events table.

Summary#

I have to say that batch ingestion of data into Apache Pinot has always felt a bit clunky, but with this new clause, it’s super easy, and it’s gonna save us all a bunch of time.

Also, anything that means I’m not writing YAML files has got to be a good thing!

So give it a try and let us know how you get on. If you have any questions about this feature, feel free to join us on Slack, where we’ll be happy to help you out.

Apache Pinot™ 0.11 - How do I see my indexes?

· 4 min read
Mark Needham

We recently released Pinot 0.11.0, which has lots of goodies for you to play with. This is the first in a series of blog posts showing off some of the new features in this release.

A common question from the community is: how can you work out which indexes are currently defined on a Pinot table? This information has always been available via the REST API, but sometimes you simply want to see it on the UI and not have to parse your way through a bunch of JSON. Let's see how it works!

Spinning up Pinot#

We’re going to spin up the Batch QuickStart in Docker using the following command:

docker run \
  -p 8000:8000 \
  -p 9000:9000 \
  apachepinot/pinot:0.11.0 \
  QuickStart -type BATCH

Or if you’re on a Mac M1, change the name of the image to have the arm64 suffix, like this:

docker run \
  -p 8000:8000 \
  -p 9000:9000 \
  apachepinot/pinot:0.11.0-arm64 \
  QuickStart -type BATCH

Once that’s up and running, navigate to http://localhost:9000/#/ and click on Tables. Under the tables section, click on airlineStats_OFFLINE. You should see a page that looks like this:

airlineStats_OFFLINE page

Click on Edit Table. This will show a window with the config for this table.

Window with configuration for airlineStats_OFFLINE table

Indexing Config#

We’re interested in the tableIndexConfig and fieldConfigList sections. These sections are responsible for defining indexes, which are applied to a table on a per segment basis. 

  • tableIndexConfig is responsible for inverted, JSON, range, Geospatial, and StarTree indexes.
  • fieldConfigList is responsible for timestamp and text indexes.

tableIndexConfig is defined below:

"tableIndexConfig": {  "rangeIndexVersion": 2,  "autoGeneratedInvertedIndex": false,  "createInvertedIndexDuringSegmentGeneration": false,  "loadMode": "MMAP",  "enableDefaultStarTree": false,  "enableDynamicStarTreeCreation": false,  "aggregateMetrics": false,  "nullHandlingEnabled": false,  "optimizeDictionaryForMetrics": false,  "noDictionarySizeRatioThreshold": 0},

From reading this config we learn that no indexes have been explicitly defined.

Now for fieldConfigList, which is defined below:

"fieldConfigList": [  {    "name": "ts",    "encodingType": "DICTIONARY",    "indexType": "TIMESTAMP",    "indexTypes": [      "TIMESTAMP"    ],    "timestampConfig": {      "granularities": [        "DAY",        "WEEK",        "MONTH"      ]    }  }],

From reading this config we learn that a timestamp index is being applied to the ts column. It is applied at DAY, WEEK, and MONTH granularities, which means that the derived columns $ts$DAY, $ts$WEEK, and $ts$MONTH will be created for the segments in this table.

Viewing Indexes#

Now, close the table config modal, and under the segments section, open airlineStats_OFFLINE_16071_16071_0 and airlineStats_OFFLINE_16073_16073_0 in new tabs.

If you look at one of those segments, you’ll see the following grid that lists columns/field names against the indexes defined on those fields.

Segment grid that lists columns/field names against the indexes defined on those fields

All the fields on display persist their values using the dictionary/forward index format, but we can also see that the Quarter column is sorted and has an inverted index, neither of which we explicitly defined.

This is because Pinot will automatically create sorted and inverted indexes for columns whose data is sorted when the segment is created. 

So the data for the Quarter column was sorted, and hence it has a sorted index.

I’ve written a couple of blog posts explaining how sorted indexes work on offline and real-time tables.

Adding an Index#

Next, let’s see what happens if we add an explicit index. We’re going to add an inverted index to the FlightNum column. Go to Edit Table config again and update tableIndexConfig to have the following value:

Inverted index addition

If you go back to the page for segment airlineStats_OFFLINE_16073_16073_0, notice that it does not have an inverted index for this field.

page for segment airlineStats_OFFLINE_16073_16073_0 without an inverted index

This is because indexes are applied on a per segment basis. If we want the inverted index on the FlightNum column in this segment, we can click Reload Segment on this page, or we can go back to the table page and click Reload All Segments.

If we do that, all the segments in the airlineStats_OFFLINE table will eventually have an inverted index on FlightNum.

Summary#

As I mentioned in the introduction, information about the indexes on each segment has always been available via the REST API, but this feature democratizes that information. 

If you have any questions about this feature, feel free to join us on Slack, where we’ll be happy to help you out.

GapFill Function For Time-Series Datasets In Pinot

· 9 min read
Weixiang Sun, Lakshmanan Velusamy

Many real-world datasets are time-series in nature, tracking the value or state changes of entities over time. The values may be polled and recorded at constant time intervals or at random irregular intervals or only when the value/state changes. There are many real-world use cases of time series data. Here are some specific examples:

  • Telemetry from sensors monitoring the status of industrial equipment.
  • Real-time vehicle data such as speed, braking, and acceleration, to produce the driver's risk score trend.
  • Server performance metrics such as CPU, I/O, memory, and network usage over time.
  • An automated system tracking the status of a store or items in an online marketplace.

In this post, let us use an IoT dataset tracking the occupancy status of individual parking slots in a parking garage using automated sensors. The recorded data points might be sparse, or events could be missing altogether due to network and other device issues in the IoT environment. The following figure demonstrates entities emitting values at irregular intervals as the value changes. Polling and recording the values of all entities regularly at a finer granularity would consume more resources, take up more space on disk and during processing, and incur higher costs. But analytics applications operating on these datasets might query for values at a finer granularity than the data recording interval (e.g., a dashboard showing the total number of occupied parking slots at 15-minute granularity over the past week, even though the sensors don't record status that frequently).

Entities emitting data over time at irregular intervals

It is important for Pinot to provide on-the-fly interpolation (filling in the missing data) to better handle time-series data.

Starting with the 0.11.0 release, we introduced new query syntax: gapfilling functions that interpolate data and perform powerful aggregations and data processing over time-series data.

We will discuss the query syntax with an example and then the internal architecture.

Processing time series data in Pinot#

Let us use the following sample data set tracking the status of parking lots in the parking space to understand this feature in detail.

Sample Dataset:#

Sample parking lot dataset

parking_data table

Use case: We want to find out the total number of parking lots that are occupied over a period of time, which would be a common use case for a company that manages parking spaces.

Let us take a 30-minute time bucket as an example:

Sample parking lot dataset with 30 minute time bucket

In the 30 mins aggregation results table above, we can see a lot of missing data as many lots didn't have anything recorded in those 30-minute windows. To calculate the number of occupied parking lots per time bucket, we need to gap-fill the missing data for each of these 30-minute windows.

Interpolating missing data#

There are multiple ways to infer and fill the missing values. In the current version, we introduce the following methods, which are the most common:

  • FILL_PREVIOUS_VALUE can be used to fill time buckets missing values for entities with the last observed value. If no previous observed value can be found, the default value is used as an alternative.
  • FILL_DEFAULT_VALUE can be used to fill time buckets missing values for entities with the default value depending on the data type.

More advanced gapfilling strategies such as using the next observed value, the value from the previous day or past week, or the value computed using a subquery shall be introduced in the future.
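
To build intuition for FILL_PREVIOUS_VALUE, here is a minimal Python sketch of the carry-forward behaviour for a single entity (the bucket labels and default value are made up for illustration):

def fill_previous(buckets, observed, default=0):
    # Carry the last observed value forward into buckets with no data;
    # fall back to the default when nothing has been observed yet.
    filled, last = {}, None
    for bucket in buckets:
        if bucket in observed:
            last = observed[bucket]
        filled[bucket] = last if last is not None else default
    return filled

# One parking lot with a single status change recorded at 09:30.
print(fill_previous(["09:00", "09:30", "10:00", "10:30"], {"09:30": 1}))
# {'09:00': 0, '09:30': 1, '10:00': 1, '10:30': 1}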

Gapfill Query with a Use Case:#

Let us write a query to get the total number of occupied parking lots every 30 minutes over time on the parking lot dataset discussed above.

Query Syntax:#

SELECT time_col, SUM(status) AS occupied_slots_count
FROM (
    SELECT GAPFILL(time_col,'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS','2021-10-01 09:00:00.000',
                   '2021-10-01 12:00:00.000','30:MINUTES', FILL(status, 'FILL_PREVIOUS_VALUE'),
                   TIMESERIESON(lot_id)), lot_id, status
    FROM (
        SELECT DATETIMECONVERT(event_time,'1:MILLISECONDS:EPOCH',
               '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS','30:MINUTES') AS time_col,
               lot_id, lastWithTime(is_occupied, event_time, 'INT') AS status
        FROM parking_data
        WHERE event_time >= 1633078800000 AND event_time <= 1633089600000
        GROUP BY 1, 2
        ORDER BY 1
        LIMIT 100)
    LIMIT 100)
GROUP BY 1
LIMIT 100

This query involves three main steps:

  1. The raw data will be aggregated;
  2. The aggregated data will be gapfilled;
  3. The gapfilled data will be aggregated.

We make one assumption that the raw data is sorted by timestamp. The Gapfill and Post-Gapfill Aggregation will not sort the data.

Query components:#

The following concepts were added to interpolate and handle time-series data.

  • LastWithTime(dataColumn, timeColumn, 'dataType') - To get the last value of dataColumn where the timeColumn is used to define the time of dataColumn. This is useful to pick the latest value when there are multiple values found within a time bucket. Please see https://docs.pinot.apache.org/users/user-guide-query/supported-aggregations for more details.
  • Fill(column, FILL_TYPE) - To fill the missing data of the column with the FILL_TYPE.
  • TimeSeriesOn - To specify the columns to uniquely identify entities whose data will be interpolated.
  • Gapfill - Specify the time range, the time bucket size, how to fill the missing data, and entity definition.

Query Workflow#

The innermost SQL will convert the raw event table into the following table:

Sample parking lot query workflow innermost SQL

The second-most-nested SQL will gapfill the returned data as shown below:

Sample parking lot query workflow second most SQL

The outermost query will aggregate the gapfilled data as follows:

Sample parking lot query workflow outermost SQL

Other Supported Query Scenarios:#

The above example demonstrates the support to aggregate before and post gapfilling. Pre and/or post aggregations can be skipped if they are not needed. The gapfilling query syntax is flexible to support the following use cases:

  • Select/Gapfill - Gapfill the missing data for the time bucket. Just the raw events are fetched, gapfilled, and returned. No aggregation is needed.
  • Aggregate/Gapfill - If there are multiple entries within the time bucket we can pick a representative value by applying an aggregate function. Then the missing data for the time buckets will be gap filled.
  • Gapfill/Aggregate - Gapfill the data and perform some form of aggregation on the interpolated data.

For detailed query syntax and how it works, please refer to the documentation here: https://docs.pinot.apache.org/users/user-guide-query/gap-fill-functions.

How does it work?#

Let us use the sample query given above as an example to understand what's going on behind the scenes and how Pinot executes the gapfill queries.

Request Flow#

Here is the list of steps in executing the query at a high level:

  1. The Pinot Broker receives the gapfill query. It will strip off the gapfill part and send the stripped SQL query to the Pinot Servers.
  2. The Pinot Servers will process the query as a normal query and return the result to the Pinot Broker.
  3. The Pinot Broker will run the DataTableReducer to merge the results from the Pinot Servers. The result will be sent to the GapfillProcessor.
  4. The GapfillProcessor will gapfill the received result and apply the filter against the gap-filled result.
  5. Post-gapfill aggregation and filtering will be applied to the result from the last step.

There are two gapfill-specific steps:

  1. When the Pinot Broker receives the gapfill SQL query, it will strip out the gapfill-related information and send the stripped SQL query to the Pinot Servers.
  2. GapfillProcessor will process the result from BrokerReducerService. The gapfill logic will be applied to the reduced result.

Gapfill steps

Here is the stripped version of the sql query sent to servers for the query shared above:

SELECT DATETIMECONVERT(event_time,'1:MILLISECONDS:EPOCH',
       '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS','30:MINUTES') AS time_col,
       lot_id, lastWithTime(is_occupied, event_time, 'INT') AS status
FROM parking_data
WHERE event_time >= 1633078800000 AND event_time <= 1633089600000
GROUP BY 1, 2
ORDER BY 1
LIMIT 100

Execution Plan#

The sample execution plan for this query is as shown in the figure below:

Sample query execution plan

Time and Space complexity:#

Let us say there are M entities, R rows returned from servers, and N time buckets. The data is gapfilled time bucket by time bucket to limit the broker memory usage to O(M + N + R). When the data is gapfilled for a time bucket, it will be aggregated and stored in the final result (which has N slots). The previous values for each of the M entities are maintained in memory and carried forward as the gapfilling is performed in sequence. The time complexity is O(M * N) where M is the number of entities and N is the number of time buckets.

Challenges#

Sample server challenges graph

As the time-series datasets are enormous and partitioned, it's hard to get answers to the following questions:

  • How many different entities exist within the query time frame? In the temporal partition scheme demonstrated above, a server/partition may not know the answer.
  • What's the previously observed value for an entity, especially for the first data points in a time bucket, when the previous time buckets don’t exist on the same server?

For the scenario shown in the figure above, server2 may not know about the circle entity, as there are no events for the circle in Server2. It would also not know the last observed value for the square entity frame beginning of the time bucket till the first observed value timestamp within the partition.

The Future Work#

When doing the gapfill for one or a few entities, there might not be too much data. But when we deal with a large dataset that has multiple entities queried over a long date range without any filtering, this gets tricky. Since gapfill happens at the pinot broker, it will become very slow and the broker will become a bottleneck. The raw data transferred from servers to brokers would be enormous. Data explodes when interpolated. Parallelism is limited as the single broker instance is handling the query.

The next step of the gapfill project is to remove the Pinot Broker as a bottleneck. The gapfill logic will be pushed down to the servers so that it runs where the data lives. This will reduce data transmission and increase the parallelism and performance of gapfill.

Announcing Apache Pinot 0.10

· 5 min read
Apache Pinot Engineering Team

We are excited to announce the release this week of Apache Pinot 0.10. Apache Pinot is a real-time distributed datastore designed to answer OLAP queries with high throughput and low latency.

This release is cut from commit fd9c58a11ed16d27109baefcee138eea30132ad3. You can find a full list of everything included in the release notes.

Let’s have a look at some of the changes, with the help of the batch QuickStart configuration.

Query Plans#

Amrish Lal implemented the EXPLAIN PLAN clause, which returns the execution plan that will be chosen by the Pinot Query Engine. This lets us see what the query is likely to do without actually having to run it.

EXPLAIN PLAN FOR
SELECT *
FROM baseballStats
WHERE league = 'NL'

If we run this query, we'll see the following results:

Operator | Operator_Id | Parent_Id
BROKER_REDUCE(limit:10) | 0 | -1
COMBINE_SELECT | 1 | 0
SELECT(selectList:AtBatting, G_old, baseOnBalls, caughtStealing, doules, groundedIntoDoublePlays, hits, hitsByPitch, homeRuns, intentionalWalks, league, numberOfGames, numberOfGamesAsBatter, playerID, playerName, playerStint, runs, runsBattedIn, sacrificeFlies, sacrificeHits, stolenBases, strikeouts, teamID, tripples, yearID) | 2 | 1
TRANSFORM_PASSTHROUGH(AtBatting, G_old, baseOnBalls, caughtStealing, doules, groundedIntoDoublePlays, hits, hitsByPitch, homeRuns, intentionalWalks, league, numberOfGames, numberOfGamesAsBatter, playerID, playerName, playerStint, runs, runsBattedIn, sacrificeFlies, sacrificeHits, stolenBases, strikeouts, teamID, tripples, yearID) | 3 | 2
PROJECT(homeRuns, playerStint, groundedIntoDoublePlays, numberOfGames, AtBatting, stolenBases, tripples, hitsByPitch, teamID, numberOfGamesAsBatter, strikeouts, sacrificeFlies, caughtStealing, baseOnBalls, playerName, doules, league, yearID, hits, runsBattedIn, G_old, sacrificeHits, intentionalWalks, runs, playerID) | 4 | 3
FILTER_FULL_SCAN(operator:EQ,predicate:league = 'NL') | 5 | 4

FILTER Clauses for Aggregates#

Atri Sharma added the filter clause for aggregates. This feature makes it possible to write queries like this:

SELECT SUM(homeRuns) FILTER(WHERE league = 'NL') AS nlHomeRuns,
       SUM(homeRuns) FILTER(WHERE league = 'AL') AS alHomeRuns
FROM baseballStats

If we run this query, we'll see the following output:

nlHomeRuns | alHomeRuns
135486 | 135990

greatest and least#

Richard Startin added the greatest and least functions:

SELECT playerID,
       least(5.0, max(homeRuns)) AS homeRuns,
       greatest(5.0, max(hits)) AS hits
FROM baseballStats
WHERE league = 'NL' AND teamID = 'SFN'
GROUP BY playerID
LIMIT 5

If we run this query, we'll see the following output:

playerID | homeRuns | hits
ramirju01 | 0 | 5
milneed01 | 4 | 54
testani01 | 0 | 5
shawbo01 | 0 | 8
vogelry01 | 0 | 12

DistinctCountSmartHLL#

Xiaotian (Jackie) Jiang added the DistinctCountSmartHLL aggregation function, which automatically converts the Set to HyperLogLog if the set size grows too big to protect the servers from running out of memory:

SELECT DISTINCTCOUNTSMARTHLL(homeRuns, 'hllLog2m=8;hllConversionThreshold=10')
FROM baseballStats

If we run this query, we'll see the following output:

distinctcountsmarthll(homeRuns)
66

UI updates#

There were also a bunch of updates to the Pinot Data Explorer, by Sanket Shah and Johan Adami.

The display of reported size and estimated size is now in a human-readable format:

Human readable sizes

Fixes for the following issues:

  • Error messages weren't showing on the UI when an invalid operation was attempted:

A backwards incompatible attempted schema change

  • Query console goes blank on syntax error.
  • Query console cannot show query result when multiple columns have the same name.
  • Adding extra fields after SELECT * would throw a NullPointerException.
  • Some queries were returning -- instead of 0.
  • Pinot Dashboard tenant view showing the incorrect amount of servers and brokers.

RealTimeToOffline Task#

Xiaotian (Jackie) Jiang made some fixes to the RealTimeToOffline job to handle time gaps and proceed to the next time window when no segment matches the current one.

Empty QuickStart#

Kenny Bastani added an empty QuickStart command, which lets you quickly spin up an empty Pinot cluster:

docker run \
  -p 8000:8000 \
  -p 9000:9000 \
  apachepinot/pinot:0.10.0 QuickStart \
  -type empty

You can then ingest your own dataset without needing to worry about spinning up each of the Pinot components individually.

Data Ingestion#

  • Richard Startin fixed some issues with real-time ingestion where consumption of messages would stop if a bad batch of messages was consumed from Kafka.

  • Mohemmad Zaid Khan added the BoundedColumnValue partition function, which partitions segments based on column values.

  • Xiaobing Li added the fixed name segment generator, which can be used when you want to replace a specific existing segment.

Other changes#

  • Richard Startin set LZ4 compression as the default for all metrics fields.
  • Mark Needham added the ST_Within geospatial function.
  • Rong Rong fixed a bug where query stats wouldn't show if there was an error processing the query (e.g. if the query timed out).
  • Prashant Pandey fixed the query engine to handle extra columns added to a SELECT * statement.
  • Richard Startin added support for forward indexes on JSON columns.
  • Rong Rong added the GRPC broker request handler so that data can be streamed back from the server to the broker when processing queries.
  • deemoliu made it possible to add a default strategy when using the partial upsert feature.
  • Jeff Moszuti added support for the TIMESTAMP data type in the configuration recommendation engine.

Dependency updates#

The following dependencies were updated:

  • async-http-client because the library moved to a different organization.
  • RoaringBitmap to 0.9.25
  • JsonPath to 2.7.0
  • Kafka to 2.8.1
  • Prometheus to 0.16.1

Resources#

If you want to try out Apache Pinot, the following resources will help you get started:

Text analytics on LinkedIn Talent Insights using Apache Pinot

· One min read
LinkedIn Engineering Team

LinkedIn Talent Insights (LTI) is a platform that helps organizations understand the external labor market and their internal workforce, and enables the long term success of their employees. Users of LTI have the flexibility to construct searches using the various facets of the LinkedIn Economic Graph (skills, titles, location, company, etc.).

Read More at https://engineering.linkedin.com/blog/2021/text-analytics-on-linkedin-talent-insights-using-apache-pinot


Introduction to Geospatial Queries in Apache Pinot

· One min read
Kenny Bastani

Geospatial data has been widely used across the industry, spanning multiple verticals, such as ride-sharing and delivery, transportation infrastructure, defense and intel, public health. Deriving insights from timely and accurate geospatial data could enable mission-critical use cases in the organizations and fuel a vibrant marketplace across the industry. In the design document for this new Pinot feature, we discuss the challenges of analyzing geospatial at scale and propose the geospatial support in Pinot.

Read More at https://medium.com/apache-pinot-developer-blog/introduction-to-geospatial-queries-in-apache-pinot-b63e2362e2a9
