3/25/2017

Kafka Connect Architecture




  • Please note that I am using the Confluent Platform, so go to the Confluent installation directory and run the Kafka-related commands below from there. For example, mine is:

/home/xxx/software/confluent-3.0.0



Steps to follow:


1. Start Elasticsearch

$ cd ~/software/elasticsearch-2.3.4/

./bin/elasticsearch

2. Start ZooKeeper
nohup ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties &

3. Start Kafka
nohup ./bin/kafka-server-start ./etc/kafka/server.properties > kafka.log 2>&1 &

4. Start the standalone source connector
nohup ./bin/connect-standalone etc/kafka/connect-standalone.properties etc/kafka/connect-socket-source.properties  > socket.log 2>&1 &
5. Start the distributed worker (which will host the sink connector)
nohup ./bin/connect-distributed etc/kafka/connect-distributed.properties  > connect.log 2>&1 &
6. Create topics named event_topic and metric_topic
bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic event_topic

bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic metric_topic
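
To double-check the partition and replication settings, you can also describe each topic:

bin/kafka-topics --describe --zookeeper localhost:2181 --topic event_topic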

7. List the created topics
bin/kafka-topics --list --zookeeper localhost:2181

8. Start a console consumer to verify the source connector workflow
./bin/kafka-console-consumer  --zookeeper localhost:2181 --from-beginning --topic event_topic

9. Create an index called "event_topic" in Elasticsearch
$ curl -XPUT 'http://localhost:9200/event_topic/' -d '{
   "settings" : {
       "number_of_shards" : 3,
       "number_of_replicas" : 2
   }
}'
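
To confirm the index was created with the expected settings:

curl -XGET 'http://localhost:9200/event_topic/_settings?pretty'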
10. In case you want to delete the topics

bin/kafka-topics --delete --zookeeper localhost:2181 --topic event_topic

bin/kafka-topics --delete --zookeeper localhost:2181 --topic metric_topic
11. Create the sink connector through the REST service

curl -X POST -H "Content-Type: application/json" --data '{"name": "elasticsearch-sink", "config": {
"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max":"2",
"batch.size":1,
"key.ignore":"true",
"connection.url":"http://localhost:9200",
"topics":"socket-test,event_topic,metric_topic",
"schema.ignore":"true",
"topic.index.map":"event_topic:event_topic,metric_topic:metric_topic",
"linger.ms":1000,
"topic.schema.ignore":"event_topic,metric_topic",
"type.name":"kafka-connect"
}}' http://localhost:8085/connectors
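
Once created, the Connect REST API can confirm the connector and its tasks are running:

curl http://localhost:8085/connectors/elasticsearch-sink/status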
12. Reconfigure sink connector on the fly.



curl -X PUT -H "Content-Type: application/json" --data '{
"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max":"2",
"batch.size":1,
"key.ignore":"true",
"connection.url":"http://localhost:9200",
"topics":"socket-test,event_topic,metric_topic",
"schema.ignore":"true",
"topic.index.map":"event_topic:event_topic,metric_topic:metric_topic",
"linger.ms":1000,
"topic.schema.ignore":"event_topic,metric_topic",
"type.name":"kafka-connect"
}' http://localhost:8085/connectors/elasticsearch-sink/config
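
The updated configuration can be read back with a GET on the same endpoint:

curl http://localhost:8085/connectors/elasticsearch-sink/config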

13. Create Source connector


curl -X POST -H "Content-Type: application/json" --data '{"name": "socket-connector", "config": {
"connector.class":"org.apache.kafka.connect.socket.SocketSourceConnector",
"tasks.max":"4",
"topic":"socket-test",
"schema.name":"socketschema",
"type.name":"kafka-connect",
"schema.ignore":"true",
"port":"12345",
"batch.size":"2"
}}' http://localhost:8084/connectors
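
To confirm the source connector was registered on the standalone worker, list its connectors:

curl http://localhost:8084/connectors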

14. Reconfigure Source connector on the fly


curl -X PUT -H "Content-Type: application/json" --data '{
"connector.class":"org.apache.kafka.connect.socket.SocketSourceConnector",
"tasks.max":"4",
"topics":"socket-test",
"schema.name":"socketschema",
"type.name":"kafka-connect",
"schema.ignore":"true",
"tcp.port":"12345",
"batch.size":"2",
"metrics_id":"domain",
"metrics_domains":"",
"events_domains":"",
"domain_topic_mapping":"event_sum:event_topic,metric_rum:metric_topic",
"error_topic":"error_topic"
}' http://localhost:8084/connectors/socket-connector/config

15. Send a sample JSON message

cat samplejson | netcat localhost 12345
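
The exact payload format depends on what the custom SocketSourceConnector expects; as a rough, hypothetical example, a message with a domain field matching the domain_topic_mapping above could look like this:

# hypothetical payload -- the real schema is defined by the custom socket connector
echo '{"domain":"event_sum","value":"100"}' > samplejson
cat samplejson | netcat localhost 12345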

16. Verify the JSON message in Elasticsearch
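
A simple search against the index shows whether the message arrived:

curl -XGET 'http://localhost:9200/event_topic/_search?pretty'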



17. Kafka Connect configuration

connect-distributed.properties


# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

#key.converter=io.confluent.connect.avro.AvroConverter
#value.converter=io.confluent.connect.avro.AvroConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Topic to use for storing offsets. This topic should have many partitions and be replicated.
offset.storage.topic=connect-offsets

# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic.
# You may need to manually create the topic to ensure single partition for the config topic as auto created topics may have multiple partitions.
config.storage.topic=connect-configs

# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated.
status.storage.topic=connect-status

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
rest.port=8085
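
As the config.storage.topic comment above notes, the config topic must have a single partition, so it is safest to create it manually before the first start of the distributed worker. A sketch, assuming the same local setup:

bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic connect-configs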



connect-standalone.properties


# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
#key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false

# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
rest.port=8084
 

3/21/2017

Master Data Management & Application Self Load Balancing



GREEN ARROWS: From the admin panel, the user performs Create, Read, Update, and Delete (CRUD) operations using the REST service provided. JSON is used as the payload format.


RED ARROWS: Once the application receives an admin request, it performs the write operations. Those changes are applied first to the L1 cache, then to the L2 cache, and finally to the MySQL DB.
BLUE ARROWS: Once the application receives an admin request, it performs the read operations. The application searches the L1 cache first, then falls back to the L2 cache, and finally queries MySQL directly if the data is not found in either cache.

YELLOW ARROWS: The application can also bypass the L1 and L2 caches and call the MySQL DB directly.

BLACK ARROWS: Cluster communication.

  • Please note that applications can work without the L2 cache as well.


Application Load Balancing

Hazelcast is used for application clustering, and a distributed map is used to share productId information among the applications running inside the cluster. That way, each application knows which product IDs it needs to process.

Ex: 100 products processed within a two-node application cluster.

Each node processes 50 products, and those 50 products can also be processed in parallel within each node.

Source Code:
https://github.com/dhanuka84/distributed-applications



Fast Stream Analytics with Kafka & Flink



Steps:

1. Set up Kafka Connect & Kafka


2. Download and install Flink


3. Go to the Flink folder: cd flink-1.2.0/

4. Start Flink in standalone mode
./bin/start-local.sh

5. Start Kafka Read Job

./bin/flink run ~/flink/flink-examples/flink-examples-streaming/target/Kafka.jar --topic metric_topic --bootstrap.servers localhost:9092
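
You can confirm the job was submitted and is running with the Flink CLI:

./bin/flink list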

6. Watch logs
tail -f log/flink-*-jobmanager-*.out

7. Send JSON to the Kafka Connect TCP port.

cat analytics.json | netcat localhost 12345



Source Code: