
Apache Pinot™ 0.12 - Consumer Record Lag

Mark Needham


The Apache Pinot community recently released version 0.12.0, which has lots of goodies for you to play with. I’ve been exploring and writing about those features in a series of blog posts.

This post will explore a new API endpoint that lets you check how much Pinot is lagging when ingesting from Apache Kafka.

Why do we need this?

A common question in the Pinot community is how to work out the consumption status of real-time tables. 

This used to be tricky to answer, but Pinot 0.12 adds a new API that lets us see exactly what’s going on.

Worked Example

Let’s have a look at how it works with help from a worked example. 

First, we’re going to create a Kafka topic with 5 partitions:

docker exec -it kafka-lag-blog \
--bootstrap-server localhost:9092 \
--partitions 5 \
--topic events \
--create

We’re going to populate this topic with data from a data generator, which is shown below:

import datetime, uuid, random, json, click, time

@click.command()
@click.option('--sleep', default=0.0, help='Sleep between each message')
def generate(sleep):
    # Emit one JSON event per line, forever
    while True:
        # UTC timestamp with microseconds, ending in a literal Z
        ts = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        event_id = str(uuid.uuid4())
        count = random.randint(0, 1000)
        print(json.dumps({"tsString": ts, "uuid": event_id, "count": count}))
        time.sleep(sleep)

if __name__ == '__main__':
    generate()

We can see an example of the messages generated by this script by running the following:

python --sleep 0.01 2>/dev/null | head -n3 | jq -c

You should see something like this:


So far, so good. Let’s now ingest this data into Kafka:

python --sleep 0.01 2>/dev/null |
jq -cr --arg sep ø '[.uuid, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø
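The jq step turns each event into a single line of the form <uuid>ø<event JSON>, and kcat -K then treats the ø as the boundary between the Kafka message key and value. Keying messages by UUID should spread them across the topic's five partitions. The Python sketch below mimics that framing so you can see what lands in Kafka; the function names are hypothetical, and it assumes kcat splits at the first delimiter and that ø never appears in a UUID.

```python
import json
import uuid

# Delimiter passed to jq (--arg sep) and kcat (-K) in the command above.
SEP = "ø"


def frame_event(event: dict, sep: str = SEP) -> str:
    """Mimic jq's '[.uuid, tostring] | join($sep)': key, delimiter, compact JSON value."""
    return event["uuid"] + sep + json.dumps(event, separators=(",", ":"), ensure_ascii=False)


def split_key_value(line: str, sep: str = SEP) -> tuple[str, str]:
    """Approximate kcat -K (assumed to split at the first delimiter): key before it, value after."""
    key, found, value = line.partition(sep)
    if not found:
        raise ValueError(f"no key delimiter {sep!r} in line")
    return key, value


if __name__ == "__main__":
    # Made-up sample event in the generator's format.
    event = {"tsString": "2023-03-17T11:33:43.350000Z", "uuid": str(uuid.uuid4()), "count": 42}
    line = frame_event(event)
    key, value = split_key_value(line)
    print(line)
    print(key == event["uuid"], json.loads(value) == event)  # True True
```

Because the value is the full event, the uuid appears twice in each message: once as the key and once inside the JSON that Pinot ingests.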

Next we’re going to create a Pinot schema and table. First, the schema config:

{
  "schemaName": "events",
  "dimensionFieldSpecs": [{"name": "uuid", "dataType": "STRING"}],
  "metricFieldSpecs": [{"name": "count", "dataType": "INT"}],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}

And now, the table config:

{
  "tableName": "events",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "schemaName": "events",
    "replication": "1",
    "replicasPerPartition": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "": "events",
      "": "kafka-lag-blog:9093",
      "stream.kafka.consumer.type": "lowlevel",
      "": "smallest",
      "": "",
      "": "",
      "realtime.segment.flush.threshold.rows": "10000000"
    }
  },
  "ingestionConfig": {
    "transformConfigs": [
      {
        "columnName": "ts",
        "transformFunction": "FromDateTime(tsString, 'YYYY-MM-dd''T''HH:mm:ss.SSSSSS''Z''')"
      }
    ]
  },
  "tenants": {},
  "metadata": {}
}
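The transformConfigs entry fills the ts column by parsing tsString with FromDateTime, producing epoch milliseconds to match the 1:MILLISECONDS:EPOCH format in the schema. As a rough Python equivalent (an illustration, not Pinot's implementation), assuming the string is interpreted as UTC and sub-millisecond digits are truncated:

```python
from datetime import datetime, timezone

# strftime/strptime pattern used by the data generator for tsString.
TS_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(ts_string: str) -> int:
    """Convert a generator timestamp to epoch milliseconds, treating it as UTC."""
    parsed = datetime.strptime(ts_string, TS_FORMAT).replace(tzinfo=timezone.utc)
    delta = parsed - EPOCH
    # Integer arithmetic avoids float rounding; sub-millisecond digits are truncated.
    return (delta.days * 86_400 + delta.seconds) * 1000 + delta.microseconds // 1000


if __name__ == "__main__":
    print(to_epoch_millis("2023-03-17T11:33:43.350000Z"))  # 1679052823350
```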

We can create both the table and schema using the AddTable command:

docker run \
  --network lag_blog \
  -v $PWD/config:/config \
  apachepinot/pinot:0.12.0-arm64 AddTable \
  -schemaFile /config/schema.json \
  -tableConfigFile /config/table.json \
  -controllerHost "pinot-controller-lag-blog" \
  -exec

Now let’s call the /consumingSegmentsInfo endpoint to see what’s going on:

curl "http://localhost:9000/tables/events/consumingSegmentsInfo" 2>/dev/null | jq
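If you would rather query the endpoint from code than from curl, a minimal standard-library Python sketch might look like this. It assumes the controller is reachable at http://localhost:9000, as in the curl command; the function names are our own, not part of any Pinot client.

```python
import json
import urllib.parse
import urllib.request

CONTROLLER_URL = "http://localhost:9000"  # assumed controller address, as used with curl


def consuming_segments_url(base_url: str, table: str) -> str:
    """Build the /consumingSegmentsInfo URL for a table."""
    return f"{base_url.rstrip('/')}/tables/{urllib.parse.quote(table)}/consumingSegmentsInfo"


def fetch_consuming_info(table: str, base_url: str = CONTROLLER_URL, timeout: float = 5.0) -> dict:
    """GET the endpoint and decode the JSON body."""
    with urllib.request.urlopen(consuming_segments_url(base_url, table), timeout=timeout) as resp:
        return json.load(resp)
```

Calling fetch_consuming_info("events") against a running controller should return the same JSON document that curl prints.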

The output of calling this endpoint is shown below:

{
  "_segmentToConsumingInfoMap": {
    "events__0__0__20230317T1133Z": [
      {
        "serverName": "Server_172.29.0.4_8098",
        "consumerState": "CONSUMING",
        "lastConsumedTimestamp": 1679052823350,
        "partitionToOffsetMap": {
          "0": "969"
        },
        "partitionOffsetInfo": {
          "currentOffsetsMap": {
            "0": "969"
          },
          "latestUpstreamOffsetMap": {
            "0": "969"
          },
          "recordsLagMap": {
            "0": "0"
          },
          "availabilityLagMsMap": {
            "0": "26"
          }
        }
      }
    ],
    ...
  }
}
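The keys of _segmentToConsumingInfoMap are consuming segment names. For tables using the low-level consumer, they follow the pattern <table>__<partition>__<sequence number>__<creation time>, so events__0__0__20230317T1133Z is the first segment for partition 0, created at 11:33 UTC on 17 March 2023. With a five-partition topic you should expect one consuming segment per partition. A small Python sketch that splits a name into its parts (the helper and type names are hypothetical):

```python
from datetime import datetime, timezone
from typing import NamedTuple


class SegmentName(NamedTuple):
    table: str
    partition: int
    sequence: int
    created: datetime


def parse_segment_name(name: str) -> SegmentName:
    """Split '<table>__<partition>__<sequence>__<yyyyMMddTHHmmZ>' into its parts."""
    # Split from the right so only the last three '__' separators are used.
    table, partition, sequence, created = name.rsplit("__", 3)
    created_at = datetime.strptime(created, "%Y%m%dT%H%MZ").replace(tzinfo=timezone.utc)
    return SegmentName(table, int(partition), int(sequence), created_at)


if __name__ == "__main__":
    segment = parse_segment_name("events__0__0__20230317T1133Z")
    print(segment.table, segment.partition, segment.sequence, segment.created.isoformat())
    # events 0 0 2023-03-17T11:33:00+00:00
```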

If we look under partitionOffsetInfo, we can see what’s going on:

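  • currentOffsetsMap is the offset Pinot has consumed up to, per partition
  • latestUpstreamOffsetMap is the latest offset available in Kafka, per partition
  • recordsLagMap is the record lag: how many records Pinot is behind Kafka (the latest upstream offset minus the current offset)
  • availabilityLagMsMap is the time lag, in milliseconds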
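Since the record lag is the gap between the two offsets, you can recompute it from partitionOffsetInfo yourself. The Python sketch below does that for the sample values above; treating offsets as plain integers is an assumption that holds for Kafka but may not hold for other stream types, and the function name is hypothetical.

```python
import json

# partitionOffsetInfo copied from the sample /consumingSegmentsInfo output.
SAMPLE_OFFSET_INFO = json.loads("""
{
  "currentOffsetsMap": {"0": "969"},
  "latestUpstreamOffsetMap": {"0": "969"},
  "recordsLagMap": {"0": "0"},
  "availabilityLagMsMap": {"0": "26"}
}
""")


def computed_record_lag(offset_info: dict) -> dict[str, int]:
    """Per partition: latest Kafka offset minus Pinot's current offset.

    Offsets arrive as strings, so they are converted to int first; partitions
    missing from either map are skipped.
    """
    current = offset_info["currentOffsetsMap"]
    latest = offset_info["latestUpstreamOffsetMap"]
    return {p: int(latest[p]) - int(current[p]) for p in current if p in latest}


if __name__ == "__main__":
    lag = computed_record_lag(SAMPLE_OFFSET_INFO)
    reported = {p: int(v) for p, v in SAMPLE_OFFSET_INFO["recordsLagMap"].items()}
    print(lag, lag == reported)  # {'0': 0} True
```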

The raw JSON is a bit unwieldy, so let’s create a bash function to tidy it up into something that’s easier to read:

# Summarise consumption lag for each consuming segment of the events table
function consuming_info() {
  curl "http://localhost:9000/tables/events/consumingSegmentsInfo" 2>/dev/null |
  jq -rc '[._segmentToConsumingInfoMap | keys[] as $k | (.[$k] | .[] | {
    segment: $k,
    kafka: (.partitionOffsetInfo.latestUpstreamOffsetMap | keys[] as $k | (.[$k])),
    pinot: (.partitionOffsetInfo.currentOffsetsMap | keys[] as $k | (.[$k])),
    recordLag: (.partitionOffsetInfo.recordsLagMap | keys[] as $k | (.[$k])),
    timeLagMs: (.partitionOffsetInfo.availabilityLagMsMap | keys[] as $k | (.[$k]))
  })] | (.[0] | keys_unsorted | @tsv), (.[] | map(.) | @tsv)' |
  column -t
  printf "\n"
}

Let’s call the function:
consuming_info

We’ll see the following output:

Consumer record lag output
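For comparison, the flattening that the consuming_info jq filter performs can also be written in Python. This is a sketch rather than anything shipped with Pinot: the function names are hypothetical, it reuses the shape of the sample response shown earlier, and it assumes each consuming segment covers a single partition. It labels latestUpstreamOffsetMap as the Kafka offset and currentOffsetsMap as the Pinot offset.

```python
# Trimmed copy of the sample /consumingSegmentsInfo response.
SAMPLE_RESPONSE = {
    "_segmentToConsumingInfoMap": {
        "events__0__0__20230317T1133Z": [
            {
                "serverName": "Server_172.29.0.4_8098",
                "consumerState": "CONSUMING",
                "partitionOffsetInfo": {
                    "currentOffsetsMap": {"0": "969"},
                    "latestUpstreamOffsetMap": {"0": "969"},
                    "recordsLagMap": {"0": "0"},
                    "availabilityLagMsMap": {"0": "26"},
                },
            }
        ]
    }
}


def _only_value(mapping: dict) -> str:
    """Each consuming segment covers one partition, so its maps hold a single entry."""
    return next(iter(mapping.values()))


def summarise(response: dict) -> list[dict]:
    """One row per (segment, server) pair, mirroring the consuming_info jq filter."""
    rows = []
    for segment, replicas in sorted(response["_segmentToConsumingInfoMap"].items()):
        for replica in replicas:
            info = replica["partitionOffsetInfo"]
            rows.append({
                "segment": segment,
                "kafka": _only_value(info["latestUpstreamOffsetMap"]),
                "pinot": _only_value(info["currentOffsetsMap"]),
                "recordLag": _only_value(info["recordsLagMap"]),
                "timeLagMs": _only_value(info["availabilityLagMsMap"]),
            })
    return rows


def format_table(rows: list[dict]) -> str:
    """Left-align each column, roughly what column -t does with the jq TSV output."""
    if not rows:
        return ""
    headers = list(rows[0])
    lines = [headers] + [[str(row[h]) for h in headers] for row in rows]
    widths = [max(len(line[i]) for line in lines) for i in range(len(headers))]
    return "\n".join(
        "  ".join(cell.ljust(width) for cell, width in zip(line, widths)).rstrip()
        for line in lines
    )


if __name__ == "__main__":
    print(format_table(summarise(SAMPLE_RESPONSE)))
```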

Now let’s put it in a script and run it with the watch command, so that the output is refreshed every two seconds:

#!/usr/bin/env bash

# Summarise consumption lag for each consuming segment of the events table
function consuming_info() {
  curl "http://localhost:9000/tables/events/consumingSegmentsInfo" 2>/dev/null |
  jq -rc '[._segmentToConsumingInfoMap | keys[] as $k | (.[$k] | .[] | {
    segment: $k,
    kafka: (.partitionOffsetInfo.latestUpstreamOffsetMap | keys[] as $k | (.[$k])),
    pinot: (.partitionOffsetInfo.currentOffsetsMap | keys[] as $k | (.[$k])),
    recordLag: (.partitionOffsetInfo.recordsLagMap | keys[] as $k | (.[$k])),
    timeLagMs: (.partitionOffsetInfo.availabilityLagMsMap | keys[] as $k | (.[$k]))
  })] | (.[0] | keys_unsorted | @tsv), (.[] | map(.) | @tsv)' |
  column -t
  printf "\n"
}

# Export the function so the bash started by watch can call it; watch reruns it every 2 seconds
export -f consuming_info
watch bash -c consuming_info
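watch simply reruns a command on a fixed interval (two seconds by default). If you want the same loop inside a Python program, a minimal sketch follows; fetch and render are hypothetical callables you supply (for example, one that calls the endpoint and one that formats the result), and unlike watch it does not clear the screen between runs.

```python
import time
from typing import Any, Callable, Optional


def poll(
    fetch: Callable[[], Any],
    render: Callable[[Any], str],
    interval: float = 2.0,
    iterations: Optional[int] = None,
    sleep: Callable[[float], None] = time.sleep,
    out: Callable[[str], None] = print,
) -> int:
    """Fetch, render and output every `interval` seconds, in the spirit of watch.

    Loops forever when `iterations` is None; otherwise stops after that many
    rounds and returns the number of rounds completed.
    """
    done = 0
    while iterations is None or done < iterations:
        out(render(fetch()))
        done += 1
        # No need to wait after the final round of a bounded run.
        if iterations is None or done < iterations:
            sleep(interval)
    return done


if __name__ == "__main__":
    readings = iter([120, 35, 0])  # made-up lag values standing in for real fetches
    poll(fetch=lambda: next(readings), render=lambda lag: f"recordLag={lag}", interval=0.1, iterations=3)
```

Injecting sleep and out keeps the loop easy to test without waiting or printing.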

Make the shell script executable:

chmod u+x watch_consuming

And finally, run it:


This will print out a new table every two seconds. Let’s now make things more interesting by removing the sleep from our ingestion command:

python  2>/dev/null |
jq -cr --arg sep ø '[.uuid, tostring] | join($sep)' |
kcat -P -b localhost:9092 -t events -Kø

And now if we look at the watch output:

Apache Pinot Consumer Record Lag

We get some transitory lag, but it generally goes away by the next time the command is run. 
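One way to tell transitory lag apart from lag that is actually building up is to look at several consecutive readings rather than a single one. The sketch below flags a partition only when its record lag stays above a threshold for a number of polls in a row; the class name, threshold, and window size are arbitrary assumptions for illustration, not Pinot settings.

```python
from collections import defaultdict, deque


class SustainedLagDetector:
    """Flag partitions whose record lag stays above `threshold` for `window` readings in a row."""

    def __init__(self, threshold: int = 1000, window: int = 3):
        self.threshold = threshold
        self.window = window
        # partition -> last `window` booleans saying whether lag exceeded the threshold
        self._history = defaultdict(lambda: deque(maxlen=window))

    def observe(self, lags: dict[str, int]) -> list[str]:
        """Record one reading (partition -> record lag) and return partitions with sustained lag."""
        flagged = []
        for partition, lag in lags.items():
            history = self._history[partition]
            history.append(lag > self.threshold)
            if len(history) == self.window and all(history):
                flagged.append(partition)
        return sorted(flagged)


if __name__ == "__main__":
    detector = SustainedLagDetector(threshold=100, window=3)
    # Made-up readings: partition 0 stays behind, partition 1 only spikes once.
    for reading in [{"0": 500, "1": 0}, {"0": 800, "1": 250}, {"0": 650, "1": 0}]:
        print(detector.observe(reading))  # prints [], [] and then ['0']
```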


I love this feature, and it solves a problem I’ve struggled with when using my datasets. I hope you’ll find it just as useful.

Give it a try, and let us know how you get on. If you have any questions about this feature, feel free to join us on Slack, where we’ll be happy to help you out.