This document contains the steps I followed to set up a WSO2 Micro Integrator (MI) inbound endpoint that consumes Kafka Avro messages.
Setting up the Kafka server and UI
First, download Kafka kafka_2.11-2.2.1 for the Zookeeper and Kafka servers: https://archive.apache.org/dist/kafka/2.2.1/kafka_2.11-2.2.1.tgz
Start Zookeeper with kafka_2.11-2.2.1. Go to the Kafka home folder and execute:
bin/zookeeper-server-start.sh config/zookeeper.properties
In the same way, start Kafka with:
bin/kafka-server-start.sh config/server.properties
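To confirm the broker is up, you can optionally list the existing topics from the Kafka home folder (a fresh broker returns an empty list):
bin/kafka-topics.sh --list --zookeeper localhost:2181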
Download confluent-5.2.1 for the Schema Registry: https://packages.confluent.io/archive/5.2/confluent-community-5.2.1-2.11.tar.gz
Start the Schema Registry with the following command from the confluent home folder:
bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
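You can optionally verify that the Schema Registry is reachable through its REST API; a fresh instance returns an empty subject list ([]):
curl http://localhost:8081/subjects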
As the UI for Kafka, I used https://github.com/provectus/kafka-ui. Optionally, you can use it to confirm that messages are being published correctly.
Download the prebuilt kafka-ui jar from https://github.com/provectus/kafka-ui/releases/download/0.3.3/kafka-ui-0.3.3.jar and start the UI with the following command:
java -Dspring.config.additional-location=application-local.yml -jar kafka-ui-0.3.3.jar
You can access the UI at http://localhost:8080/ . There, you can check topics, messages, and consumers, and modify or create them accordingly.
Use the following application-local.yml file:
kafka:
  clusters:
    - name: local
      bootstrapServers: localhost:9092
      zookeeper: localhost:2181
      schemaRegistry: http://localhost:8081
      ksqldbServer: http://localhost:8088
      kafkaConnect:
        - name: first
          address: http://localhost:8083
      jmxPort: 9997
spring:
  jmx:
    enabled: true
auth:
  type: DISABLED
Building the MI Kafka inbound endpoint
Copy the following jar files from <Kafka Home>/libs to the <MI Home>/lib folder:
- kafka_2.11-2.2.1.jar
- kafka-clients-2.2.1.jar
- metrics-core-2.2.0.jar
- scala-library-2.11.12.jar
- zkclient-0.11.jar
- zookeeper-3.4.13.jar
Copy the following jars from the Maven repository to the <MI Home>/lib folder:
- jackson-core-asl-1.9.13.jar
- jackson-mapper-asl-1.9.13.jar
- common-config-5.4.0.jar
- common-utils-5.4.0.jar
- kafka-avro-serializer-5.3.0.jar
- kafka-schema-registry-client-5.3.0.jar
- avro-1.8.1.jar
Download the Kafka inbound endpoint from the WSO2 connector store https://store.wso2.com/store/assets/esbconnector/details/b15e9612-5144-4c97-a3f0-179ea583be88 (download the inbound endpoint, not the connector) and copy it into <MI Home>/lib.
Save the following inbound endpoint configuration in <MI Home>/repository/deployment/server/synapse-configs/default/inbound-endpoints/:
<?xml version="1.0" encoding="UTF-8"?>
<inboundEndpoint name="KAFKAListenerEP" sequence="kafka_process_seq" onError="fault" class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer" suspend="false" xmlns="http://ws.apache.org/ns/synapse">
    <parameters>
        <parameter name="sequential">true</parameter>
        <parameter name="interval">10</parameter>
        <parameter name="coordination">true</parameter>
        <parameter name="inbound.behavior">polling</parameter>
        <parameter name="value.deserializer">io.confluent.kafka.serializers.KafkaAvroDeserializer</parameter>
        <parameter name="topic.name">avro-stream</parameter>
        <parameter name="poll.timeout">100</parameter>
        <parameter name="bootstrap.servers">localhost:9092</parameter>
        <parameter name="group.id">hello</parameter>
        <parameter name="contentType">text/plain</parameter>
        <parameter name="key.deserializer">io.confluent.kafka.serializers.KafkaAvroDeserializer</parameter>
        <parameter name="schema.registry.url">http://localhost:8081/</parameter>
    </parameters>
</inboundEndpoint>
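The endpoint above hands each consumed message to the kafka_process_seq sequence, which you also need to deploy. A minimal sketch that simply logs the incoming message (save it, for example, as kafka_process_seq.xml under <MI Home>/repository/deployment/server/synapse-configs/default/sequences/):
<?xml version="1.0" encoding="UTF-8"?>
<sequence name="kafka_process_seq" xmlns="http://ws.apache.org/ns/synapse">
    <!-- Log the full message consumed from the avro-stream topic -->
    <log level="full"/>
    <!-- Add your own processing (transformations, backend calls, etc.) here -->
</sequence>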
Testing the inbound endpoint
Create a new topic in Kafka by executing the following in the <Kafka home> folder:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 10 --topic avro-stream --config retention.ms=-1
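Optionally, you can verify that the topic was created:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic avro-stream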
Let's use a prebuilt client to generate Avro messages and publish them to the Kafka topic. Clone the https://github.com/datastax/kafka-examples repo and go to the producers folder. Execute the following command to generate Avro messages:
mvn clean compile exec:java -Dexec.mainClass=avro.AvroProducer
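If you prefer not to use the example repo, the following is a minimal sketch of an equivalent producer using the Confluent Avro serializer (the same format the inbound endpoint deserializes). The Greeting schema and the key are made up for illustration; the topic, broker, and registry URL match the setup above. It needs the kafka-clients and kafka-avro-serializer dependencies on the classpath:
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleAvroProducer {
    public static void main(String[] args) {
        // Broker and Schema Registry locations from the local setup above
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");

        // A made-up record schema; it is registered automatically on first send
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Greeting\","
                + "\"fields\":[{\"name\":\"message\",\"type\":\"string\"}]}");
        GenericRecord record = new GenericData.Record(schema);
        record.put("message", "Hello from Avro");

        // Publish one Avro message to the topic the inbound endpoint polls
        try (KafkaProducer<Object, Object> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("avro-stream", "key-1", record));
            producer.flush();
        }
    }
}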
You can see that MI consumes the Avro messages and prints them on the console.
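To cross-check the messages outside MI, the Confluent distribution also ships an Avro-aware console consumer; run it from the confluent home folder:
bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic avro-stream --from-beginning --property schema.registry.url=http://localhost:8081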