10/18/2020

Kafka Consumer Re-balancing on Static Membership






The purpose of this post is to explain Kafka consumer rebalancing and how static membership helps to avoid it. To get an idea about Kafka partition assignment for consumers, please refer to the next post, Kafka Partition Assignment for Consumers.


Key Points


  • A rebalance triggered by a single consumer affects the whole group, so it stops message processing across the entire consumer group. 

  • On a consumer rebalance, the ConsumerRebalanceListener#onPartitionsRevoked() method is called just before the consumer's partitions are taken away. 
          Examples: 1. A consumer requests to leave the group.
                            2. Partitions of existing consumers move to a newly joined consumer.

  • Both a consumer leaving and a consumer joining the group cause a consumer rebalance.

  • When you use static membership, the coordinator is aware of the group.instance.id property, since it is sent to the coordinator as part of the JoinGroup request.

  • With static membership, the consumer doesn't send a LeaveGroup request on a failure or restart. Because of that, the broker coordinator will wait until session.timeout.ms is reached. If the consumer comes back within that time period, there won't be any rebalancing (see the sketch below).
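
As a rough illustration of static membership, the command below is a minimal sketch: the broker address, topic, and group names are placeholders, and it assumes Kafka 2.3+ on both broker and client. It starts a console consumer with a fixed group.instance.id; if this consumer is restarted within session.timeout.ms, the coordinator keeps its assignment and no rebalance is triggered.

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic test-topic \
  --group my-group \
  --consumer-property group.instance.id=consumer-1 \
  --consumer-property session.timeout.ms=30000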











 

Kafka Partition Assignment for Consumers







The purpose of this post is to explain Kafka partition assignment.


Key Points



  •  If we have a fixed number of partitions and we use message keys at the producer level, messages with the same key go to the same partition (see the example after this list).

  • The producer only sends messages to the partition leader.

  • It's not compulsory to have a key in a message; in that case the producer distributes messages among partitions in a round-robin fashion.

  • Kafka guarantees that a topic partition is assigned to only one consumer within the group.

  • All the consumers send a JoinGroup request to the coordinator broker, and the first consumer to join becomes the leader of the consumer group.

  • Then the consumer leader sends a SyncGroup request with a body (the computed partition assignment), while the rest of the consumers send empty SyncGroup requests to the coordinator broker.

  • The consumer's JoinGroup request body contains session.timeout.ms and max.poll.interval.ms.

  • The consumer leader is responsible for computing the partition assignment for the rest of the consumers in the same group, locally on the client side.

  • When the consumers sync, only the consumer leader sends a request with a body, while the rest of the consumers in the same group send empty requests.

  • When the consumers sync, all the consumers get a response with their member assignment details.

  • Once the coordinator broker responds to all the SyncGroup requests, each consumer's configured rebalance listener has its onPartitionsAssigned() method called.

  • All the consumers periodically (every heartbeat.interval.ms) send heartbeat requests to the coordinator.

  • If the coordinator triggers a rebalance, the other consumers only learn of it when they receive a heartbeat response with the REBALANCE_IN_PROGRESS error encapsulated. Only then do the consumers know that they need to rejoin the group.
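
To see key-based partitioning in action, here is a hypothetical pair of commands (broker address and topic name are placeholders): a console producer that parses a key from each line using ":" as the separator, and a console consumer that prints the key next to each value. All records produced with the same key should end up on the same partition.

kafka-console-producer.sh --bootstrap-server localhost:9092 \
  --topic test-topic \
  --property parse.key=true \
  --property key.separator=:

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic test-topic --from-beginning \
  --property print.key=true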








5/15/2020

Secure HDFS With Kerberos And Access From Presto











































The purpose of this blog post is to set up a Hadoop cluster that is secured with Kerberos authentication.
Also please note that this setup is for learning purposes only and is not suitable for a production setup.



Prerequisites:


  • Install docker and docker-compose
  • Install Java 8
  • Entries added to my /etc/hosts file. Change these according to your machine's IP address.
192.168.0.114    hadoop.docker.com
192.168.0.114    hive-metastore hadoop.docker.com  kdc.kerberos.com kdc EXAMPLE.COM

Steps:


1. Clone the 3 GitHub projects below


git clone https://github.com/dhanuka84/docker-hadoop.git

git clone https://github.com/dhanuka84/docker-hive.git

git clone https://github.com/dhanuka84/docker-hadoop-secure.git


2. Build the docker-hadoop Docker image.


cd docker-hadoop/base


docker build -t dhanuka/hadoop:2.7.7 .


3. Build the docker-hive Docker image


cd docker-hive


docker build -t dhanuka/hive:2.3.2 .


4. Replace my local machine IP with your local machine IP.


cd docker-hadoop-secure


find ./ -type f -exec sed -i 's/192.168.0.114/your_local_ip/g' {} \;

5. Start the Kerberos Key Distribution Center (KDC) service


cd docker-hadoop-secure

docker-compose -f docker-kdc.yml up -d

6. Create Kerberos principals and keytabs


Log in to the kdc docker container
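
Assuming the KDC container is named kdc (verify with docker ps, since the actual name comes from the compose file):

docker ps
docker exec -it kdc bash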







Log in as kadmin
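
Inside the container, kadmin.local gives direct admin access to the local KDC database (no password is needed when run as root):

[root@kdc /]# kadmin.local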









Execute the commands below one by one to create the principals


kadmin.local:  addprinc -randkey jhs/hadoop.docker.com@EXAMPLE.COM


kadmin.local:  addprinc -randkey yarn/hadoop.docker.com@EXAMPLE.COM


kadmin.local:  addprinc -randkey rm/hadoop.docker.com@EXAMPLE.COM


kadmin.local:  addprinc -randkey nm/hadoop.docker.com@EXAMPLE.COM


kadmin.local:  addprinc -randkey hive/hadoop.docker.com@EXAMPLE.COM

kadmin.local:  addprinc -randkey hive/hive-metastore@EXAMPLE.COM



Execute the commands below one by one to create keytabs from the principals


kadmin.local:  ktadd -k /opt/nn.service.keytab  nn/hadoop.docker.com


kadmin.local:  ktadd -k /opt/dn.service.keytab  dn/hadoop.docker.com


kadmin.local:  ktadd -k /opt/spnego.service.keytab  spnego/hadoop.docker.com


kadmin.local:  ktadd -k /opt/jhs.service.keytab  jhs/hadoop.docker.com


kadmin.local:  ktadd -k /opt/yarn.service.keytab  yarn/hadoop.docker.com


kadmin.local:  ktadd -k /opt/nm.service.keytab  nm/hadoop.docker.com


kadmin.local:  ktadd -k /opt/rm.service.keytab  rm/hadoop.docker.com


kadmin.local:  ktadd -k /opt/hive.keytab  hive/hive-metastore

kadmin.local: q

[root@kdc /]# exit


The keytabs can be found in the kdc-opt directory. Now we need to copy them into the keytabs directory.
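
The exact copy command depends on where the volumes are mounted on your host; assuming you are inside the docker-hadoop-secure directory and both kdc-opt and keytabs are subdirectories there, something like this should work:

cp kdc-opt/*.keytab keytabs/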































7. Start Hadoop Cluster



docker-compose -f docker-hdfs.yml up -d





























Full Log file

Please note the Hadoop configuration files in the location below (especially core-site.xml & hdfs-site.xml).





















8. Start the Hive Metastore Service



docker-compose -f docker-hive.yml up -d

Please note, this will boot up the Hive metastore, Hive server, and Postgres DB.

























We don't actually need the Hive server, so we can stop that docker container.







Log in to the Hive Metastore container and use the CLI tool to create an external table
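
Assuming the metastore container is named hive-metastore (as suggested by the /etc/hosts entries and shell prompts below), a shell can be opened with:

docker exec -it hive-metastore bash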






















As you can see above, it throws an error:

Caused by: java.net.ConnectException: Call From hive-metastore/192.168.0.114 to hadoop.docker.com:9000 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused

Run the command below:

kinit hive/hive-metastore@EXAMPLE.COM -k -t /etc/security/keytabs/hive.keytab

root@hive-metastore:/opt# cd hive/bin
root@hive-metastore:/opt/hive/bin# ./hive









Then the Hive CLI will appear.


9. Create the HDFS folder structure and copy a sample data file into that folder


Log in to the HDFS docker container
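
The name of the Hadoop container depends on the compose file, so check docker ps first and open a shell with something like the following (the container name is a placeholder):

docker ps
docker exec -it <hadoop-container-name> bash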





Authenticate

bash-4.1# kinit nn/hadoop.docker.com@EXAMPLE.COM -k -t /etc/security/keytabs/nn.service.keytab

Create hdfs folder

bash-4.1# hdfs dfs -mkdir /data

List the HDFS root folder

bash-4.1# hdfs dfs -ls /

Found 2 items
drwxr-xr-x   - root root          0 2020-05-15 05:15 /data
drwxrwx---   - root root          0 2020-05-15 05:02 /tmp

Create a test data file with the content below

dhanu,colombo
kithnu,colombo
yuki,colombo

bash-4.1# vi test.dat

Copy the test.dat file to the HDFS folder /data
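
A standard way to copy a local file into HDFS is hdfs dfs -put:

bash-4.1# hdfs dfs -put test.dat /data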





bash-4.1# hdfs dfs -ls /data








10. Create External Hive table.


Execute the command below in the Hive CLI

CREATE EXTERNAL TABLE data_t5  (name string, city string) ROW FORMAT DELIMITED FIELDS TERMINATED BY "," STORED AS TEXTFILE LOCATION "/data";




Use a Postgres client application to connect to the Postgres DB
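
Any Postgres client works here. As an illustration only, a psql connection would look roughly like this; the port, database name, user, and password are hypothetical and must be taken from the docker-hive compose file:

psql -h localhost -p 5432 -U hive -d metastore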































As you can see, the table metadata is stored in the Postgres metastore database.


11. Set up Apache Presto 



Follow the documentation below to set up Presto-Server on your local machine

https://docs.starburstdata.com/latest/installation/deployment.html

Follow the documentation below to set up Presto-CLI on your local machine.

https://docs.starburstdata.com/latest/installation/cli.html

I have used a single-node Presto-Server, which means a single coordinator and a single worker node.

My Presto-Server folder structure:


My Presto-CLI folder structure:





Configure Presto-Server: etc/config.properties

dhanuka@dhanuka:~/research/hdfs/prestor/presto-server-332$ vim etc/config.properties

node.id=presto-master
node.environment=test
coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
query.max-memory=1024MB
query.max-memory-per-node=2048MB
query.max-total-memory-per-node=2048MB
discovery-server.enabled=true
discovery.uri=http://localhost:8080
task.max-worker-threads=8

Configure Hive catalog and jvm:

Replace the files in presto/etc/ with the files from docker-hadoop-secure/presto/etc/










dhanuka@dhanuka:~/research/hdfs/prestor/presto-server-332$ vim etc/catalog/hive.properties

connector.name=hive-hadoop2
hive.metastore.uri=thrift://hive-metastore:9083
hive.metastore.authentication.type=KERBEROS
hive.metastore.service.principal=hive/hive-metastore@EXAMPLE.COM
hive.metastore.client.principal=hive/hive-metastore@EXAMPLE.COM
hive.metastore.client.keytab=/home/dhanuka/research/hdfs/prestor/presto-server-332/keytabs/hive.keytab
hive.hdfs.authentication.type=KERBEROS
hive.hdfs.presto.principal=nn/hadoop.docker.com@EXAMPLE.COM
hive.hdfs.presto.keytab=/home/dhanuka/research/hdfs/prestor/presto-server-332/keytabs/nn.service.keytab
hive.hdfs.impersonation.enabled=false
hive.config.resources=/home/dhanuka/research/hdfs/docker-hadoop-secure/hive/conf/hdfs-site.xml,/home/dhanuka/research/hdfs/docker-hadoop-secure/hive/conf/core-site.xml

Replace /home/dhanuka/research/hdfs/ with your local machine's location

dhanuka@dhanuka:~/research/hdfs/prestor/presto-server-332$ vim etc/jvm.config

-server
-Xmx3G
-XX:+UseG1GC
-XX:+UseGCOverheadLimit
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:+ExitOnOutOfMemoryError
-XX:ReservedCodeCacheSize=150M
-DHADOOP_USER_NAME=hive
-Duser.timezone=UTC
-Djdk.attach.allowAttachSelf=true
-Djdk.nio.maxCachedBufferSize=2000000
-Dpresto-temporarily-allow-java8=true
-Djava.security.krb5.conf=/home/dhanuka/research/hdfs/docker-hadoop-secure/config_files/krb5.conf
-Dsun.security.krb5.debug=true
-Dlog.enable-console=true


12. Start Presto-Server and execute SQL commands in Presto-CLI


Launch Presto-Server

./bin/launcher run











Launch Presto-CLI

 ./presto-cli --catalog hive --schema default


Run a SELECT query

select * from data_t5;



































References:


https://github.com/Knappek/docker-hadoop-secure