- Please note that I am using the Confluent Platform, so run all Kafka-related commands below from the Confluent installation directory. Mine, for example, is:
/home/xxx/software/confluent-3.0.0
- Source Code
Steps to follow:
1. Start ElasticSearch
cd ~/software/elasticsearch-2.3.4/
./bin/elasticsearch
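Optionally, verify that ElasticSearch came up before moving on (assuming the default HTTP port 9200):
curl http://localhost:9200/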
2. Start Zookeeper
nohup ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties &
3. Start Kafka
nohup ./bin/kafka-server-start ./etc/kafka/server.properties > kafka.log 2>&1 &
4. Start Standalone Source connector
nohup ./bin/connect-standalone etc/kafka/connect-standalone.properties etc/kafka/connect-socket-source.properties > socket.log 2>&1 &
5. Start Distributed Sink connector
nohup ./bin/connect-distributed etc/kafka/connect-distributed.properties > connect.log 2>&1 &
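Once the distributed worker is up, you can verify its REST endpoint is reachable (rest.port=8085, as configured in connect-distributed.properties below):
curl http://localhost:8085/connectors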
6. Create topics named event_topic and metric_topic
bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic event_topic
bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic metric_topic
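Optionally, describe a topic to confirm the partition and replication settings:
bin/kafka-topics --describe --zookeeper localhost:2181 --topic event_topic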
7. List the created topics
bin/kafka-topics --list --zookeeper localhost:2181
8. Start a console consumer to verify the source connector workflow
./bin/kafka-console-consumer --zookeeper localhost:2181 --from-beginning --topic event_topic
9. Create an index called "event_topic" in ElasticSearch
curl -XPUT 'http://localhost:9200/event_topic/' -d '{
  "settings" : {
    "number_of_shards" : 3,
    "number_of_replicas" : 2
  }
}'
"settings" : {
"number_of_shards" : 3,
"number_of_replicas" : 2
}
}'
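You can verify the index settings took effect with a standard ElasticSearch call:
curl -XGET 'http://localhost:9200/event_topic/_settings?pretty'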
10. In case you want to delete the topics
bin/kafka-topics --delete --zookeeper localhost:2181 --topic event_topic
bin/kafka-topics --delete --zookeeper localhost:2181 --topic metric_topic
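Note that deletion only takes effect if delete.topic.enable=true is set in server.properties; otherwise the topics are merely marked for deletion.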
11. Create the sink connector through the REST service
curl -X POST -H "Content-Type: application/json" --data '{"name": "elasticsearch-sink", "config": {"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "tasks.max":"2","batch.size":1,"key.ignore":"true","connection.url":"http://localhost:9200","topics":"socket-test,event_topic,metric_topic","schema.ignore":"true","topic.index.map":"event_topic:event_topic,metric_topic:metric_topic","linger.ms":1000,"topic.schema.ignore":"event_topic,metric_topic" }}' http://localhost:8085/connectors
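Afterwards, you can check the connector state and its task assignments via the Connect REST API:
curl http://localhost:8085/connectors/elasticsearch-sink/status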
12. Reconfigure sink connector on the fly.
curl -X PUT -H "Content-Type: application/json" --data '{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "tasks.max":"2","batch.size":1,"key.ignore":"true","connection.url":"http://localhost:9200","topics":"socket-test,event_topic,metric_topic","schema.ignore":"true","topic.index.map":"event_topic:event_topic,metric_topic:metric_topic","linger.ms":1000,"topic.schema.ignore":"event_topic,metric_topic","type.name":"kafka-connect" }' http://localhost:8085/connectors/elasticsearch-sink/config
13. Create Source connector
curl -X POST -H "Content-Type: application/json" --data '{"name": "socket-connector", "config": {"connector.class":"org.apache.kafka.connect.socket.SocketSourceConnector", "tasks.max":"4", "topic":"socket-test", "schema.name":"socketschema","type.name":"kafka-connect","schema.ignore":"true", "port":"12345", "batch.size":"2" }}' http://localhost:8084/connectors
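Likewise, check the source connector on the standalone worker (rest.port=8084):
curl http://localhost:8084/connectors/socket-connector/status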
14. Reconfigure Source connector on the fly
curl -X PUT -H "Content-Type: application/json" --data '{"connector.class":"org.apache.kafka.connect.socket.SocketSourceConnector",
"tasks.max":"4", "topics":"socket-test",
"schema.name":"socketschema",
"type.name":"kafka-connect",
"schema.ignore":"true",
"tcp.port":"12345",
"batch.size":"2",
"metrics_id":"domain",
"metrics_domains":"",
"events_domains":"",
"domain_topic_mapping":"event_sum:event_topic,metric_rum:metric_topic",
"error_topic":"error_topic"
}' http://localhost:8084/connectors/socket-connector/config
15. Send a sample JSON message
cat samplejson | netcat localhost 12345
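The contents of samplejson are not shown here; purely as an illustration, a minimal document might look like the following (the field name "domain" is an assumption based on the metrics_id and domain_topic_mapping settings above):
{"domain":"event_sum","message":"hello kafka connect"}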
16. Verify the JSON message in ElasticSearch
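For example, run a match-all search against the index:
curl -XGET 'http://localhost:9200/event_topic/_search?pretty'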
17. Kafka Connect configuration
connect-distributed.properties
##
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
group.id=connect-cluster
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
#key.converter=io.confluent.connect.avro.AvroConverter
#value.converter=io.confluent.connect.avro.AvroConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# Topic to use for storing offsets. This topic should have many partitions and be replicated.
offset.storage.topic=connect-offsets
# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic.
# You may need to manually create the topic to ensure single partition for the config topic as auto created topics may have multiple partitions.
config.storage.topic=connect-configs
# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated.
status.storage.topic=connect-status
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
rest.port=8085
connect-standalone.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=localhost:9092
# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
#key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# The internal converter used for offsets and config data is configurable and must be specified, but most users will
# always want to use the built-in default. Offset and config data is never visible outside of Kafka Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
rest.port=8084