5/28/2018

Complete Solution for SQL Based Real Time Streaming Analytics & Machine Learning

 


This hypothetical, high-level architecture diagram explains how we can use OSS technologies more effectively. We are going to discuss the whole solution under four main subject areas:

  1. Distributed Streaming Message Broker (Data Pipeline)
  2. Streaming ETL & Streaming Aggregation
  3. Time Series Data Storing & ML Processing
  4. Dashboard & Notification 

Distributed Streaming Message Broker

We use Kafka as our data pipeline; you can get more details about Kafka from [1].
Kafka is a proven technology that performs as a distributed streaming log. Using its partitioning technique we can scale out easily. It is ideal for an event-driven architecture because of its streaming nature. Kafka also makes it much more likely that disk access is sequential, and it utilizes the OS page cache efficiently [2].
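
As a minimal sketch (not part of the diagram itself), the snippet below shows how an event source could publish into the pipeline using the plain Kafka producer API. The broker address, topic name, key and payload are hypothetical.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object EventProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // assumed broker address
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Keying by device id sends all events of one device to the same partition,
    // which preserves per-device ordering while still scaling across partitions.
    producer.send(new ProducerRecord[String, String](
      "sensor-events", "device-42", """{"temperature":27.4,"ts":1527283334}"""))
    producer.close()
  }
}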

Streaming ETL & Streaming Aggregation

Flink will be the stream-processing platform we use for aggregation, ETL and CEP. Flink supports stream processing by design and is one of the most widely used streaming technologies at the moment. Flink is based on the Dataflow model, which means it processes elements as and when they arrive rather than in micro-batches (as Spark Streaming does).
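
To make the element-at-a-time model concrete, here is a small Flink DataStream sketch in Scala; the socket source and the "deviceId,value" line format are assumptions for illustration. Each event is folded into a per-device one-minute window as it arrives.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object PerDeviceAggregation {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Events arrive as "deviceId,value" lines; each element is processed as it comes,
    // not collected into micro-batches.
    env.socketTextStream("localhost", 9999)   // assumed source, for illustration only
      .map { line =>
        val Array(id, value) = line.split(",")
        (id, value.toDouble)
      }
      .keyBy(0)                               // key by device id
      .timeWindow(Time.minutes(1))            // 1-minute windows per device
      .sum(1)                                 // running sum of the value field
      .print()

    env.execute("per-device aggregation")
  }
}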

Uber AthenaX is a SQL-based streaming analytics framework [3]. It is a combination of YARN, Calcite & Flink. AthenaX provides APIs to monitor, access & administer the life cycle of Flink jobs. And because of Calcite, we can write SQL-based stream-processing applications and run them inside AthenaX easily.
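
For example, the kind of job you would hand to such a framework is just a streaming SQL statement like the sketch below; the table and column names are hypothetical, and the syntax follows the Calcite/Flink group-window style.

-- 1-minute tumbling average temperature per device,
-- reading from a Kafka topic registered as the sensor_events table
SELECT
  deviceId,
  TUMBLE_START(rowtime, INTERVAL '1' MINUTE) AS window_start,
  AVG(temperature) AS avg_temperature
FROM sensor_events
GROUP BY deviceId, TUMBLE(rowtime, INTERVAL '1' MINUTE);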

Time Series Data Storing & ML Processing

We use Cassandra as the time series data store. Cassandra's architecture is ideal for this purpose because writes go to disk sequentially, which also helps with fast reads over large sets of data.

So the prepared time series data we get from the streaming analytics platform will be stored in Cassandra for machine learning.
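
A possible (hypothetical) CQL schema for that prepared data is sketched below; partitioning by (device_id, day) keeps each partition bounded, and clustering by event_time gives the sequential on-disk layout mentioned above.

CREATE TABLE analytics.sensor_readings (
    device_id        text,
    day              date,
    event_time       timestamp,
    avg_temperature  double,
    PRIMARY KEY ((device_id, day), event_time)
) WITH CLUSTERING ORDER BY (event_time DESC);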

We will train several ML models using the prepared data; the trained models/knowledge base will then be stored (as binaries) in Cassandra. This way we can dynamically select trained models when we analyze real-time data.

The Spark slave/worker and Cassandra will reside on the same host, and both will be connected through the Spark-Cassandra Connector. For high availability there will be an active and a standby Spark master, while Zookeeper keeps the state of each master.

The main advantage of this approach is the guarantee of data locality between a Cassandra node and a Spark slave, which results in high-performance data fetching.
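
Below is a minimal sketch of reading that table from Spark through the Spark-Cassandra Connector (the keyspace/table names follow the hypothetical schema above). Because the connector maps Cassandra token ranges to Spark partitions, tasks are scheduled on the worker that holds the local replica.

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

object TrainFromCassandra {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("train-from-cassandra")
      // assumed contact point; workers still read from their local replicas
      .set("spark.cassandra.connection.host", "10.163.134.151")

    val sc = new SparkContext(conf)

    // One Spark partition per Cassandra token range, so reads stay node-local.
    val readings = sc.cassandraTable("analytics", "sensor_readings")
    println(s"rows available for training: ${readings.count()}")

    sc.stop()
  }
}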

Dashboard & Notification

Finally, you need to store the analyzed data for later use (dashboards & trend analysis). For example, if we use this solution for predictive analysis, you can save the predicted data in Elasticsearch. With the ELK stack, people can then easily create dashboards over the analysis data using Kibana.

There are also lots of other features in Elasticsearch & Kibana, such as alerts, trend analysis, searching, etc.
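
For instance, a predicted data point could be indexed with a single call to the Elasticsearch document API; the index, type and field names here are hypothetical.

$ curl -XPOST 'http://localhost:9200/predictions/prediction' \
    -H 'Content-Type: application/json' \
    -d '{"device_id":"device-42","predicted_temperature":28.1,"timestamp":"2018-05-28T10:00:00Z"}'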


References:

[1] https://kafka.apache.org/
[2] https://stackoverflow.com/questions/45751641/how-does-kafka-guarantee-sequential-disk-access
[3] https://eng.uber.com/tag/athenax/
[4] https://www.infoq.com/presentations/uber-ml-architecture-models








5/25/2018

MySql Change Data Capturing With Kafka Connect


Software & tools you need to set up

  1. Install MySQL 5.7 on your local machine.
  2. JDK 1.8
  3. Maven
  4. Confluent platform 3.2.2
  5. Download the JARs below from the Maven repository (https://mvnrepository.com/artifact)

debezium-core-0.5.2.jar
debezium-connector-mysql-0.5.2.jar
mysql-binlog-connector-java-0.9.2.jar
mysql-connector-java-5.1.40.jar
protobuf-java-2.6.1.jar
wkb-1.0.2.jar

Steps: 

1. Enable the binlog in MySQL and create a database and table

  • Go to the /etc/mysql/mysql.conf.d folder, then open the mysqld.cnf file and add the lines below:

#bin logs
server-id         = 223344
log_bin           = mysql-bin
binlog_format     = row
binlog_row_image  = full
expire_logs_days  = 1
log-bin-index     = bin-log.index

Restart mysql

$ /etc/init.d/mysql stop
$ /etc/init.d/mysql start

Create the MySQL database and table

$ mysql -u root -p

mysql> create database eventsource;
mysql>  DROP TABLE IF EXISTS `CMDB_TAG`;
CREATE TABLE `CMDB_TAG` (
 `VERSION` bigint(20) DEFAULT NULL,
 `TAG_KEY` varchar(255),
 `TAG_VALUE` varchar(255),
 PRIMARY KEY (`TAG_KEY`, `TAG_VALUE`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

2. Create a MySQL user with the proper privileges


$ mysql -u root -p

mysql>  GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';

3. Start Zookeeper


$ cd  ~/software/confluent-3.2.2


confluent-3.2.2$ nohup ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties &

4. Start Kafka


confluent-3.2.2$ nohup ./bin/kafka-server-start ./etc/kafka/server.properties > kafka.log 2>&1 &

Create a Kafka topic named events

confluent-3.2.2$ bin/kafka-topics  --create --zookeeper localhost:2181 --replication-factor 3 --partitions 6 --topic  events --config min.insync.replicas=2 --config unclean.leader.election.enable=false

5. Set up the MySQL Kafka connector


Create a directory called kafka-connect-cdc within the Confluent installation

confluent-3.2.2$ cd share/java
$ mkdir kafka-connect-cdc

Rename debezium-connector-mysql-0.5.2.jar to kafka-connect-cdc.jar

$ mv debezium-connector-mysql-0.5.2.jar kafka-connect-cdc.jar

Copy all the downloaded jars into the kafka-connect-cdc directory

confluent-3.2.2/share/java/kafka-connect-cdc$ ls
debezium-core-0.5.2.jar  kafka-connect-cdc.jar mysql-binlog-connector-java-0.9.2.jar  mysql-connector-java-5.1.40.jar protobuf-java-2.6.1.jar wkb-1.0.2.jar

Configure the Connector

Go to the etc folder

$ cd etc

Create a directory called kafka-connect-cdc

$ mkdir kafka-connect-cdc

Create a property file called kafka-connect-cdc.properties

$ touch kafka-connect-cdc.properties

Copy the content below into the kafka-connect-cdc.properties file

name=mysql-source
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=localhost
database.port=3306
database.user=debezium
database.password=dbz
server.id=223344
database.server.name=eventsource
database.history.kafka.bootstrap.servers=localhost:9092
database.whitelist=eventsource
database.history.kafka.topic=events
tasks.max=10

Start the MySQL CDC connector

confluent-3.2.2$  nohup ./bin/connect-standalone etc/kafka/connect-standalone.properties  etc/kafka-connect-cdc/kafka-connect-cdc.properties > jdbc.log 2>&1 &

6. List Kafka topics


confluent-3.2.2$ bin/kafka-topics --zookeeper localhost:2181 --list

__consumer_offsets
events
eventsource
eventsource.eventsource.CMDB_TAG 
  • You can see that the connector has created two new topics: eventsource and eventsource.eventsource.CMDB_TAG.

7. Consume JSON messages from Kafka


confluent-3.2.2$ bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic eventsource.eventsource.CMDB_TAG

8. Testing the Connector


Do some CRUD operations on the table that we have created

mysql> use eventsource;
mysql> insert into CMDB_TAG (VERSION,TAG_KEY,TAG_VALUE) values(0,'TEST4','TEST1');

mysql> update CMDB_TAG set VERSION = 1 where TAG_KEY = 'TEST4' and TAG_VALUE = 'TEST1';

mysql> delete from CMDB_TAG where TAG_KEY = 'TEST4' and TAG_VALUE = 'TEST1';

From the Kafka consumer console you can see the following JSON messages.

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":true,"field":"VERSION"},{"type":"string","optional":false,"field":"TAG_KEY"},{"type":"string","optional":false,"field":"TAG_VALUE"}],"optional":true,"name":"eventsource.eventsource.CMDB_TAG.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":true,"field":"VERSION"},{"type":"string","optional":false,"field":"TAG_KEY"},{"type":"string","optional":false,"field":"TAG_VALUE"}],"optional":true,"name":"eventsource.eventsource.CMDB_TAG.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"eventsource.eventsource.CMDB_TAG.Envelope","version":1},"payload":{"before":null,"after":{"VERSION":0,"TAG_KEY":"TEST4","TAG_VALUE":"TEST1"},"source":{"name":"eventsource","server_id":223344,"ts_sec":1527283334,"gtid":null,"file":"mysql-bin.000003","pos":1984,"row":0,"snapshot":null,"thread":5,"db":"eventsource","table":"CMDB_TAG"},"op":"c","ts_ms":1527283334083}}
 
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":true,"field":"VERSION"},{"type":"string","optional":false,"field":"TAG_KEY"},{"type":"string","optional":false,"field":"TAG_VALUE"}],"optional":true,"name":"eventsource.eventsource.CMDB_TAG.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":true,"field":"VERSION"},{"type":"string","optional":false,"field":"TAG_KEY"},{"type":"string","optional":false,"field":"TAG_VALUE"}],"optional":true,"name":"eventsource.eventsource.CMDB_TAG.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"eventsource.eventsource.CMDB_TAG.Envelope","version":1},"payload":{"before":{"VERSION":0,"TAG_KEY":"TEST4","TAG_VALUE":"TEST1"},"after":{"VERSION":1,"TAG_KEY":"TEST4","TAG_VALUE":"TEST1"},"source":{"name":"eventsource","server_id":223344,"ts_sec":1527283762,"gtid":null,"file":"mysql-bin.000003","pos":2279,"row":0,"snapshot":null,"thread":5,"db":"eventsource","table":"CMDB_TAG"},"op":"u","ts_ms":1527283762079}}
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":true,"field":"VERSION"},{"type":"string","optional":false,"field":"TAG_KEY"},{"type":"string","optional":false,"field":"TAG_VALUE"}],"optional":true,"name":"eventsource.eventsource.CMDB_TAG.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":true,"field":"VERSION"},{"type":"string","optional":false,"field":"TAG_KEY"},{"type":"string","optional":false,"field":"TAG_VALUE"}],"optional":true,"name":"eventsource.eventsource.CMDB_TAG.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"eventsource.eventsource.CMDB_TAG.Envelope","version":1},"payload":{"before":{"VERSION":1,"TAG_KEY":"TEST4","TAG_VALUE":"TEST1"},"after":null,"source":{"name":"eventsource","server_id":223344,"ts_sec":1527283970,"gtid":null,"file":"mysql-bin.000003","pos":2596,"row":0,"snapshot":null,"thread":5,"db":"eventsource","table":"CMDB_TAG"},"op":"d","ts_ms":1527283970341}}
{"schema":null,"payload":null}
  • The op field in each payload shows the database operation: "c" for the insert, "u" for the update and "d" for the delete.
  • After the delete, a final tombstone message ({"schema":null,"payload":null}) is also received.
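
If you want to consume these change events from an application instead of the console consumer, a minimal Scala sketch is shown below. It simply prints the raw Debezium envelope; the group id and offset settings are assumptions.

import java.util.{Collections, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

object CdcConsumer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "cdc-demo")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("eventsource.eventsource.CMDB_TAG"))

    while (true) {
      // poll(long) is the signature in the 0.10.x clients bundled with Confluent 3.2.x
      for (record <- consumer.poll(1000).asScala) {
        // record.value() is the Debezium envelope shown above:
        // payload.op is "c"/"u"/"d" and payload.after holds the new row image.
        println(record.value())
      }
    }
  }
}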



5/20/2018

Event Sourcing for Micro Services




This is a hypothetical & high-level architecture for Event Sourcing.

Key Architectural Considerations

1. Beyond eventual consistency.
2. Asynchronous.
3. Command Query Responsibility Segregation (CQRS).
4. Fault tolerant & fail safe.
5. Scalability.
6. Zero event loss & exactly-once delivery.

Brief Explanation.

1. Micro-service applications will publish events to Kafka.
2. A Kafka consumer (sink connector) will consume the events and insert them into the MySQL event-source table within a transactional context (see the sketch after this list).
3. The MySQL change-data-capture source connector, which runs within Kafka Connect, will detect the changes.
4. The Kafka source connector will transform the events and publish them to Kafka.
5. Kafka Connect will consume the events from Kafka.
6. The Elasticsearch sink connector will insert the events into ES and update the relevant MySQL record's activeness within a transactional context.
7. Micro-services query data from the API.
8 & 9. The API returns the latest data unless it is a DELETE event.
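
As a rough sketch of step 2, the event-source table could look something like the DDL below. The table and column names are hypothetical; the ACTIVE flag is what step 6 updates once the event has been indexed into ES.

CREATE TABLE `EVENT_STORE` (
  `EVENT_ID`     varchar(64)  NOT NULL,
  `AGGREGATE_ID` varchar(64)  NOT NULL,
  `EVENT_TYPE`   varchar(32)  NOT NULL,      -- CREATE / UPDATE / DELETE
  `PAYLOAD`      json,
  `ACTIVE`       tinyint(1)   DEFAULT 1,     -- cleared after the event is indexed into ES
  `CREATED_AT`   timestamp    DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`EVENT_ID`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;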

Events Cleaning

1. There are schedulers to clean/delete both the MySQL table and the active ES index.
2. All the events will remain in the historical index.
3. MySQL events will be partitioned based on their activeness (see the sketch below).
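
Continuing the hypothetical EVENT_STORE table above, item 3 could be implemented with MySQL LIST partitioning (the partition column has to be part of the primary key), and the scheduler from item 1 would then simply prune the inactive rows:

ALTER TABLE `EVENT_STORE` DROP PRIMARY KEY, ADD PRIMARY KEY (`EVENT_ID`, `ACTIVE`);

ALTER TABLE `EVENT_STORE`
  PARTITION BY LIST (`ACTIVE`) (
    PARTITION p_active     VALUES IN (1),
    PARTITION p_historical VALUES IN (0)
  );

-- example cleanup run by the scheduler
DELETE FROM `EVENT_STORE` WHERE `ACTIVE` = 0 AND `CREATED_AT` < NOW() - INTERVAL 30 DAY;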

5/07/2018

Setup Spark Standalone Mode HA Cluster With Shell Script


In the previous post [1] we showed how to set up a standalone-mode Spark cluster, and now we are going to improve that architecture by adding a standby master to achieve high availability (fault tolerance) for the master role.

[1] http://dhanuka84.blogspot.com/2018/05/setup-spark-standalone-mode-cluster.html


Setup Zookeeper

1. I am using Zookeeper (ZK) version 3.4.12. Download & extract ZK from the site [1].

[1] http://www-eu.apache.org/dist/zookeeper/stable/

2. Go to the ZK folder and create the configuration file

zookeeper-3.4.12]$  cp conf/zoo_sample.cfg conf/zoo.cfg

3. Edit the zoo.cfg file

dataDir=/home/dhanuka/zookeeper-3.4.12/data

4. Start ZK

zookeeper-3.4.12]$ ./bin/zkServer.sh start 
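
You can quickly confirm ZK is up before moving on:

zookeeper-3.4.12]$ ./bin/zkServer.sh status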


Spark Standalone Mode Zookeeper configuration

1. $SPARK_HOME/conf/spark-env.sh HA configuration

SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER  -Dspark.deploy.zookeeper.url=10.163.134.152:2181  -Dspark.deploy.zookeeper.dir=/spark"

2. The same configuration will be applied on the standby master node (153).

3. The slaves will point to both masters, and the primary master will be booted up first.

4. The full script to boot up both masters and slaves:

Script

cp $SPARK_HOME/conf/spark-env.sh.template $SPARK_HOME/conf/spark-env.sh
cp $SPARK_HOME/conf/spark-env.sh.template $SPARK_HOME/conf/spark-second-env.sh
sed -i  '$ a\SPARK_MASTER_HOST=10.163.134.151'   $SPARK_HOME/conf/spark-env.sh
sed -i  '$ a\SPARK_MASTER_WEBUI_PORT=8085'   $SPARK_HOME/conf/spark-env.sh
sed -i  '$ a\SPARK_MASTER_WEBUI_PORT=8085'   $SPARK_HOME/conf/spark-second-env.sh
sed -i  '$ a\SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER  -Dspark.deploy.zookeeper.url=10.163.134.152:2181  -Dspark.deploy.zookeeper.dir=/spark"' $SPARK_HOME/conf/spark-env.sh
sed -i  '$ a\SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER  -Dspark.deploy.zookeeper.url=10.163.134.152:2181  -Dspark.deploy.zookeeper.dir=/spark"' $SPARK_HOME/conf/spark-second-env.sh
cp $SPARK_HOME/conf/slaves.template $SPARK_HOME/conf/slaves
sed -i  '$ a\10.163.134.151\n10.163.134.152\n10.163.134.153'   $SPARK_HOME/conf/slaves



echo " Stop anything that is running"
$SPARK_HOME/sbin/stop-all.sh

sleep 2

echo " Start Master"
$SPARK_HOME/sbin/start-master.sh

# Pause
sleep 10


echo "start stand by Master"

scp $SPARK_HOME/conf/spark-second-env.sh dhanuka@10.163.134.153:$SPARK_HOME/conf/
ssh dhanuka@10.163.134.153 'cp $SPARK_HOME/conf/spark-second-env.sh $SPARK_HOME/conf/spark-env.sh'
ssh dhanuka@10.163.134.153 'sed -i  "$ a\SPARK_MASTER_HOST=10.163.134.153"   $SPARK_HOME/conf/spark-env.sh'

scp ha.conf dhanuka@10.163.134.153:/home/dhanuka/
ssh dhanuka@10.163.134.153 '$SPARK_HOME/sbin/start-master.sh --host 10.163.134.153'


sleep 5

echo " Start Workers"
#SPARK_SSH_FOREGROUND=true  $SPARK_HOME/sbin/start-slaves.sh
SPARK_SSH_FOREGROUND=true  $SPARK_HOME/sbin/start-slaves.sh  spark://10.163.134.151:7077,10.163.134.153:7077



Run Bootstrap Script & Validate

1. $ sh setup.sh 

 Stop anything that is running
10.163.134.151: no org.apache.spark.deploy.worker.Worker to stop
10.163.134.152: no org.apache.spark.deploy.worker.Worker to stop
10.163.134.153: no org.apache.spark.deploy.worker.Worker to stop
no org.apache.spark.deploy.master.Master to stop
 Start Master
starting org.apache.spark.deploy.master.Master, logging to /home/dhanuka/spark-2.2.1-bin-hadoop2.7/logs/spark-dhanuka-org.apache.spark.deploy.master.Master-1-bo3uxgmpxxxxnn.out
start stand by Master
spark-second-env.sh                                                                                                             100% 3943    12.6MB/s   00:00   
ha.conf                                                                                                                         100%  117   496.8KB/s   00:00   
starting org.apache.spark.deploy.master.Master, logging to /home/dhanuka/spark-2.2.1-bin-hadoop2.7/logs/spark-dhanuka-org.apache.spark.deploy.master.Master-1-bo3uxgmpxxxxnn.out
 Start Workers
10.163.134.151: starting org.apache.spark.deploy.worker.Worker, logging to /home/dhanuka/spark-2.2.1-bin-hadoop2.7/logs/spark-dhanuka-org.apache.spark.deploy.worker.Worker-1-bo3uxgmpxxxxnn.out
10.163.134.152: starting org.apache.spark.deploy.worker.Worker, logging to /home/dhanuka/spark-2.2.1-bin-hadoop2.7/logs/spark-dhanuka-org.apache.spark.deploy.worker.Worker-1-bo3uxgmpxxxxnn.out
10.163.134.153: starting org.apache.spark.deploy.worker.Worker, logging to /home/dhanuka/spark-2.2.1-bin-hadoop2.7/logs/spark-dhanuka-org.apache.spark.deploy.worker.Worker-1-bo3uxgmpxxxxnn.out


2. Validate the Master node and the worker node on 151

$ ps -ef | grep spark
 
dhanuka   2178     1  8 23:20 pts/0    00:00:03 /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.151-1.b12.el7_4.x86_64/jre/bin/java -cp /home/dhanuka/spark-2.2.1-bin-hadoop2.7/conf/:/home/dhanuka/spark-2.2.1-bin-hadoop2.7/jars/* -Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=10.163.134.152:2181 -Dspark.deploy.zookeeper.dir=/spark -Xmx1g org.apache.spark.deploy.master.Master --host 10.163.134.151 --port 7077 --webui-port 8085
dhanuka   2289     1 22 23:20 ?        00:00:03 /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.151-1.b12.el7_4.x86_64/jre/bin/java -cp /home/dhanuka/spark-2.2.1-bin-hadoop2.7/conf/:/home/dhanuka/spark-2.2.1-bin-hadoop2.7/jars/* -Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=10.163.134.152:2181 -Dspark.deploy.zookeeper.dir=/spark -Xmx1g org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://10.163.134.151:7077
dhanuka   2347 30581  0 23:21 pts/0    00:00:00 grep --color=auto spark


3. Validate the standby Master and the worker that reside on 153

 $ ps -ef | grep spark

dhanuka   4738     1  5 23:19 ?        00:00:02 /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.151-1.b12.el7_4.x86_64/jre/bin/java -cp /home/dhanuka/spark-2.2.1-bin-hadoop2.7/conf/:/home/dhanuka/spark-2.2.1-bin-hadoop2.7/jars/* -Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=10.163.134.152:2181 -Dspark.deploy.zookeeper.dir=/spark -Xmx1g org.apache.spark.deploy.master.Master --host 10.163.134.153 --port 7077 --webui-port 8085 --host 10.163.134.153
dhanuka   4829     1  6 23:20 ?        00:00:02 /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.151-1.b12.el7_4.x86_64/jre/bin/java -cp /home/dhanuka/spark-2.2.1-bin-hadoop2.7/conf/:/home/dhanuka/spark-2.2.1-bin-hadoop2.7/jars/* -Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=10.163.134.152:2181 -Dspark.deploy.zookeeper.dir=/spark -Xmx1g org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://10.163.134.151:7077
dhanuka   4881  2354  0 23:20 pts/0    00:00:00 grep --color=auto spark



4.  Spark Admin Panels

Lead Master (Primary)


Stand-by Master (Secondary)





Verify Spark Master fault-tolerance support.

1. Let's kill the primary master.

$ kill -9 2178

2. Within 1-2 minutes, you can see that the standby master takes over leadership of the workers.
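
One way to confirm the takeover (the exact log wording can vary by Spark version) is to check the standby master's log on 153 and its web UI, which should now report the master status as ALIVE and list all three workers:

# on 153: look for the leader-election messages in the standby master's log
$ grep -i "leader" /home/dhanuka/spark-2.2.1-bin-hadoop2.7/logs/spark-dhanuka-org.apache.spark.deploy.master.Master-1-*.out

# then open http://10.163.134.153:8085 and check the status and the worker list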





References:

[1]  http://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts

[2]  https://dzone.com/articles/spark-and-zookeeper-fault

[3] https://mapr.com/support/s/article/How-to-enable-High-Availability-on-Spark-with-Zookeeper?language=en_US

[4] https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/exercises/spark-exercise-standalone-master-ha.html











5/04/2018

Setup Spark Standalone Mode Cluster With Shell Script




I am using the spark-2.2.1-bin-hadoop2.7.tgz file from the Spark download site.
There are three virtual machines (151, 152, 153), and both the Master and one of the slaves will run on 151.

Prerequisites: Java 1.8 installed on all virtual machines.


Configure SSH Key-Based Authentication on a Linux Server

1. Run the command below on all three nodes to generate the public & private keys.
   For each question just press Enter (keep the password blank).

$ ssh-keygen -t rsa

2. Copy the Master's public key to the slave nodes. Enter the user password for each node.

$ ssh-copy-id dhanuka@10.163.134.151
$ ssh-copy-id dhanuka@10.163.134.152
$ ssh-copy-id dhanuka@10.163.134.153

Copy Spark to the slave nodes

$ scp -r $SPARK_HOME dhanuka@10.163.134.152:~
$ scp -r $SPARK_HOME dhanuka@10.163.134.153:~


Spark environment variable setup

1. Add the lines below to the shell profile (~/.bash_profile or ~/.bashrc) on each node, then source it


export SPARK_HOME=/home/dhanuka/spark-2.2.1-bin-hadoop2.7

export PATH=$PATH:$SPARK_HOME/bin

2. $  source ~/.bash_profile

Bootstrap the master and the slave nodes using the setup.sh shell script

 $ ./setup.sh

Stop anything that is running
localhost: stopping org.apache.spark.deploy.worker.Worker
10.163.134.151: no org.apache.spark.deploy.worker.Worker to stop
10.163.134.152: stopping org.apache.spark.deploy.worker.Worker
10.163.134.153: no org.apache.spark.deploy.worker.Worker to stop
stopping org.apache.spark.deploy.master.Master
 Start Master
starting org.apache.spark.deploy.master.Master, logging to /home/dhanuka/spark-2.2.1-bin-hadoop2.7/logs/spark-dhanuka-org.apache.spark.deploy.master.Master-1-bo3uxgmpxxxxnn.out
 Start Workers
localhost: starting org.apache.spark.deploy.worker.Worker, logging to /home/dhanuka/spark-2.2.1-bin-hadoop2.7/logs/spark-dhanuka-org.apache.spark.deploy.worker.Worker-1-bo3uxgmpxxxxnn.out
10.163.134.151: org.apache.spark.deploy.worker.Worker running as process 27165.  Stop it first.
10.163.134.152: starting org.apache.spark.deploy.worker.Worker, logging to /home/dhanuka/spark-2.2.1-bin-hadoop2.7/logs/spark-dhanuka-org.apache.spark.deploy.worker.Worker-1-bo3uxgmpxxxxnn.out
10.163.134.153: starting org.apache.spark.deploy.worker.Worker, logging to /home/dhanuka/spark-2.2.1-bin-hadoop2.7/logs/spark-dhanuka-org.apache.spark.deploy.worker.Worker-1-bo3uxgmpxxxxnn.out
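
To make sure the cluster actually accepts work, you can submit the bundled SparkPi example against the master (the examples jar name below is the one shipped with spark-2.2.1-bin-hadoop2.7; adjust it if your file name differs):

$ $SPARK_HOME/bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master spark://10.163.134.151:7077 \
    $SPARK_HOME/examples/jars/spark-examples_2.11-2.2.1.jar 100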

 


Script

#!/bin/bash

source ~/.bash_profile
export SPARK_SSH_FOREGROUND="yes"

cp $SPARK_HOME/conf/spark-env.sh.template $SPARK_HOME/conf/spark-env.sh
sed -i  '$ a\SPARK_MASTER_HOST=10.163.134.151'   $SPARK_HOME/conf/spark-env.sh
sed -i  '$ a\SPARK_MASTER_WEBUI_PORT=8085'   $SPARK_HOME/conf/spark-env.sh


cp $SPARK_HOME/conf/slaves.template $SPARK_HOME/conf/slaves
sed -i  '$ a\10.163.134.151\n10.163.134.152\n10.163.134.153'   $SPARK_HOME/conf/slaves

echo " Stop anything that is running"
$SPARK_HOME/sbin/stop-all.sh

sleep 2

echo " Start Master"
$SPARK_HOME/sbin/start-master.sh


# Pause
sleep 20

echo " Start Workers"
SPARK_SSH_FOREGROUND=true  $SPARK_HOME/sbin/start-slaves.sh
 



Master Admin Panel




References:

[1] https://www.digitalocean.com/community/tutorials/how-to-set-up-ssh-keys--2




5/01/2018

Create Scala Project From Maven



Here I am using Scala version 2.11.8 and Java version 1.8.

Environment setup in .profile
#scala,java,maven setup
export SCALA_HOME=/home/dhanuka/software/scala-2.11.8

export JAVA_HOME=/home/dhanuka/software/jdk1.8.0_131
export MAVEN_HOME=/home/dhanuka/software/apache-maven-3.3.9
PATH=$PATH:$JAVA_HOME/bin:$MAVEN_HOME/bin:$SCALA_HOME/bin


$ source ~/.profile


Create Maven Project

1. $ mvn archetype:generate 

2. Select the net.alchim31.maven archetype filter; in my case its number was 925

$ 925

Then choose the net.alchim31.maven:scala-archetype-simple archetype version:

Choose a number: 3: 3

3. Then enter the groupId, artifactId, etc.

Define value for property 'groupId': org.maven.scala

Define value for property 'artifactId': myfirst-scala

Define value for property 'version' 1.0-SNAPSHOT: :
Define value for property 'package' org.scala.maven: :

Y: : Y

$ ls

myfirst-scala

$ mvn package

[ERROR] /home/uranadh/research/scala/first/myfirst-scala/src/test/scala/samples/specs.scala:18: error: not found: type JUnitRunner
[ERROR] @RunWith(classOf[JUnitRunner])
[ERROR]                  ^
[ERROR] one error found
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
 


Troubleshooting pom.xml

1. Change the compiler version to 1.8

<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>


2. Change the Scala version to 2.11.8

<scala.version>2.11.8</scala.version>

3. Add the specs2-junit_${scala.compat.version} dependency.

<dependency>
      <groupId>org.specs2</groupId>
      <artifactId>specs2-junit_${scala.compat.version}</artifactId>
      <version>2.4.16</version>
      <scope>test</scope>
 </dependency>


4. Build again

$ mvn package

5. Jar location

myfirst-scala/target/myfirst-scala-1.0-SNAPSHOT.jar
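
Assuming you kept the App object that the archetype generates in the org.scala.maven package, you can run the jar with the scala command:

$ scala -cp target/myfirst-scala-1.0-SNAPSHOT.jar org.scala.maven.App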


Full pom.xml file

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.maven.scala</groupId>
  <artifactId>myfirst-scala</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>${project.artifactId}</name>
  <description>My wonderfull scala app</description>
  <inceptionYear>2015</inceptionYear>
  <licenses>
    <license>
      <name>My License</name>
      <url>http://....</url>
      <distribution>repo</distribution>
    </license>
  </licenses>

  <properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.8</scala.version>
    <scala.compat.version>2.11</scala.compat.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <!-- Test -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.specs2</groupId>
      <artifactId>specs2-core_${scala.compat.version}</artifactId>
      <version>2.4.16</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.compat.version}</artifactId>
      <version>2.2.4</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.specs2</groupId>
      <artifactId>specs2-junit_${scala.compat.version}</artifactId>
      <version>2.4.16</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <!-- see http://davidb.github.com/scala-maven-plugin -->
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.0</version>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args>
                <!--arg>-make:transitive</arg-->
                <arg>-dependencyfile</arg>
                <arg>${project.build.directory}/.scala_dependencies</arg>
              </args>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>2.18.1</version>
        <configuration>
          <useFile>false</useFile>
          <disableXmlReport>true</disableXmlReport>
          <!-- If you have classpath issue like NoDefClassError,... -->
          <!-- useManifestOnlyJar>false</useManifestOnlyJar -->
          <includes>
            <include>**/*Test.*</include>
            <include>**/*Suite.*</include>
          </includes>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>




References:

https://docs.scala-lang.org/tutorials/scala-with-maven.html