
2 posts tagged with "streaming"


How to Ingest Streaming Data from Kafka to Apache Pinot™

· 9 min read
Barkha Herman
Developer Advocate

We previously walked through getting started with Apache Pinot™ using batch data, and now we will learn how to ingest streaming data using Apache Kafka® topics. 

As the story goes, Apache Pinot was created at LinkedIn to provide a platform that could ingest a high volume of incoming events (from Kafka) and serve “fresh” (sub-second) analytics to a large number (20+ million) of users with sub-second query latency. So, really, consuming events is part of the reason why Pinot was created.

The obligatory “What is Apache Pinot and StarTree?” section#

Pinot is a real-time, distributed, open source, and free-to-use OLAP datastore, purpose-built to provide ultra-low-latency analytics at extremely high throughput.

How does StarTree come in? StarTree offers a fully managed version of the Apache Pinot real-time analytics system, plus other tools around it that you can try for free. These include StarTree Dataset Manager, a UI-based data ingestion tool, and StarTree ThirdEye, a real-time anomaly detection and root cause analysis tool.

How to install Kafka alongside Pinot#

Prerequisite#

Complete the steps outlined in the introduction to Apache Pinot

Step 1: Start Pinot on your Docker container#

Make sure you have completed the first article in the series.

We will be installing Apache Kafka into our existing Pinot Docker container. To start a container from the Pinot image, run the following command:

docker run -it --entrypoint /bin/bash -p 9000:9000 apachepinot/pinot:0.12.0

PowerShell 7.3.4 docker run Apache Pinot

We want to override the ENTRYPOINT and run a Bash shell within the Docker container. If you already have a container running, you can skip this step. I tend to tear down containers after use, so in my case, I created a brand new container.

Now, start each of the components one at a time like we did in the previous session:

bin/pinot-admin.sh StartZookeeper &

bin/pinot-admin.sh StartController &

bin/pinot-admin.sh StartBroker &

bin/pinot-admin.sh StartServer &

Run each of the commands one at a time. The & allows you to continue using the same Bash shell session. If you like, you can create different shells for each service:

  1. Get the container ID by running docker ps
  2. Run docker exec -it DOCKER_CONTAINER_ID bash where DOCKER_CONTAINER_ID is the ID received from step 1.
  3. Run the pinot-admin.sh command to start the desired service

It should look like this:

Docker with container ID, Image, Command, and Created
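For example, the flow for opening a second shell into the running container might look something like this (the container ID is illustrative; use the one reported by docker ps):

# Find the running Pinot container
docker ps

# Attach a new Bash shell to it (replace a1b2c3d4e5f6 with your container ID)
docker exec -it a1b2c3d4e5f6 bash

# Inside the container, move to the Pinot install directory before starting a component
cd /opt/pinot
bin/pinot-admin.sh StartBroker &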

You can now browse to http://localhost:9000/#/zookeeper to see the running cluster:

Empty Zookeeper Browser

Step 2: Install Kafka on the Docker container#

Next, let's install Kafka on the existing Docker container. For this step, we will download the TAR file, extract the contents, and start Kafka.

Apache Kafka is an open source software platform that provides a unified, high-throughput, low-latency platform for handling real-time data feeds.

Use the following commands to download the Kafka archive:

cd ..

curl https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz --output kafka.tgz

It should look like this:

Code with Apache Pinot speed results

Note that we’ve changed the directory to keep the Kafka folder separate from the Pinot folder.

Now, let’s expand the downloaded TAR file, rename the folder for convenience, and delete the downloaded file:

tar -xvf kafka.tgz

mv kafka_2.12-3.4.0 kafka

rm -rf kafka.tgz

It should look like this:

Code with Apache Kafka

Code with kafka version

Now, Kafka and Pinot reside locally on our Docker container with Pinot up and running. Let’s run the Kafka service. Kafka will use the existing ZooKeeper for configuration management.

Use the following commands to run Kafka:

cd kafka

./bin/kafka-server-start.sh config/server.properties

It should look like this:

Code with cd kafka

To verify that Kafka is running, let’s look at our ZooKeeper configs by browsing to http://localhost:9000/#/zookeeper:

Zookeeper Browser

You may have to refresh the page. You will find many more configuration items appear than expected; these are Kafka configurations.
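You can also verify the broker from the command line with Kafka's bundled tooling. A quick sketch, run from /opt/kafka in a second shell (the broker occupies the shell it was started in); the list will be empty until we create a topic in the next step:

# List topics on the local broker
./bin/kafka-topics.sh --list --bootstrap-server localhost:9092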

Step 3: Ingest data into Kafka#

In this step, we will ingest data into Kafka. We will be using Wikipedia events since they are easily accessible. We will use a Node.js script to consume the Wikipedia events and add them to a Kafka topic.

Let’s first create some folders like this:

cd /opt

mkdir realtime

cd realtime

mkdir events

It should look like this:

Code with realtime

You may have to start a new PowerShell window and connect to Docker for this. Now, let’s install Node.js and any dependencies we might need for the event consumption script:

curl -fsSL https://deb.nodesource.com/setup_14.x | bash -

apt install nodejs

Node.js takes a few minutes to install. Next, we will create a script called wikievents.js to consume the events. Copy and paste the following code into this file:

var EventSource = require("eventsource");
var fs = require("fs");
var path = require("path");
const { Kafka } = require("kafkajs");

var url = "https://stream.wikimedia.org/v2/stream/recentchange";

const kafka = new Kafka({
  clientId: "wikievents",
  brokers: ["localhost:9092"],
});

const producer = kafka.producer();

async function start() {
  await producer.connect();
  startEvents();
}

function startEvents() {
  console.log(`Connecting to EventStreams at ${url}`);
  var eventSource = new EventSource(url);

  eventSource.onopen = function () {
    console.log("--- Opened connection.");
  };

  eventSource.onerror = function (event) {
    console.error("--- Encountered error", event);
  };

  eventSource.onmessage = async function (event) {
    const data = JSON.parse(event.data);
    const eventPath = path.join(__dirname, "./events", data.wiki);
    fs.existsSync(eventPath) || fs.mkdirSync(eventPath);
    fs.writeFileSync(path.join(eventPath, data.meta.id + ".json"), event.data);
    await producer.send({
      topic: "wikipedia-events",
      messages: [
        {
          key: data.meta.id,
          value: event.data,
        },
      ],
    });
  };
}

start();

You can use vi to create the file and save it. You can also use Docker Desktop to edit the file.

To install the two modules referenced in the file above, kafkajs and eventsource, run the following command:

npm i eventsource kafkajs

Let’s run the program. This will download many files, so I recommend running it for just a few minutes:

node wikievents.js

Use Ctrl-C to stop the program. Navigate to the events folder to see some new folders created with the various language events downloaded from Wikipedia.

Wikievents node in code

Navigate to the enwiki folder and review some of the downloaded JSON files.

Code with realtime wikievents

At http://localhost:9000/#/zookeeper, you can find the Kafka topic by locating the ZooKeeper config and expanding config > topics. You may have to refresh your browser.

Zookeeper browser in Apache Pinot topics

Here, you should see the wikipedia-events topic that we created using the Node.js script. So far, so good.
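If you want to peek at the raw messages themselves, the console consumer that ships with Kafka can tail the topic. A quick sketch, run from /opt/kafka:

# Print the first five Wikipedia events from the topic and exit
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wikipedia-events --from-beginning --max-messages 5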

Step 4: Connect Kafka to Pinot#

With Kafka installed and configured to receive events, we can connect it to Pinot. 

To create a real-time table in Pinot that can consume the Kafka topic, we need a schema and a table configuration. The schema is very much like the one we created for our batch example. You can use vi to create a file named realtime.schema.json and copy and paste the content below.

Here’s the JSON for the wikievents schema:

{ "schemaName": "wikievents", "dimensionFieldSpecs": [ { "name": "id", "dataType": "STRING" }, { "name": "wiki", "dataType": "STRING" }, { "name": "user", "dataType": "STRING" }, { "name": "title", "dataType": "STRING" }, { "name": "comment", "dataType": "STRING" }, { "name": "stream", "dataType": "STRING" }, { "name": "domain", "dataType": "STRING" }, { "name": "topic", "dataType": "STRING" }, { "name": "type", "dataType": "STRING" }, { "name": "metaJson", "dataType": "STRING" } ], "dateTimeFieldSpecs": [ { "name": "timestamp", "dataType": "LONG", "format": "1:MILLISECONDS:EPOCH", "granularity": "1:MILLISECONDS" } ]}

Creating the table config file is where the magic happens. Use vi (or your favorite editor) to create realtime.tableconfig.json and cut and paste the following content:

{ "tableName": "wikievents_REALTIME", "tableType": "REALTIME", "segmentsConfig": { "timeColumnName": "timestamp", "schemaName": "wikievents", "replication": "1", "replicasPerPartition": "1" }, "tenants": { "broker": "DefaultTenant", "server": "DefaultTenant", "tagOverrideConfig": {} }, "tableIndexConfig": { "invertedIndexColumns": [], "rangeIndexColumns": [], "autoGeneratedInvertedIndex": false, "createInvertedIndexDuringSegmentGeneration": false, "sortedColumn": [], "bloomFilterColumns": [], "loadMode": "MMAP", "streamConfigs": { "streamType": "kafka", "stream.kafka.topic.name": "wikipedia-events", "stream.kafka.broker.list": "localhost:9092", "stream.kafka.consumer.type": "lowlevel", "stream.kafka.consumer.prop.auto.offset.reset": "smallest", "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder", "realtime.segment.flush.threshold.rows": "0", "realtime.segment.flush.threshold.time": "24h", "realtime.segment.flush.segment.size": "100M" }, "noDictionaryColumns": [], "onHeapDictionaryColumns": [], "varLengthDictionaryColumns": [], "enableDefaultStarTree": false, "enableDynamicStarTreeCreation": false, "aggregateMetrics": false, "nullHandlingEnabled": false }, "metadata": {}, "quota": {}, "routing": {}, "query": {}, "ingestionConfig": { "transformConfigs": [ { "columnName": "metaJson", "transformFunction": "JSONFORMAT(meta)" }, { "columnName": "id", "transformFunction": "JSONPATH(metaJson, '$.id')" }, { "columnName": "stream", "transformFunction": "JSONPATH(metaJson, '$.stream')" }, { "columnName": "domain", "transformFunction": "JSONPATH(metaJson, '$.domain')" }, { "columnName": "topic", "transformFunction": "JSONPATH(metaJson, '$.topic')" } ] }, "isDimTable": false}

Notice the section called streamConfigs, where we define the source as a Kafka stream, located at localhost:9092, and consume the topic wikipedia-events. That’s all it takes to consume a Kafka topic into Pinot.

Don’t believe me? Give it a try!

Create the table by running the following command:

/opt/pinot/bin/pinot-admin.sh AddTable -schemaFile /opt/realtime/realtime.schema.json -tableConfigFile /opt/realtime/realtime.tableconfig.json -exec

Now, browse to the following location http://localhost:9000/#/tables, and you should see the newly created table. However, where’s the real-time data, you say?

Run the node wikievents.js command, then query the newly created wikievents table to see the totalDocs increase in real time:

Apache Pinot query console
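For example, queries along these lines work against the wikievents table (illustrative; any columns from the schema above can be used):

select count(*) from wikievents

select domain, count(*) from wikievents group by domain order by count(*) desc limit 10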

To avoid running out of space on your computer, make sure to stop the wikievents.js script when you’re done :-D

Conclusion#

Congratulations! Using only the table config, we consumed a Kafka topic directly into a Pinot table and queried events as they arrived. We also transformed JSON to map to the Pinot table. In the transformConfigs portion of the Pinot table config file, we consumed the nested block meta into a field called metaJson. In the subsequent steps, we referenced the metaJson field with JSONPATH to extract fields such as id, stream, domain, and topic.

Not only does Pinot support easy ingestion from Kafka topics, but it also provides a robust way to transform JSON to OLAP tables. 

In summary, we have:

  • Installed and run Kafka
  • Consumed events from Wikipedia into Kafka
  • Created a real-time table schema and a table in Pinot
  • Streamed events from Wikipedia into Pinot tables via Kafka topics
  • Run multiple queries
  • Performed JSON transformations

In some upcoming blog posts, we will explore more advanced topics, such as indexes and transformations, not to mention real-time anomaly detection with ThirdEye.

In the meantime, run more queries, load more data, and don’t forget to join the community Slack for support if you get stuck or would like to request a topic for me to write about—you know where to find us!

Apache Pinot Tutorial for Getting Started - A Step-by-Step Guide

· 8 min read
Barkha Herman
Developer Advocate

How do you get started with Apache Pinot™? Good question! To save you the hassle of trying to tackle this on your own, here’s a handy guide that overviews all of the components that make up Pinot and how to set Pinot up.

The Obligatory What is Apache Pinot and StarTree Section#

Pinot is an open source, free-to-use, real-time, and distributed OLAP datastore, purpose-built to provide ultra-low-latency analytics at extremely high throughput.

StarTree offers a fully managed version of the Apache Pinot real-time analytics system and other tools around it, such as a real-time anomaly detection and root cause analysis tool, which you can try for free.

What do you need to run Apache Pinot?#

The Docker image that we will use runs multiple services. To accommodate this, we recommend at a minimum the following resources in order to run the sample:

  • CPUs: four or more
  • Memory: 8 GB or more
  • Swap: 2 GB or more
  • Disk space: 10 GB or more

Note: When importing custom data or event streaming, you may need more resources. Additionally, note that if not set, Docker will use resources from the host environment as needed and available.
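If you would rather cap the container explicitly than let Docker draw on the host, resource flags can be added to the docker run command used later in this tutorial. A sketch, with values mirroring the minimums above:

# --memory-swap is memory plus swap, so 10g here leaves 2g of swap on top of 8g of memory
docker run -it --entrypoint /bin/bash -p 9000:9000 --cpus=4 --memory=8g --memory-swap=10g apachepinot/pinot:0.12.0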

Step-by-step installation of Apache Pinot#

For this intro tutorial, we will use Docker. Alternatively, you can run Pinot locally if you wish. 

The instructions use a Windows 11 computer, but they will work on Macs as well. Also note that I am using VS Code with the Docker extension installed.

Step 1:#

Make sure you have Docker installed on your machine.

Docker is a set of platform as a service (PaaS) products that use OS-level virtualization to deliver software in packages called containers.

Step 2:#

Now, let’s download the Docker image. On a Windows machine, start a new PowerShell command window. Note that this is not the same as a Windows PowerShell command window, as shown below. 

Download Docker image on Windows with PowerShell command window

Use the following command to get (pull) the image we are looking for:

docker pull apachepinot/pinot:0.12.0

You can also download the latest version like so:

docker pull apachepinot/pinot:latest

Here, apachepinot is the name of the organization on Docker Hub, pinot is the name of the image, and :latest or :0.12.0 is the image tag (version). Note that we will be using the 0.12.0 version for this blog post.

Docker Hub is the world’s largest repository of container images.

You can verify the image was downloaded or pulled by running the following command:

docker images

It should show you the image like so:

Docker images command

Step 3:#

Let’s run a container using the Docker image that we downloaded:

docker run -it --entrypoint /bin/bash -p 9000:9000 apachepinot/pinot:0.12.0

Running a container with downloaded Docker image

The docker run command runs the image. The -p 9000:9000 option maps the Docker container’s port 9000 to port 9000 on the local machine, which makes the Pinot UI (served on port 9000 by default) accessible from localhost. We use --entrypoint to override the default entrypoint and replace it with Bash, so that we can start each component one at a time. The final parameter, apachepinot/pinot:0.12.0, is the Docker image we pulled above.

After running the command, we’ll find ourselves in a Bash shell inside the running Docker container. We can use ls to list the contents of the container, as shown above.

If you’re using VS Code with the Docker extension installed, you can click on the Docker extension and see our container and its contents:

VS Code Docker extension open to see container and content

Click on the Docker icon in the left menu, then on apachepinot/pinot:0.12.0. It should take a few seconds to connect to the running container. Now, you can navigate to the files and see what we have under the opt folder.

Step 4:#

Let’s run the components that are essential to running a Pinot cluster. Change directory to the bin folder and list the contents like so:

Running components, directory changed to bin folder and contents listed

In order to start the Pinot cluster, we will need to run the following essential components:

  • Apache ZooKeeper™
  • Controller
  • Broker
  • Server

Start ZooKeeper using the following command:

./pinot-admin.sh StartZookeeper &

pinot-admin.sh is a shell script for starting the various components. The & allows us to continue using the Bash shell. ZooKeeper is responsible for the configuration for the Pinot cluster and needs to be started first.

We can start the remaining components using these commands one at a time:

./pinot-admin.sh StartController &

./pinot-admin.sh StartBroker &

./pinot-admin.sh StartServer &

The controller controls the cluster health and coordinates with ZooKeeper for configuration and status changes. The broker is responsible for query distribution and result collation, sometimes called Scatter-Gather. Servers manage individual table segments and perform the actual read/writes. To get a better understanding of each component, read this intro to Apache Pinot.

At this time, we should have a running Pinot cluster. We can verify via the Pinot Data Explorer by browsing to localhost:9000. You should see something like this:

Pinot data explorer

What just happened?

Let’s dive in.

We have started the four essential components of Pinot; however, you will note that there is not yet any data in our fresh new instance.

Before we create a table and load data, notice the four navigation menus on the left-hand side. You can look at the cluster status, run queries, inspect ZooKeeper, or launch the Swagger endpoints for the REST API that Pinot supports.

On the cluster, we notice that we have the essentials deployed: controller, broker, and server. Currently, there are no tables, and no minions (dispatchable components used for task management) exist. Notice also that multi-tenancy support is available in the cluster manager.
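Because the Swagger page exposes Pinot’s REST API, you can also pull the same information from a shell. A quick sketch against the controller’s default port (the table list will be empty at this point):

# Controller liveness check
curl http://localhost:9000/health

# List tables (empty until we create one below)
curl http://localhost:9000/tables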

Step 5:#

Now that we have our Apache Pinot cluster ready, let’s load some data. Of course, before we do that, we have to create a schema. 

Let’s navigate to the folder:

cd /opt/pinot/examples/batch/baseballStats

You will notice that there are the following files listed here:

baseballStats_offline_table_config.json baseballStats_schema.json ingestionJobSpec.yaml sparkIngestionJobSpec.yaml rawdata

From the names, we can see that there is a schema file, a table config file, an ingestion job, and Apache Spark™ ingestion job files as well as a raw data folder.

The schema file contains both metric and dimension field specs, like so (abbreviated):

{ "metricFieldSpecs": [ { "dataType": "INT", "name": "playerStint" }, { "dataType": "INT", "name": "baseOnBalls" }, ], "dimensionFieldSpecs": [ { "dataType": "STRING", "name": "playerID" }, …. { "dataType": "STRING", "name": "playerName" } ], "schemaName": "baseballStats"}

To create a schema and table for the baseball stats file, run the following command from the /opt/pinot/bin folder:

./pinot-admin.sh AddTable -schemaFile /opt/pinot/examples/batch/baseballStats/baseballStats_schema.json -tableConfigFile /opt/pinot/examples/batch/baseballStats/baseballStats_offline_table_config.json -exec

You should now see the schema and table created:

Apache Pinot tables created
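As a cross-check, the controller’s REST API should now report the new schema and table. A quick sketch:

# Both responses should now include baseballStats
curl http://localhost:9000/schemas
curl http://localhost:9000/tables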

Next, we’ll want to load some data into the table that we created. We have some sample data in the folder rawdata that we can use to load. We will need a YAML file to perform the actual ingestion job and can use the following command to import data:

./pinot-admin.sh LaunchDataIngestionJob -jobSpecFile /opt/pinot/examples/batch/baseballStats/ingestionJobSpec.yaml

If you run into trouble on this step like I did, edit the ingestionJobSpec.yaml file using Docker Desktop to change the inputDirURI from a relative to an absolute path. Then rerun the above command.

Editing the .yaml file with Docker Desktop
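For reference, the edited line might end up looking roughly like this (the path assumes the container layout used throughout this post):

inputDirURI: '/opt/pinot/examples/batch/baseballStats/rawdata'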

You should now be able to see the table has been populated like so:

Apache Pinot table populated

Now, let’s run some queries. From localhost:9000, select the Query Console in the left-hand menu. Then type in some of these queries:

select * from baseballStats limit 10

select sum(runs), playerName from baseballStats group by playerName order by sum(runs) desc

You should see results like so:

Apache Pinot query console
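If you want to keep exploring, the same dataset supports other aggregations. For example (column names assume the standard baseballStats quickstart schema):

select playerName, sum(homeRuns) as totalHomeRuns from baseballStats group by playerName order by totalHomeRuns desc limit 10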

And there you have it!

What’s under the hood?#

If you’re curious to go a step further and see what the segments look like and what the actual data on disk looks like, keep reading! In the Tables section of localhost:9000, you can scroll down to find a segment:

Apache Pinot data on disk segment

Clicking on this gives the specifics of the segment:

Segment specifics in Pinot UI

Pinot allows you to easily inspect your segments and tables in one easy-to-use UI. You can find what’s where and keep an eye on size, location, number of documents, etc.

Conclusion#

Congratulations!

Together, we’ve:

  • Installed and run Apache Pinot components
  • Created a table schema and a table
  • Loaded data into a table
  • Run a few queries
  • Explored the Pinot UI

In my next article, we’ll consume event streaming data using Apache Pinot and Apache Kafka®.

In the meantime, run more queries, load more data, and don’t forget to join the Community Slack for support if you get stuck!