This document contains steps that I have followed to setup inbound endpoint with Kafka Avro messages.
Setting up Kafka server and UI
First download theKafka kafka_2.11–2.2.1 for Zookeeper and Kafka server.
Start Zookeeper with kafka_2.11–2.2.1. Go to the kafka home folder and execute:
bin/ config/`
The same way, start Kafka with :
bin/ config/
Download confluent-5.2.1 for Schema registry
Start Schema registry with following command on the confluent home folder:
bin/schema-registry-start ./etc/schema-registry/
As the UI for Kafka I used Optionaly you can use it to confirm messages are publishing correctly. You can use prebuilt jar file and start the UI with following command
java -Dspring.config.additional-location=application-local.yml -jar kafka-ui-0.3.3.jar
You can access the UI with http://localhost:8080/ . There, you can check topics, messages, consumers and modify or create them accordingly.
Download kafka-ui jar from
Use the following application-local.yml file:
- name: local
bootstrapServers: localhost:9092
zookeeper: localhost:2181
schemaRegistry: http://localhost:8081
ksqldbServer: http://localhost:8088
- name: first
address: http://localhost:8083
jmxPort: 9997
enabled: true
Building MI Kafka Inbound endpoint
Copy following jar files from <Kafka Home>/lib to the <MI Home>/lib folder:
- kafka_2.11–2.2.1.jar
- kafka-clients-2.2.1.jar
- metrics-core-2.2.0.jar
- scala-library-2.11.12.jar
- zkclient-0.11.jar
- zookeeper-3.4.13.jar
Copy following jars from maven repository to the <MI Home>/lib
- jackson-core-asl-1.9.13.jar
- jackson-mapper-asl-1.9.13
- common-config-5.4.0.jar
- common-utils-5.4.0.jar
- kafka-avro-serializer-5.3.0.jar
- kafka-schema-registry-client-5.3.0.jar
- avro-1.8.1.jar
Download Kafka inbound endpoint from WSO2 connector store (Download inbound endpoint, not the connector) and copy it inside MI/lib.
Copy the following inbound endpoint into the <MI Home>/repository/deployment/server/synapse-configs/defaults/inbound-endpoints/
<?xml version="1.0" encoding="UTF-8"?>
<inboundEndpoint name="KAFKAListenerEP" sequence="kafka_process_seq" onError="fault" class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer" suspend="false" xmlns="">
<parameter name="sequential">true</parameter>
<parameter name="interval">10</parameter>
<parameter name="coordination">true</parameter>
<parameter name="inbound.behavior">polling</parameter>
<parameter name="value.deserializer">io.confluent.kafka.serializers.KafkaAvroDeserializer</parameter>
<parameter name="">avro-stream</parameter>
<parameter name="poll.timeout">100</parameter>
<parameter name="bootstrap.servers">localhost:9092</parameter>
<parameter name="">hello</parameter>
<parameter name="contentType">text/plain</parameter>
<parameter name="key.deserializer">io.confluent.kafka.serializers.KafkaAvroDeserializer</parameter>
<parameter name="schema.registry.url">http://localhost:8081/</parameter>
Testing Inbound endpoint
Create a new topic in Kafka by executing the following in <Kafka home> folder:
bin/ --create --zookeeper localhost:2181 --replication-factor 1 --partitions 10 --topic avro-stream --config
Lets use prebuilt client to generate Avro messages and publish them on the Kafka topic. Clone the repo go to the producers folder. Execute following command to generate avro messages:
mvn clean compile exec:java -Dexec.mainClass=avro.AvroProducer
In MI, You can see that MI consume Avro messages in print it on console.
Python full stack developer course in hyderabad