How to use Nussknacker together with Kafka & Schema Registry — IoT example

The Nussknacker Blog
8 min read · May 20, 2024


by Piotr Rudnicki

In today’s fast-paced digital landscape, real-time data processing has become a cornerstone of modern business operations. Apache Kafka is a leading distributed streaming platform that empowers organizations to manage, process, and analyze massive volumes of data in real time with unparalleled efficiency.

As Kafka continues to solidify its position as the backbone of countless data-driven initiatives, the demand for seamless integration solutions has never been greater. Nussknacker is a powerful low-code tool designed to streamline the integration process and unlock the full potential of Kafka’s capabilities.

This article will explore how Nussknacker effortlessly integrates with Apache Kafka, including its seamless compatibility with Kafka’s Schema Registry. Whether you’re a seasoned Kafka enthusiast or just starting to explore the world of real-time data processing, you’ll discover how Nussknacker simplifies the integration journey, enabling you to harness the full power of Kafka with minimal effort.

Join us on this journey to unlock the true potential of Kafka integration with the power of Nussknacker. Please stay tuned as we illustrate these concepts through an exciting Internet of Things example, showcasing the real-world application of Kafka integration with Nussknacker.

Background recap

An Avro schema defines the structure of data in Avro format, specifying the fields and their data types within a record. It is expressed in JSON and enables efficient serialization and deserialization of data in binary form, reducing the size of messages transmitted over the network. This results in lower network bandwidth usage and improved overall performance, especially in high-throughput Kafka deployments where efficient data transfer is critical.
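For illustration, a minimal Avro schema for a simplified sensor reading (the field names here are purely illustrative; the full schema used later in this article is richer) looks like this:

{
  "type": "record",
  "name": "Reading",
  "fields": [
    { "name": "device_id", "type": "string" },
    { "name": "temperature", "type": "float" }
  ]
}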

Fun fact: The original Apache Avro logo was from the defunct British aircraft manufacturer Avro.

Schema registry is a centralized repository for managing Avro schemas in distributed systems such as Apache Kafka. It stores, retrieves, and manages versioned Avro schemas by providing a unified interface for schema registration. The schema registry enables schema evolution, ensuring backward and forward compatibility between different versions of schemas. By enforcing schema compatibility checks, the schema registry improves data interoperability and reliability in streaming data pipelines. Let us emphasize that schema registry is a general concept (specification). In what follows, I will focus on a specific implementation, Confluent’s Schema Registry. Keep in mind, however, that the Kafka configuration property “schema.registry.url” accepts any implementation of the schema registry API specification.

The flow can be summarized as follows:

  1. First, the Avro schema used to encode messages is registered in the Schema Registry.
  2. The producer creates Avro-encoded messages and sends them to the Kafka broker along with the ID of the registered schema. With Confluent’s serializers, this schema ID is embedded in the message payload itself: each record value is prefixed with a magic byte and the 4-byte schema ID, followed by the Avro-encoded data.
  3. The Kafka broker stores the Avro-encoded message along with the associated schema ID in the specified topic.
  4. The consumer reads the Avro-encoded message along with the associated schema ID from the Kafka topic.
  5. The consumer directly calls the schema registry API to retrieve the schema corresponding to the schema ID extracted from the message.
  6. The schema registry responds to the consumer’s request by providing the schema associated with the schema ID.
  7. Using the retrieved schema, the consumer deserializes the Avro-encoded message back into a human-friendly format (e.g. JSON).
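To get a feel for this flow outside Nussknacker, Confluent Platform ships console tools that perform exactly the schema handling described above. A minimal sketch, assuming a local broker and Schema Registry and the simplified Reading schema shown earlier (exact flag names can vary slightly between versions):

# Produce Avro messages; the serializer registers/looks up the schema and prefixes each record with its ID
kafka-avro-console-producer \
  --bootstrap-server localhost:9092 \
  --topic iot.data \
  --property schema.registry.url=http://localhost:8081 \
  --property value.schema='{"type":"record","name":"Reading","fields":[{"name":"device_id","type":"string"},{"name":"temperature","type":"float"}]}'

# Consume: the deserializer fetches the schema by the embedded ID and prints the records as JSON
kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic iot.data \
  --from-beginning \
  --property schema.registry.url=http://localhost:8081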

What about JSON schemas? If the compact binary encoding on your topic is not a concern and you would prefer to use JSON Schema instead, Nussknacker will meet your needs. The integrated AKHQ panel allows you to create JSON schemas as well!
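For comparison, a JSON Schema describing the same simplified reading might look like this (again, just an illustrative sketch):

{
  "type": "object",
  "properties": {
    "device_id": { "type": "string" },
    "temperature": { "type": "number" }
  },
  "required": ["device_id", "temperature"]
}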

General Kafka & Schema Registry configuration in Nussknacker

Here is an example configuration which should shed some light on the Kafka configuration process in Nussknacker:

{
  "bootstrap.servers": "kafka.some.domain-1:9092,kafka.some.domain-2:9092,kafka.some.domain-3:9092",
  "security.protocol": "SASL_SSL",
  "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";",
  "sasl.mechanism": "PLAIN",
  "ssl.truststore.type": "PEM",
  "ssl.truststore.location": "/certs/client.truststore",

  "schema.registry.url": "https://some.schema.registry.host:8080",
  "basic.auth.credentials.source": "USER_INFO",
  "basic.auth.user.info": "usernameToSR:passwordToSR"
}
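A quick sanity check that the Schema Registry part of this configuration works is to list the registered subjects, using the credentials from the example above:

curl -u usernameToSR:passwordToSR https://some.schema.registry.host:8080/subjects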

In the rest of this article, I’ll assume that Nussknacker is configured with a Kafka cluster connected to Confluent’s Schema Registry. I’ll also use AKHQ — a popular GUI for Apache Kafka — to interact with Kafka and Schema Registry.

Registering schemas in Schema Registry

Registering a schema using AKHQ is fairly easy.

Let’s also define the schema for our sink topic iot.data.anomalies. At this point both topics would have the same schema, but it is good practice to register separate schemas so that they can evolve independently.

One can also register a schema programmatically using the Schema Registry API as follows:

POST /subjects/iot.data-value/versions HTTP/1.1
Host: schemaregistry.example.com
Accept: application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json
Content-Type: application/json

{
  "schema": "{
    \"type\": \"record\",
    \"name\": \"IoTData\",
    \"fields\": [
      { \"name\": \"device_id\", \"type\": \"string\" },
      { \"name\": \"timestamp\", \"type\": \"string\" },
      { \"name\": \"temperature\", \"type\": \"float\" },
      { \"name\": \"humidity\", \"type\": \"int\" },
      {
        \"name\": \"location\",
        \"type\": {
          \"type\": \"record\",
          \"name\": \"Location\",
          \"fields\": [
            { \"name\": \"latitude\", \"type\": \"float\" },
            { \"name\": \"longitude\", \"type\": \"float\" }
          ]
        }
      }
    ]
  }",
  "schemaType": "AVRO",
  "references": []
}

Either way, on success the response contains the ID of the registered schema.
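For example, a successful registration returns the assigned ID as a small JSON document (the actual ID value will vary):

{ "id": 1 }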

Creating topics in Kafka

We also need two topics: iot.data and iot.data.anomalies. Creating these is also quite easy using AKHQ.

The number of partitions and the replication factor should be adjusted to your needs regarding fault tolerance, reliability, load distribution, etc.

You can also create the topic programmatically:

bin/kafka-topics.sh --bootstrap-server kafka.some.domain-1:9092 --create --topic iot.data --partitions 1 --replication-factor 1
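The second topic can be created the same way:

bin/kafka-topics.sh --bootstrap-server kafka.some.domain-1:9092 --create --topic iot.data.anomalies --partitions 1 --replication-factor 1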

Creating scenario in Nussknacker

Below is a scenario that reads data from sensor devices, records the temperature of each measurement, and checks whether the temperature is high (greater than or equal to 50°C); if it exceeds this threshold, the data is sent to the anomalies topic.

Here we see the result of configuring Nussknacker with Schema Registry. It seamlessly fetched the schema version from the configured registry! Thanks to this, we can just focus on pure data transformations, allowing us to prioritize delivering business value without the added concern of managing the Avro (de)serialization process.
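Under the hood, the filter condition is just a SpEL expression over the decoded record; in this scenario it could be something along the lines of `#input.temperature >= 50` (assuming the field names from the registered schema).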

Producing data to Kafka

Now let’s write two sensor messages to our Kafka topic. One of them will report a low temperature (25.6) and the other will report a high temperature (52.6). We expect to see only the second one on the target topic.
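Assuming the IoTData schema registered earlier, the two messages could look like this (all values other than the temperatures are purely illustrative):

{"device_id": "sensor-1", "timestamp": "2024-05-20T10:00:00Z", "temperature": 25.6, "humidity": 15, "location": {"latitude": 52.23, "longitude": 21.01}}
{"device_id": "sensor-2", "timestamp": "2024-05-20T10:00:05Z", "temperature": 52.6, "humidity": 15, "location": {"latitude": 52.23, "longitude": 21.01}}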

As expected, only the message with the high temperature appears on the target topic.

Schema Evolution

Suppose there is a new business requirement that we should add information to the target topic that indicates whether the anomaly was caused by high temperature, humidity, or both. Here are the exact steps describing how to make this change:

  • Add a new field “anomaly_reason” to the iot.data.anomalies-value schema
  • If the temperature was greater than or equal to 50 and the humidity was less than 20, then we should set “anomaly_reason” to “T” (indicating that only temperature was the reason)
  • If the temperature was less than 50 and the humidity was greater than or equal to 20, then we should set “anomaly_reason” to “H” (indicating that only humidity was the reason)
  • If the temperature was greater than or equal to 50 and the humidity was greater than or equal to 20, then we should set “anomaly_reason” to “T,H”

With such precisely written business requirements (something many can only dream of), we may proceed to work.

First we update the iot.data.anomalies-value schema. Since this schema was created to be backwards compatible, we need to add a nullable string field and set the default value to `null`. This is important because in our deployment scenario we are using the latest schema version for the corresponding Kafka sink.
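In Avro terms, the new field added to the iot.data.anomalies-value schema could look like this (a sketch consistent with the description above):

{
  "name": "anomaly_reason",
  "type": ["null", "string"],
  "default": null
}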

Let us look at our scenario after this update. We see that the Kafka sink automatically picked up the new “anomaly_reason” field and pre-filled it with the correct default value.

The next step is to replace our simple filter component with the choice component. Notice how easy it is to translate our business requirements into component logic!
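A sketch of the three conditions as SpEL expressions (assuming the field names from the source schema), each branch leading to a sink that sets the corresponding value:

#input.temperature >= 50 && #input.humidity < 20    ->  "anomaly_reason" = "T"
#input.temperature < 50  && #input.humidity >= 20   ->  "anomaly_reason" = "H"
#input.temperature >= 50 && #input.humidity >= 20   ->  "anomaly_reason" = "T,H"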

In the end, our scenario routes to three disjoint sinks, each setting a different value of “anomaly_reason”.

Producing to Kafka after Schema evolution

After deploying the new version of the scenario, we will send a new message to Kafka reporting both high temperature and high humidity. We expect to see this message on the target topic with the new field “anomaly_reason” set to “T,H”.
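For example, a record like the one below (values other than temperature and humidity are illustrative) should land on iot.data.anomalies with “anomaly_reason” set to “T,H”:

{"device_id": "sensor-3", "timestamp": "2024-05-20T11:00:00Z", "temperature": 55.1, "humidity": 30, "location": {"latitude": 52.23, "longitude": 21.01}}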

And this time, Nussknacker didn’t disappoint! On the target topic we see exactly what we’d expected.

Excited to explore this scenario? Great news awaits! Simply click this link to access a running instance of our scenario.


Summary

Kafka & Avro have become the de facto standard in the real-time data streaming landscape. Nussknacker provides a seamless solution for integrating with Kafka topics and their corresponding Avro schemas. By providing an easy-to-use interface and abstracting away complexity, Nussknacker allows developers to focus on business logic while ensuring data integrity and compatibility.

Piotr Rudnicki
