4/28/2018

Spark Cassandra Integration with spark-cassandra-connector

In this post, I show how to integrate a local single-node Cassandra database with standalone Spark using spark-cassandra-connector.

Setup Cassandra, Spark, Scala & ScalaBuildTool

1. Download Cassandra, Spark, Scala and sbt. I am using Cassandra 3.11.2, Spark 2.2.1 and Scala 2.11.8.

http://cassandra.apache.org/download/
https://spark.apache.org/releases/spark-release-2-2-1.html
https://www.scala-lang.org/download/2.11.8.html
https://www.scala-sbt.org/download.html

2. Set up the environment in ~/.profile

#cassandra setup
export CASSANDRA_HOME=/home/dhanuka/software/apache-cassandra-3.11.2

#spark, sbt and scala setup
export SPARK_HOME=/home/dhanuka/software/spark/spark-2.2.1-bin-hadoop2.7
export SBT_HOME=/home/dhanuka/software/spark/sbt-launcher-packaging-0.13.13
export SCALA_HOME=/home/dhanuka/software/scala-2.11.8

PATH=$PATH:$JAVA_HOME/bin:$MAVEN_HOME/bin:$SPARK_HOME/bin:$SBT_HOME/bin:$SCALA_HOME/bin:$CASSANDRA_HOME/bin

3. Reload the profile so the new variables take effect

$ source ~/.profile
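
A quick sanity check after sourcing the profile can catch a variable that is unset or a bin directory that never made it into PATH (for example because a `$` was left out of the PATH line). This is only a minimal sketch: `check_home` is a hypothetical helper name, and it assumes the variable names used in the profile above.

```shell
# Hypothetical helper: report whether a *_HOME variable is set and whether
# its bin directory appears as an entry in PATH.
check_home() {
  name="$1"
  # Indirect lookup of the variable named in $name (POSIX sh compatible).
  eval "value=\${$name:-}"
  if [ -z "$value" ]; then
    echo "$name: not set"
    return 1
  fi
  case ":$PATH:" in
    *":$value/bin:"*) echo "$name: ok" ;;
    *) echo "$name: set, but $value/bin is not in PATH"; return 1 ;;
  esac
}

# Check the four variables defined in the profile.
for v in CASSANDRA_HOME SPARK_HOME SBT_HOME SCALA_HOME; do
  check_home "$v" || true
done
```

Any line that does not end in "ok" points at the entry in ~/.profile that needs another look.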

Create Cassandra Keyspace and Table

1. Start Cassandra in the foreground with the following command

$ cassandra -f

2. Start CQL shell

$ cqlsh

3. Create keyspace and a table

cqlsh> CREATE KEYSPACE people WITH replication = {'class': 'SimpleStrategy', 'replication_factor':1};

cqlsh> use people;

cqlsh:people> CREATE TABLE users(
          ... id varchar ,
          ... first_name varchar,
          ... last_name varchar,
          ... city varchar,
          ... emails varchar,
          ... PRIMARY KEY (id));

 

cqlsh:people> INSERT INTO users (id, first_name, last_name, city, emails) VALUES ('1', 'dhanuka', 'ranasinghe', 'colombo', 'dhanuka.priyanath@gmail.com');

cqlsh:people> SELECT * FROM users;

 id | city    | emails                      | first_name | last_name
----+---------+-----------------------------+------------+------------
  1 | colombo | dhanuka.priyanath@gmail.com |    dhanuka | ranasinghe




Build spark-cassandra-connector.

1. Clone the repository from GitHub.

$ git clone https://github.com/datastax/spark-cassandra-connector.git

$ cd spark-cassandra-connector

2. Build the project with Scala 2.11 and Cassandra 3.11.2

spark-cassandra-connector$ sbt -Dscala-2.11=true -Dtest.cassandra.version=3.11.2 assembly

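The assembly jar is written to the location below. The suffix after 2.0.7 identifies the commit that was built, so it may differ on your machine.

spark-cassandra-connector/spark-cassandra-connector/target/full/scala-2.11/spark-cassandra-connector-assembly-2.0.7-82-g0369a7b.jar

3. Rename the jar to a shorter name (run this in the directory that contains the jar)

$ mv  spark-cassandra-connector-assembly-2.0.7-82-g0369a7b.jar   spark-cassandra-connector-assembly-2.0.7.jar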
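
Because the full jar name depends on the commit that was built, it can be easier to look the jar up by pattern than to type the name. The following is a minimal sketch, not part of the connector's tooling: `find_assembly_jar` is a hypothetical helper, and the file name pattern is assumed from the jar name shown above.

```shell
# Hypothetical helper: print the first connector assembly jar found
# anywhere under the directory given as the first argument.
find_assembly_jar() {
  find "$1" -type f -name 'spark-cassandra-connector-assembly-*.jar' | head -n 1
}

# Example usage from the cloned repository (left commented out on purpose):
#   jar="$(find_assembly_jar spark-cassandra-connector/target)"
#   cp "$jar" spark-cassandra-connector-assembly-2.0.7.jar
```

If nothing is printed, the build has probably not produced an assembly jar under that directory.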


Connect Spark with Cassandra through Spark-Shell

1. Copy spark-cassandra-connector-assembly-2.0.7.jar to the Spark jars directory

$ cp spark-cassandra-connector-assembly-2.0.7.jar  $SPARK_HOME/jars

2.  Start spark-shell

$ spark-shell --jars $SPARK_HOME/jars/spark-cassandra-connector-assembly-2.0.7.jar

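3. Stop the default Spark context that spark-shell creates, so that a new one can be created with the Cassandra connection settings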

scala> sc.stop

4. Read the Cassandra table from Spark

scala> import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf
import com.datastax.spark.connector._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf



scala> val conf = new SparkConf(true).set("spark.cassandra.connection.host", "localhost")
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@2c2a7d53


scala> val sc = new SparkContext(conf)
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@15914bb5


scala> val test_spark_rdd = sc.cassandraTable("people", "users")
test_spark_rdd: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:19


scala> test_spark_rdd.first
res1: com.datastax.spark.connector.CassandraRow = CassandraRow{id: 1, city: colombo, emails: dhanuka.priyanath@gmail.com, first_name: dhanuka, last_name: ranasinghe}
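
The session above sets spark.cassandra.connection.host on a fresh SparkConf. A different approach, not the one used in this post, is to pass the same property to spark-shell with its --conf option, in which case the context that spark-shell creates should already carry the setting. The sketch below is only an illustration: `launch_spark_shell` and the DRY_RUN switch are hypothetical names, added so the command line can be inspected before it is run.

```shell
# Hypothetical wrapper around spark-shell.
#   $1: path to the connector assembly jar
#   $2: Cassandra host (defaults to localhost)
# With DRY_RUN=1 the command line is printed instead of executed.
launch_spark_shell() {
  jar="$1"
  host="${2:-localhost}"
  # Rebuild the positional parameters as the command to run.
  set -- spark-shell --jars "$jar" --conf "spark.cassandra.connection.host=$host"
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "$@"
  else
    "$@"
  fi
}

# Example usage (left commented out on purpose):
#   DRY_RUN=1 launch_spark_shell "$SPARK_HOME/jars/spark-cassandra-connector-assembly-2.0.7.jar"
```

Inside a shell started this way, importing com.datastax.spark.connector._ and calling sc.cassandraTable("people", "users") should work on the existing context, assuming the connector reads the property from the launch configuration.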






4/15/2018

Set up a vnode Cassandra cluster on a local machine

Here I am using an Ubuntu Linux machine and a tool called Cassandra Cluster Manager (ccm).

Steps

1. :~/opensource$ mkdir cassandra

2. :~/opensource$ cd cassandra/

3. :~/opensource/cassandra$ git clone https://github.com/riptano/ccm.git

4. :~/opensource/cassandra$ cd ccm/

5. :~/opensource/cassandra/ccm$ ls

ccm  ccmlib  license.txt  MANIFEST.in  misc  NETWORK_ALIASES.md  README.md  setup.py  ssl  tests

6. :~/opensource/cassandra/ccm$ sudo ./setup.py install

7. :~/opensource/cassandra/ccm$ ccm create -v 3.11.2 -n 3 my_cluster --vnodes

10:22:00,960 ccm INFO Downloading http://archive.apache.org/dist/cassandra/3.11.2/apache-cassandra-3.11.2-bin.tar.gz to /tmp/ccm-Wo1DBu.tar.gz (36.656MB)
  38436262  [100.00%]
10:24:09,353 ccm INFO Extracting /tmp/ccm-Wo1DBu.tar.gz as version 3.11.2 ...
Current cluster is now: my_cluster


cluster name: my_cluster
cassandra version: 3.11.2
number of nodes: 3

8.  :~/opensource/cassandra/ccm$ ccm list

*my_cluster

9. :~/opensource/cassandra/ccm$ ccm status
Cluster: 'my_cluster'
---------------------
node1: DOWN (Not initialized)
node3: DOWN (Not initialized)
node2: DOWN (Not initialized)


10. :~/opensource/cassandra/ccm$ ccm start
      :~/opensource/cassandra/ccm$ ccm status
Cluster: 'my_cluster'
---------------------
node1: UP
node3: UP
node2: UP



11. :~/opensource/cassandra/ccm$ ccm node1 status

Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load        Tokens  Owns (effective)  Host ID                               Rack
UN  127.0.0.1  88.93 KiB   256     64.1%             5e9d0ad1-d7d0-4023-b71c-2f57375c2573  rack1
UN  127.0.0.2  69.9 KiB    256     67.3%             e8ea9b63-7846-4494-bd0d-2c8dfcb9274b  rack1
UN  127.0.0.3  103.62 KiB  256     68.7%             301887a5-b386-47c6-8a59-3f71ea09b9fc  rack1



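12. :~/opensource/cassandra/ccm$ ccm node1 ring

(Only the first entries are shown. With 256 tokens on each of the 3 nodes, the full ring lists 768 tokens.)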


Datacenter: datacenter1
==========
Address    Rack        Status State   Load            Owns                Token                                      
                                                                          9184122387525172037                        
127.0.0.3  rack1       Up     Normal  103.62 KiB      68.67%              -9188398778489235111                       
127.0.0.2  rack1       Up     Normal  69.9 KiB        67.27%              -9176942363448371450                       
127.0.0.3  rack1       Up     Normal  103.62 KiB      68.67%              -9165278195783829162                       
127.0.0.3  rack1       Up     Normal  103.62 KiB      68.67%              -9154226470359649220                       
127.0.0.3  rack1       Up     Normal  103.62 KiB      68.67%              -9105345187931701041                       
127.0.0.1  rack1       Up     Normal  88.93 KiB       64.06%              -9059861789902146091                       
127.0.0.1  rack1       Up     Normal  88.93 KiB       64.06%              -9006955999491952161                       
127.0.0.1  rack1       Up     Normal  88.93 KiB       64.06%              -8991976771179386553                       
127.0.0.1  rack1       Up     Normal  88.93 KiB       64.06%              -8974379714699893637                       
127.0.0.1  rack1       Up     Normal  88.93 KiB       64.06%              -8964003200974837827                       
127.0.0.2  rack1       Up     Normal  69.9 KiB        67.27%              -8885086536332616726                       
127.0.0.3  rack1       Up     Normal  103.62 KiB      68.67%              -8882333637674950203                       
127.0.0.2  rack1       Up     Normal  69.9 KiB        67.27%              -8866726432815074132     



13. :~/opensource/cassandra/ccm$ ccm stop

14. :~/opensource/cassandra/ccm$ ccm -h
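
The create, start, status and stop steps above can be collected into a small script. This is a minimal sketch that uses only the ccm commands already shown in this post. The function names `run`, `cluster_up` and `cluster_down` and the DRY_RUN switch are hypothetical, added so the commands can be printed without touching a real cluster.

```shell
# Hypothetical helper: echo a command, then run it unless DRY_RUN=1.
run() {
  echo "+ $*"
  if [ "${DRY_RUN:-0}" != "1" ]; then
    "$@"
  fi
}

# Create a vnode cluster, start it and show its status.
#   $1: cluster name, $2: Cassandra version, $3: number of nodes
cluster_up() {
  name="$1"; version="$2"; nodes="$3"
  run ccm create -v "$version" -n "$nodes" "$name" --vnodes &&
  run ccm start &&
  run ccm status
}

# Stop all nodes of the current cluster.
cluster_down() {
  run ccm stop
}

# Example usage (left commented out on purpose):
#   DRY_RUN=1 cluster_up my_cluster 3.11.2 3
#   DRY_RUN=1 cluster_down
```

Running with DRY_RUN=1 first is a cheap way to confirm the arguments before ccm downloads and extracts a Cassandra release.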