Kafka, Avro & Schema Registry
🗓️ Last updated on May 13, 2024
Overview
This guide shows you how to use Microcks for mocking and testing Avro encoding on top of Apache Kafka. You’ll see how Microcks can speed up the sharing of Avro schemas to consumers using a Schema Registry, and we will check how Microcks can detect drift between the expected Avro format and the one actually used.
Microcks supports Avro as an encoding format for mocking and testing asynchronous and event-driven APIs through AsyncAPI. When it comes to serializing Avro data to a Kafka topic, you usually have two options:
- The “old-fashioned” one, which puts the raw Avro binary representation of the message payload on the topic,
- The “modern” one, which prefixes the Avro binary representation of the message payload with the Schema ID (see Schema Registry: A quick introduction).
This guide presents the two options, which we will call RAW and REGISTRY. Microcks is configured by default to use the RAW option, so that it does not require any external dependency to get you started. If you want to stick with this option, the first step below is optional.
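To make the difference concrete, here is a minimal Node.js sketch using the avsc package of what the bytes put on the topic look like for each option. The schema, payload and schema ID values are purely illustrative; with the REGISTRY option, the ID is normally assigned by the registry and the framing follows the Confluent wire format (one 0x0 magic byte, a 4-byte schema ID, then the Avro binary):
const avro = require('avsc');

// Illustrative schema: same shape as the User record used later in this guide.
const type = avro.Type.forSchema({
  type: 'record',
  name: 'User',
  namespace: 'microcks.avro',
  fields: [
    { name: 'fullName', type: 'string' },
    { name: 'email', type: 'string' },
    { name: 'age', type: 'int' }
  ]
});

const payload = { fullName: 'Jane Doe', email: 'jane@example.com', age: 30 };

// RAW option: the Kafka record value is just the Avro binary bytes.
const rawValue = type.toBuffer(payload);

// REGISTRY option (Confluent wire format): magic byte 0x0, then the
// 4-byte schema ID assigned by the registry, then the Avro binary bytes.
const schemaId = 42; // illustrative; the registry assigns the real ID
const idBytes = Buffer.alloc(4);
idBytes.writeInt32BE(schemaId);
const registryValue = Buffer.concat([Buffer.from([0x0]), idBytes, rawValue]);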
1. Setup Schema Registry
Microcks has been successfully tested with both Confluent Schema Registry and Apicurio Registry. Both can be deployed as containerized workloads on your Kubernetes cluster. Microcks does not provide any installation scripts or procedures; please refer to the projects’ or related products’ documentation.
When connected to a Schema Registry, Microcks pushes the Avro schema to the registry at the same time it pushes Avro-encoded mock messages to the Kafka topic. That way, event consumers can retrieve the Avro schema from the registry to deserialize the messages.
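For instance, a Node.js consumer could rely on the kafkajs and @kafkajs/confluent-schema-registry packages to fetch the schema and deserialize transparently. This is a minimal sketch; the broker address, registry URL and group ID are illustrative:
const { Kafka } = require('kafkajs');
const { SchemaRegistry } = require('@kafkajs/confluent-schema-registry');

const kafka = new Kafka({ clientId: 'my-consumer', brokers: ['microcks-kafka-bootstrap-microcks.apps.example.com:9092'] });
const registry = new SchemaRegistry({ host: 'https://schema-registry.apps.example.com' });

async function run() {
  const consumer = kafka.consumer({ groupId: 'avro-consumers' });
  await consumer.connect();
  await consumer.subscribe({ topic: 'UsersignedupAvroAPI_0.1.2_user-signedup', fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ message }) => {
      // decode() reads the schema ID embedded in the message value and
      // fetches the matching schema from the registry before deserializing.
      const event = await registry.decode(message.value);
      console.log(event);
    }
  });
}

run().catch(console.error);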
If you have used the Operator-based installation of Microcks, you’ll need to add some extra properties to your MicrocksInstall custom resource. The fragment below shows the important ones:
apiVersion: microcks.github.io/v1alpha1
kind: MicrocksInstall
metadata:
  name: microcks
spec:
  [...]
  features:
    async:
      enabled: true
      [...]
      defaultAvroEncoding: REGISTRY
      kafka:
        [...]
        schemaRegistry:
          url: https://schema-registry.apps.example.com
          confluent: true
          username: microcks
          credentialsSource: USER_INFO
The important things to notice are:
- defaultAvroEncoding should be set to REGISTRY (this is indeed a workaround until AsyncAPI adds support for specifying the serialization details at the Binding level; see this issue for more),
- the schemaRegistry block should now be specified with the correct url. The confluent flag tells Microcks that the registry is the Confluent one, OR turns on the Confluent compatibility mode if you’re using an Apicurio Registry,
- username and credentialsSource are only used when connecting to a secured Confluent registry.
If you have used the Helm Chart-based installation of Microcks, this is the corresponding fragment to put in a Values.yml file:
[...]
features:
  async:
    enabled: true
    [...]
    defaultAvroEncoding: REGISTRY
    kafka:
      [...]
      schemaRegistry:
        url: https://schema-registry.apps.example.com
        confluent: true
        username: microcks
        credentialsSource: USER_INFO
The actual connection to the Schema Registry will only be made once Microcks sends Avro messages to Kafka. Let’s see below how to use Avro encoding with AsyncAPI.
2. Use Avro in AsyncAPI
AsyncAPI allows referencing the Avro schema used for serializing/deserializing messages on a Kafka topic. The flexible notation of AsyncAPI allows doing that in three different ways:
- Using the embedded notation: the Avro schema is defined inline within the message payload property,
- Using a remote reference: the schema is specified using an absolute remote endpoint like $ref: 'https://schemas.example.com/user' within the message payload property,
- Using a local reference: the schema is specified using a relative reference like $ref: './user-signedup.avsc#/User' within the message payload property.
Below is a fragment of an AsyncAPI specification file that shows the important things to notice when planning to use Avro with Microcks and AsyncAPI. It comes from a sample you can find on our GitHub repository.
asyncapi: '2.0.0'
id: 'urn:io.microcks.example.user-signedup'
[...]
channels:
  user/signedup:
    [...]
    subscribe:
      [...]
      contentType: avro/binary
      schemaFormat: application/vnd.apache.avro+json;version=1.9.0
      payload:
        $ref: './user-signedup.avsc#/User'
You’ll notice that it is important that the contentType and schemaFormat properties are defined according to the Avro format. In the same folder of this GitHub repository, you’ll also find the user-signedup.avsc file defining the User record type as below:
{
  "namespace": "microcks.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "fullName", "type": "string"},
    {"name": "email", "type": "string"},
    {"name": "age", "type": "int"}
  ]
}
As we use references, our full specification now spans multiple files, so you won’t be able to simply upload one file for API import into Microcks. You will have to define a full Importer Job as described here. During the import of the AsyncAPI contract file, Microcks resolves local references, downloads the referenced files and integrates them within its own repository. The capture below illustrates in the Contracts section that there are now two files: an AsyncAPI one and an Avro schema one.
Finally, as Microcks’ internal mechanics are based on examples, you will also have to attach examples to your AsyncAPI specification. But how do you specify examples for a binary encoding such as Avro? No problem! Simply use JSON or YAML as illustrated in the fragment below, still coming from our GitHub repository.
asyncapi: '2.0.0'
id: 'urn:io.microcks.example.user-signedup'
[...]
channels:
  user/signedup:
    [...]
    subscribe:
      [...]
      examples:
        - laurent:
            payload: |-
              {"fullName": "Laurent Broudoux", "email": "laurent@microcks.io", "age": 41}
        - john:
            payload:
              fullName: John Doe
              email: john@microcks.io
              age: 36
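As a quick local sanity check, you can verify that such example payloads actually conform to the Avro schema, for instance with the avsc package (a sketch, assuming user-signedup.avsc contains the User record shown above):
const fs = require('fs');
const avro = require('avsc');

// Parse the Avro schema file shown earlier in this guide.
const type = avro.Type.forSchema(JSON.parse(fs.readFileSync('./user-signedup.avsc', 'utf8')));

const example = { fullName: 'Laurent Broudoux', email: 'laurent@microcks.io', age: 41 };

console.log(type.isValid(example));         // true: the example matches the schema
console.log(type.toBuffer(example).length); // size of the raw Avro binary encoding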
3. Validate your mocks
Now it’s time to validate that mock publication of Avro messages is correct.
With Schema Registry
When using the REGISTRY encoding option with a deployed Schema Registry, things are pretty simple as you can interact with the registry either from a GUI or the CLI. Let’s check that Microcks has correctly published the schema for our sample topic. See below the results we get with our sample:
$ curl https://schema-registry.apps.example.com/subjects -s -k | jq .
[
  "UsersignedupAvroAPI_0.1.2_user-signedup-microcks.avro.User"
]
$ curl https://schema-registry.apps.example.com/subjects/UsersignedupAvroAPI_0.1.2_user-signedup-microcks.avro.User/versions -s -k | jq .
[
  1
]
$ curl https://schema-registry.apps.example.com/subjects/UsersignedupAvroAPI_0.1.2_user-signedup-microcks.avro.User/versions/1 -s -k | jq .
{
  "subject": "UsersignedupAvroAPI_0.1.2_user-signedup-microcks.avro.User",
  "version": 1,
  "id": 1,
  "schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"microcks.avro\",\"fields\":[{\"name\":\"fullName\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"},{\"name\":\"age\",\"type\":\"int\"}]}"
}
Very nice! We can also use the kafkacat CLI tool to ensure that a topic consumer will be able to deserialize messages using the schema stored in the registry.
$ kafkacat -b microcks-kafka-bootstrap-microcks.apps.example.com:9092 -t UsersignedupAvroAPI_0.1.2_user-signedup -s value=avro -r https://schema-registry.apps.example.com -o end
% Auto-selecting Consumer mode (use -P or -C to override)
% Reached end of topic UsersignedupAvroAPI_0.1.2_user-signedup [0] at offset 114
{"fullName": "Laurent Broudoux", "email": "laurent@microcks.io", "age": 41}
{"fullName": "John Doe", "email": "john@microcks.io", "age": 36}
% Reached end of topic UsersignedupAvroAPI_0.1.2_user-signedup [0] at offset 116
🎉 Super!
Without Schema Registry
Without a Schema Registry, things may be more complicated: you have to develop a consuming script or application that has the Avro schema locally available in order to deserialize the message content.
For our User signedup Avro API sample, we have such a consumer in our GitHub repository.
Follow these steps to retrieve it, install its dependencies and check the Microcks mocks:
$ git clone https://github.com/microcks/api-tooling.git
$ cd api-tooling/async-clients/kafkajs-client
$ npm install
$ node avro-consumer.js microcks-kafka-bootstrap-microcks.apps.example.com:9092 UsersignedupAvroAPI_0.1.2_user-signedup
Connecting to microcks-kafka-bootstrap-microcks.apps.example.com:9092 on topic UsersignedupAvroAPI_0.1.2_user-signedup
{"level":"INFO","timestamp":"2021-02-11T20:30:48.672Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"kafkajs-client"}
{"level":"INFO","timestamp":"2021-02-11T20:30:48.708Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"kafkajs-client","memberId":"my-app-7feb2099-1701-4a8a-9eff-50aeed60d65d","leaderId":"my-app-7feb2099-1701-4a8a-9eff-50aeed60d65d","isLeader":true,"memberAssignment":{"UsersignedupAvroAPI_0.1.2_user-signedup":[0]},"groupProtocol":"RoundRobinAssigner","duration":36}
{
  "fullName": "Laurent Broudoux",
  "email": "laurent@microcks.io",
  "age": 41
}
{
  "fullName": "John Doe",
  "email": "john@microcks.io",
  "age": 36
}
Note: this simple avro-consumer.js script is also able to handle TLS connections to your Kafka broker. This was omitted here for the sake of simplicity, but you can pass the name of the CRT file as the 3rd argument of the command.
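For reference, the essence of such a consumer is to read raw bytes with kafkajs and decode them with the locally available schema, for instance using the avsc package. This is a simplified sketch, not the actual avro-consumer.js script:
const fs = require('fs');
const { Kafka } = require('kafkajs');
const avro = require('avsc');

// Broker and topic are passed on the command line, as with avro-consumer.js.
const [broker, topic] = process.argv.slice(2);

// The local copy of the Avro schema is mandatory to decode RAW messages.
const type = avro.Type.forSchema(JSON.parse(fs.readFileSync('./user-signedup.avsc', 'utf8')));

async function run() {
  const kafka = new Kafka({ clientId: 'my-app', brokers: [broker] });
  const consumer = kafka.consumer({ groupId: 'kafkajs-client' });
  await consumer.connect();
  await consumer.subscribe({ topic });
  await consumer.run({
    eachMessage: async ({ message }) => {
      // RAW encoding: the whole message value is the Avro binary payload.
      console.log(type.fromBuffer(message.value));
    }
  });
}

run().catch(console.error);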
4. Run AsyncAPI tests
Now the last step to becoming fully accustomed to Avro on Kafka support in Microcks is to perform some tests. As we will need an API implementation for that, and it’s not as easy as writing an HTTP-based API implementation, we provide some helpful scripts in our api-tooling GitHub repository. These scripts are made to work with the User signedup Avro API sample we have used so far, but feel free to adapt them for your own use.
The first thing for this section will be to retrieve the scripts and install their dependencies, if you have not already done that in the previous section. Follow the instructions below:
$ git clone https://github.com/microcks/api-tooling.git
$ cd api-tooling/async-clients/kafkajs-client
$ npm install
With Schema Registry
When using a Schema Registry with the REGISTRY encoding configured in Microcks, the following schema illustrates Microcks’ interactions with the broker and the registry. Here, we are not necessarily using the broker and registry that Microcks uses for mocking: we can reuse any Kafka broker and any Schema Registry available within your organization. Typically this will depend on the environment you want to launch tests upon.
That said, imagine that you want to validate messages from a QA environment with a dedicated broker and registry. Start by using our utility script to produce some messages on an arbitrary user-registration topic. This script uses a local Avro schema to do the binary encoding, and it also publishes this schema into the connected QA Schema Registry:
$ node avro-with-registry-producer.js kafka-broker-qa.apps.example.com:9092 user-registration https://schema-registry-qa.apps.example.com
Connecting to kafka-broker-qa.apps.example.com:9092 on topic user-registration, using registry https://schema-registry-qa.apps.example.com
{"level":"ERROR","timestamp":"2021-02-11T21:07:09.962Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"kafka-broker-qa.apps.example.com:9092","clientId":"my-app","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":1,"size":108}
[
  {
    topicName: 'user-registration',
    partition: 0,
    errorCode: 0,
    baseOffset: '0',
    logAppendTime: '-1',
    logStartOffset: '0'
  }
]
[...]
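In essence, such a producer registers the local schema in the registry, encodes each payload with the returned schema ID and sends the framed bytes. The sketch below uses the @kafkajs/confluent-schema-registry package for illustration; it is a simplified view, not the actual avro-with-registry-producer.js script:
const fs = require('fs');
const { Kafka } = require('kafkajs');
const { SchemaRegistry, SchemaType } = require('@kafkajs/confluent-schema-registry');

// Broker, topic and registry URL are passed on the command line.
const [broker, topic, registryUrl] = process.argv.slice(2);

async function run() {
  const kafka = new Kafka({ clientId: 'my-app', brokers: [broker] });
  const registry = new SchemaRegistry({ host: registryUrl });
  const producer = kafka.producer();
  await producer.connect();

  // Register the local Avro schema and get its ID back from the registry.
  const schema = fs.readFileSync('./user-signedup.avsc', 'utf8');
  const { id } = await registry.register({ type: SchemaType.AVRO, schema });

  // encode() frames the value in the wire format: magic byte + schema ID + Avro binary.
  const payload = { fullName: 'Jane Doe', email: 'jane@example.com', age: 30 };
  await producer.send({ topic, messages: [{ value: await registry.encode(id, payload) }] });
  await producer.disconnect();
}

run().catch(console.error);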
Do not interrupt the execution of the script, and go create a New Test within the Microcks web console. Use the following elements in the Test form:
- Test Endpoint: kafka://kafka-broker-qa.apps.example.com:9092/user-registration?registryUrl=https://schema-registry-qa.apps.example.com (note this new registryUrl parameter, used to tell Microcks where to get the Avro schema used for writing 😉),
- Runner: ASYNC API SCHEMA for validating against the AsyncAPI specification of the API.
While the Test Endpoint and Schema Registry may be secured with custom TLS certificates or a username/password, we skipped this in this guide for the sake of simplicity. Microcks handles this through Secrets or the additional registryUsername and registryCredentialsSource parameters.
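For instance, against a registry secured with username/password, the Test Endpoint might look like the following (the values and parameter composition are illustrative):
kafka://kafka-broker-qa.apps.example.com:9092/user-registration?registryUrl=https://schema-registry-qa.apps.example.com&registryUsername=microcks&registryCredentialsSource=USER_INFO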
Launch the test, wait a few seconds, and you should get access to the test results as illustrated below:
This is fine: we can see that the type is avro/binary and the message content is nicely displayed using JSON. But what happens in case of a failure? What are we able to demonstrate using Microcks validation? Next to the script actually lie two Avro schemas:
- user-signedup.avsc is correct and matches the one that is referenced in the AsyncAPI specification,
- user-signedup-bad.avsc represents an evolution and does not match the one from the AsyncAPI specification.
Well, let’s see now what happens if we tweak the avro-with-registry-producer.js script a little bit… Open it in your favorite editor to comment out lines 48 and 56 and to uncomment lines 45 and 55. Relaunch it, and launch a new test…
🎉 We can see that there’s now a failure, and that’s perfect! What does that mean? It means that when your application uses a schema that is different from and incompatible with the one in the AsyncAPI contract, Microcks raises an error and spots the breaking change! The required fullName property was expected as stated in the AsyncAPI file but cannot be found in the incoming message… thus the tested application producing the messages is indeed sending garbage 😉
Without Schema Registry
Let’s now look at the RAW encoding option and what we can deduce from tests. To simulate an existing application, we will now use the avro-producer.js script, which also uses the local user-signedup.avsc Avro schema to do the binary encoding:
$ node avro-producer.js kafka-broker-qa.apps.example.com:9092 user-registration
Connecting to kafka-broker-qa.apps.example.com:9092 on topic user-registration
{"level":"ERROR","timestamp":"2021-02-11T21:37:28.266Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"kafka-broker-qa.apps.example.com:9092","clientId":"my-app","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":1,"size":96}
[
  {
    topicName: 'user-registration',
    partition: 0,
    errorCode: 0,
    baseOffset: '0',
    logAppendTime: '-1',
    logStartOffset: '0'
  }
]
[...]
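In essence, such a raw producer just encodes each payload with the local schema and sends the bare Avro bytes, with no schema ID framing. A simplified sketch, not the actual avro-producer.js script:
const fs = require('fs');
const { Kafka } = require('kafkajs');
const avro = require('avsc');

const [broker, topic] = process.argv.slice(2);
const type = avro.Type.forSchema(JSON.parse(fs.readFileSync('./user-signedup.avsc', 'utf8')));

async function run() {
  const kafka = new Kafka({ clientId: 'my-app', brokers: [broker] });
  const producer = kafka.producer();
  await producer.connect();
  // RAW encoding: the message value is only the Avro binary payload, no schema ID framing.
  const value = type.toBuffer({ fullName: 'Jane Doe', email: 'jane@example.com', age: 30 });
  await producer.send({ topic, messages: [{ value }] });
  await producer.disconnect();
}

run().catch(console.error);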
Do not interrupt the execution of the script, and go create a New Test within the Microcks web console. Use the following elements in the Test form:
- Test Endpoint: kafka://kafka-broker-qa.apps.example.com:9092/user-registration, simply, with no registryUrl parameter this time,
- Runner: ASYNC API SCHEMA for validating against the AsyncAPI specification of the API.
Launch the test, wait a few seconds, and you should get access to the test results as illustrated below:
You can see here that we just have the string representation of the binary message that was sent. Using RAW encoding, we cannot be sure that what we read makes any sense regarding the semantics of the API contract.
If you want to play with this idea, start making changes to the Avro schema used by the producer and add more properties of different types. As the schema referenced in the AsyncAPI contract is very basic, we’ll always be able to read the messages.
But start removing properties or just sending single bytes, and you’ll see validation failures happen. In RAW mode, validation is very shallow: we cannot detect schema incompatibilities as we do not have the schema that was used for writing. Microcks can just check that the binary Avro can be read with the given schema, and as long as you send more bytes than expected, it works 😞
Wrap-Up
In this guide, we have seen how Microcks can also be used to simulate Avro messages on top of Kafka. We have also checked how it can connect to a Schema Registry, such as the one from Confluent, in order to speed up and make reliable the process of propagating Avro schema updates to API event consumers. We finally demonstrated how Microcks can be used to detect any drift between the expected Avro schema and the one effectively used by real-life producers.
Take care: Microcks will detect if they send garbage! 🗑