4/06/2017

Replicate Table Data from MySQL to Kafka






Steps

1. Download and install Confluent 3.2.0

2. Change to the installation directory

cd ~/software/confluent-3.2.0

3. Start ZooKeeper

nohup ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties > zookeeper.log 2>&1 &

4. Start Kafka

nohup ./bin/kafka-server-start ./etc/kafka/server.properties > kafka.log 2>&1 &
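
Both services are started in the background, so the next command can run before the previous service is listening. Kafka connects to ZooKeeper on startup, and step 4 may fail if ZooKeeper is not ready yet. A minimal sketch of a readiness check follows. It assumes bash (it relies on bash's /dev/tcp redirection) and the default ports 2181 and 9092 from the stock property files. wait_for_port is a hypothetical helper name, not something shipped with Confluent.

```shell
# wait_for_port HOST PORT [TIMEOUT_SECONDS]
# Hypothetical helper (not part of Confluent): polls once per second until a
# TCP connection to HOST:PORT succeeds, or gives up after TIMEOUT_SECONDS.
wait_for_port() {
  local host="$1" port="$2" timeout="${3:-30}" elapsed=0
  while [ "$elapsed" -lt "$timeout" ]; do
    # bash-only: opening /dev/tcp/HOST/PORT attempts a TCP connection.
    # The subshell keeps a failed exec from terminating the caller.
    if (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null; then
      return 0
    fi
    sleep 1
    elapsed=$((elapsed + 1))
  done
  echo "timed out waiting for $host:$port" >&2
  return 1
}

# Example use (default ports assumed):
#   wait_for_port localhost 2181   # before starting Kafka in step 4
#   wait_for_port localhost 9092   # before starting the connector in step 7
```

An open port only shows that the process is accepting connections, not that it has finished initialising, so kafka.log is still worth a look if a later step fails.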

5. Create and configure the JDBC connector properties

touch etc/kafka-connect-jdbc/source-mysql.properties
vim etc/kafka-connect-jdbc/source-mysql.properties

Add the following to the file:

name=mysql-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10

connection.url=jdbc:mysql://localhost:3306/hibernateDB?user=root&password=root
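# bulk: reload every table in full on each poll
mode=bulk
# Alternative: copy only new or changed rows.
# The named columns must exist in each table being copied.
#mode=timestamp+incrementing
#timestamp.column.name=modified
#incrementing.column.name=id

# Topic name = topic.prefix + table name
topic.prefix=mysql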
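
Bulk mode re-reads every table on each poll, so the same rows reach Kafka repeatedly. The variant below is a sketch, not the configuration used in this post. It writes a second properties file with a heredoc instead of an interactive editor, and it switches to incrementing mode on the ID column of the LOCATION table shown in the Verify section. The file name, connector name and topic prefix are assumptions chosen for illustration. table.whitelist restricts the connector to the one table.

```shell
# Assumption: run from the Confluent installation directory, as in step 2.
CONFIG_DIR="${CONFIG_DIR:-etc/kafka-connect-jdbc}"
CONFIG_FILE="$CONFIG_DIR/source-mysql-incrementing.properties"  # hypothetical file name

mkdir -p "$CONFIG_DIR"

# The quoted 'EOF' stops the shell from expanding anything inside the heredoc,
# so the & in the JDBC URL is written out unchanged.
cat > "$CONFIG_FILE" <<'EOF'
name=mysql-source-incrementing
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1

connection.url=jdbc:mysql://localhost:3306/hibernateDB?user=root&password=root

# Only copy rows whose ID is greater than the last ID already sent
mode=incrementing
incrementing.column.name=ID
table.whitelist=LOCATION

topic.prefix=mysql-inc-
EOF

echo "wrote $CONFIG_FILE"
```

Incrementing mode picks up new rows only. Updates to existing rows need a timestamp column, which the LOCATION table does not have.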


6. Copy the MySQL JDBC driver to Kafka Connect

cp mysql-connector-java-5.1.40.jar share/java/kafka-connect-jdbc/

7. Start the JDBC Source Connector

nohup ./bin/connect-standalone etc/kafka/connect-standalone.properties etc/kafka-connect-jdbc/source-mysql.properties > jdbc.log 2>&1 &
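
Because the worker runs in the background, a startup failure (for example a missing driver jar) only shows up in jdbc.log. The Connect worker also serves a REST API, which gives a quicker check. The sketch below assumes the worker listens on the default port 8083 and that curl is installed. The two function names are made up for this example, and mysql-source is the connector name from step 5.

```shell
# Assumption: the Connect worker exposes its REST API on the default port 8083.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Print the status URL for the given connector name.
connector_status_url() {
  printf '%s/connectors/%s/status\n' "$CONNECT_URL" "$1"
}

# Fetch the status JSON. -f makes curl exit non-zero on HTTP errors such as
# 404 (connector not registered); -sS hides progress but still prints errors.
connector_status() {
  curl -fsS "$(connector_status_url "$1")"
}

# Example use once the worker from step 7 is up:
#   connector_status mysql-source
```

The response reports a state for the connector and for each of its tasks. Anything other than RUNNING is a cue to read jdbc.log.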
 


Verify

The source database (hibernateDB) contains these tables:

mysql> show tables;
+-----------------------+
| Tables_in_hibernateDB |
+-----------------------+
| LOCATION              |
| PRODUCT               |
| TEST                  |
| TEST_LOCATION         |
+-----------------------+
 


mysql> desc LOCATION;
+---------------+--------------+------+-----+---------+----------------+
| Field         | Type         | Null | Key | Default | Extra          |
+---------------+--------------+------+-----+---------+----------------+
| ID            | bigint(20)   | NO   | PRI | NULL    | auto_increment |
| VERSION       | bigint(20)   | NO   |     | NULL    |                |
| LOCATION_NAME | varchar(255) | NO   | UNI | NULL    |                |
+---------------+--------------+------+-----+---------+----------------+




 

List topics
bin/kafka-topics --list --zookeeper localhost:2181

mysqlLocation
mysqlProduct
mysqlTest
mysqlTest_Location

View Topic Messages 
 
./bin/kafka-console-consumer  --zookeeper localhost:2181 --from-beginning --topic mysqlLocation

{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"ID"},{"type":"int64","optional":false,"field":"VERSION"},{"type":"string","optional":false,"field":"LOCATION_NAME"}],"optional":false,"name":"LOCATION"},"payload":{"ID":2,"VERSION":0,"LOCATION_NAME":"Denver"}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"ID"},{"type":"int64","optional":false,"field":"VERSION"},{"type":"string","optional":false,"field":"LOCATION_NAME"}],"optional":false,"name":"LOCATION"},"payload":{"ID":4,"VERSION":0,"LOCATION_NAME":"Boston"}}
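
Each message is a JSON envelope with two parts: schema describes the fields and payload carries the row. This is the shape Connect's JsonConverter produces when schemas are enabled. A rough sketch for printing only the row data follows. It assumes python3 is on the PATH, which nothing else in this post requires. payload_only is a made-up helper name.

```shell
# payload_only: read JSON envelopes on stdin (one per line) and print only
# the "payload" object of each. Blank lines are skipped.
payload_only() {
  python3 -c '
import json, sys
for line in sys.stdin:
    line = line.strip()
    if line:
        print(json.dumps(json.loads(line)["payload"]))
'
}

# Example use with the consumer command from this post:
#   ./bin/kafka-console-consumer --zookeeper localhost:2181 --from-beginning \
#     --topic mysqlLocation | payload_only
```

For the first message above this prints {"ID": 2, "VERSION": 0, "LOCATION_NAME": "Denver"}.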
 

