As shown in the diagram above, the rule creator (Desktop) creates JSON-based rules and pushes them to Kafka (rule topic). The event source sends events to Kafka (testin topic). Flink consumes both rules and events as streams and processes the rules keyed by Driver Id. Rules are stored in Flink as an in-memory collection and can be updated in the same manner. Finally, the output result is sent to Kafka (testout topic).
Setup Flink
- Download Apache Flink 1.6.3 from the location below and extract the archive file.
https://www.apache.org/dyn/closer.lua/flink/flink-1.6.3/flink-1.6.3-bin-scala_2.11.tgz
- Download the dependency jar files below and place them in the flink-1.6.3/lib folder.
- Configure Flink via flink-1.6.3/conf/flink-conf.yaml.
Set the JobManager and TaskManager heap sizes and the number of task slots:
jobmanager.heap.size: 1g
taskmanager.heap.size: 2g
taskmanager.numberOfTaskSlots: 20
- Change the state backend to RocksDB.
Create a checkpoint folder under your home directory:
~$ mkdir -p data/flink/checkpoints
Then edit the configuration:
state.backend: rocksdb
state.checkpoints.dir: file:///home/dhanuka/data/flink/checkpoints
- Start the Flink cluster in standalone mode.
:~/software/flink-1.6.3$ ./bin/start-cluster.sh
Setup Kafka & Zookeeper
- Here I am using Docker and Docker Compose, so you can follow the blog post below to install docker & docker-compose on Ubuntu.
http://dhanuka84.blogspot.com/2019/02/install-docker-in-ubuntu.html
- Check out the Docker project below from GitHub.
https://github.com/dhanuka84/my-docker.git
- Change the IP address in the file below to your machine's IP address.
https://github.com/dhanuka84/my-docker/blob/master/kafka/kafka-hazelcast.yml
- Boot up Kafka and Zookeeper with docker-compose, as shown below.
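Assuming you run it from the root of the my-docker checkout, the command would be along these lines:
my-docker$ docker-compose -f kafka/kafka-hazelcast.yml up -d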
- Check the running Docker containers, as shown below.
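For example, list the running containers; you should see the Kafka and Zookeeper containers in the output:
~$ docker ps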
- Create Kafka Topics
Download Confluent Platform - https://www.confluent.io/download/
- Go to the Confluent Platform's extracted location and run the commands below:
bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 6 --topic testin
bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 6 --topic testout
bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 6 --topic rule
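To confirm the topics were created, you can list them from the same location:
bin/kafka-topics --list --zookeeper localhost:2181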
Create Java-based Rule Job
- Check out the project from GitHub and build it with Maven.
https://github.com/dhanuka84/stream-analytics
stream-analytics$ mvn clean install -Dmaven.test.skip=true
Please note that both rules and events are keyed by Driver Id:
KeyedStream<Event, String> filteredEvents = events.keyBy(Event::getDriverID);
rules.broadcast().keyBy(Rule::getDriverID)
Rules are stored in ListState in the flatMap1 method.
Each rule is executed against the relevant events in the flatMap2 method.
Finally, the results are transformed into JSON strings. A rough sketch of this pattern follows.
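The sketch below is illustrative only: it shows the connected-stream pattern with a RichCoFlatMapFunction, where the rule stream is the first input and the event stream is the second. The class name RuleEvaluator and the Rule.matches(Event) check are assumptions for illustration, not necessarily the names used in the stream-analytics project.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

// Rules arrive on the first input (flatMap1), events on the second (flatMap2).
public class RuleEvaluator extends RichCoFlatMapFunction<Rule, Event, String> {

    // Keyed state: the rules registered for the current Driver Id
    private transient ListState<Rule> rulesState;

    @Override
    public void open(Configuration parameters) {
        rulesState = getRuntimeContext().getListState(
                new ListStateDescriptor<>("rules", Rule.class));
    }

    // flatMap1: store (or update) the incoming rule in ListState
    @Override
    public void flatMap1(Rule rule, Collector<String> out) throws Exception {
        rulesState.add(rule);
    }

    // flatMap2: evaluate every stored rule against the incoming event
    @Override
    public void flatMap2(Event event, Collector<String> out) throws Exception {
        for (Rule rule : rulesState.get()) {
            if (rule.matches(event)) {
                // emit the match as a JSON string destined for the testout topic
                out.collect("{\"driverId\":\"" + event.getDriverID() + "\",\"matched\":true}");
            }
        }
    }
}

The keyed rule and event streams are then connected and processed with this function, along the lines of rules.connect(filteredEvents).flatMap(new RuleEvaluator()), and the resulting JSON strings are sent to the testout topic.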
- Prepare the fat jar to upload, using the command below from the checked-out project home.
stream-analytics$ jar uf target/stream-analytics-0.0.1-SNAPSHOT.jar application.properties consumer.properties producer.properties
- Upload the Flink job jar file.
Copy stream-analytics-0.0.1-SNAPSHOT.jar to the Flink home folder and run the command below.
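The exact submit command is not shown here; assuming the jar's manifest declares the job's main class, it would be along these lines (otherwise pass the class explicitly with -c):
:~/software/flink-1.6.3$ ./bin/flink run stream-analytics-0.0.1-SNAPSHOT.jar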
Testing
- Run the Producer.java class using your favorite IDE. This will generate both events and rules and publish them to Kafka. (A minimal illustrative stand-in is sketched below.)
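If you want to test without an IDE, a minimal stand-in producer could look like the sketch below. It only illustrates the mechanics of publishing to the rule and testin topics; the JSON field names (driverID, speedLimit, speed) are assumptions and not necessarily the format the job expects.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // publish a rule for driver-1 to the rule topic (assumed JSON shape)
            producer.send(new ProducerRecord<>("rule", "driver-1",
                    "{\"driverID\":\"driver-1\",\"speedLimit\":80}"));
            // publish an event for the same driver to the testin topic (assumed JSON shape)
            producer.send(new ProducerRecord<>("testin", "driver-1",
                    "{\"driverID\":\"driver-1\",\"speed\":95}"));
        }
    }
}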
- To verify at the Kafka level, use the commands below from the Confluent home location.
Get the latest offsets:
bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic testout --time -1
Read from the Kafka topic:
bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic testout