In this post, I show how to integrate a local single-node Cassandra database with standalone Spark using spark-cassandra-connector.
Set up Cassandra, Spark, Scala & sbt (Scala Build Tool)
1. Download Cassandra, Spark, Scala and sbt. I am using Cassandra 3.11.2, Spark 2.2.1 and Scala 2.11.8.
http://cassandra.apache.org/download/
https://spark.apache.org/releases/spark-release-2-2-1.html
https://www.scala-lang.org/download/2.11.8.html
https://www.scala-sbt.org/download.html
2. Add the following environment setup to ~/.profile
#cassandra setup
export CASSANDRA_HOME=/home/dhanuka/software/apache-cassandra-3.11.2
#spark, sbt and scala setup
export SPARK_HOME=/home/dhanuka/software/spark/spark-2.2.1-bin-hadoop2.7
export SBT_HOME=/home/dhanuka/software/spark/sbt-launcher-packaging-0.13.13
export SCALA_HOME=/home/dhanuka/software/scala-2.11.8
PATH=$PATH:$JAVA_HOME/bin:$MAVEN_HOME/bin:$SPARK_HOME/bin:$SBT_HOME/bin:$SCALA_HOME/bin:$CASSANDRA_HOME/bin
3. Reload the profile in the current shell
$ source ~/.profile
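Before moving on, it can help to confirm that each variable points at a real installation. The following is a minimal sketch, not part of the original setup: check_home is a hypothetical helper name, and it assumes each installation has a bin/ subdirectory, as the PATH line above implies.

```shell
#!/bin/sh
# check_home NAME: report whether the variable called NAME is set and
# points at a directory that contains a bin/ subdirectory.
# Returns non-zero when the check fails.
check_home() {
    name=$1
    # Look up the value of the variable whose name is in $name.
    eval "dir=\${$name:-}"
    if [ -z "$dir" ]; then
        echo "$name is not set"
        return 1
    elif [ ! -d "$dir/bin" ]; then
        echo "$name=$dir has no bin/ directory"
        return 1
    fi
    echo "$name ok: $dir"
}

# Check the four variables exported in .profile; keep going on failure.
for v in CASSANDRA_HOME SPARK_HOME SBT_HOME SCALA_HOME; do
    check_home "$v" || true
done
```

A line such as "SPARK_HOME is not set" usually means the profile was not re-sourced in the current terminal.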
Create Cassandra Keyspace and Table
1. Start Cassandra in the foreground with the following command
$ cassandra -f
2. Start CQL shell
$ cqlsh
3. Create a keyspace and a table, then insert and query one row
cqlsh> CREATE KEYSPACE people WITH replication = {'class': 'SimpleStrategy', 'replication_factor':1};
cqlsh> use people;
cqlsh:people> CREATE TABLE users(
... id varchar ,
... first_name varchar,
... last_name varchar,
... city varchar,
... emails varchar,
... PRIMARY KEY (id));
cqlsh:people> Insert into users (id,first_name,last_name,city,emails) values('1','dhanuka','ranasinghe','colombo','dhanuka.priyanath@gmail.com');
cqlsh:people> select * from users;
 id | city    | emails                      | first_name | last_name
----+---------+-----------------------------+------------+------------
 1  | colombo | dhanuka.priyanath@gmail.com | dhanuka    | ranasinghe
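The same statements can also be kept in a file and replayed with cqlsh -f instead of being typed interactively. This is a sketch under a few assumptions: the file name people_users.cql is arbitrary, IF NOT EXISTS has been added so the script can be run more than once, and cqlsh is only invoked when it is found on the PATH.

```shell
#!/bin/sh
# Write the schema and sample row from the steps above to a CQL script.
# IF NOT EXISTS (an addition to the original statements) makes it re-runnable.
cql_file=${TMPDIR:-/tmp}/people_users.cql
cat > "$cql_file" <<'EOF'
CREATE KEYSPACE IF NOT EXISTS people
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
CREATE TABLE IF NOT EXISTS people.users (
  id varchar,
  first_name varchar,
  last_name varchar,
  city varchar,
  emails varchar,
  PRIMARY KEY (id));
INSERT INTO people.users (id, first_name, last_name, city, emails)
  VALUES ('1', 'dhanuka', 'ranasinghe', 'colombo', 'dhanuka.priyanath@gmail.com');
SELECT * FROM people.users;
EOF

# Run the script only if cqlsh is available; otherwise just report the path.
if command -v cqlsh >/dev/null 2>&1; then
    cqlsh -f "$cql_file"
else
    echo "cqlsh not found; script written to $cql_file"
fi
```

Cassandra must already be running (step 1) for cqlsh to connect.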
Build spark-cassandra-connector.
1. Clone the GitHub repository.
$ git clone https://github.com/datastax/spark-cassandra-connector.git
$ cd spark-cassandra-connector
2. Build the project with Scala 2.11 and Cassandra 3.11.2
$ sbt -Dscala-2.11=true -Dtest.cassandra.version=3.11.2 assembly
The assembly jar is written to the location below. The version suffix (here 2.0.7-82-g0369a7b) depends on the checked-out revision, so yours may differ.
spark-cassandra-connector/spark-cassandra-connector/target/full/scala-2.11/spark-cassandra-connector-assembly-2.0.7-82-g0369a7b.jar
3. Rename the jar to a shorter name, from the directory that contains it
$ mv spark-cassandra-connector-assembly-2.0.7-82-g0369a7b.jar spark-cassandra-connector-assembly-2.0.7.jar
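Because the jar name carries a build-specific suffix, it may be easier to locate the file by pattern than to type the name. A minimal sketch, where find_assembly_jar is a hypothetical helper and the only assumption is that the file name starts with spark-cassandra-connector-assembly- as shown above:

```shell
#!/bin/sh
# find_assembly_jar DIR: print the path of the first connector assembly
# jar found under DIR, or return 1 if none exists.
find_assembly_jar() {
    jar=$(find "$1" -type f -name 'spark-cassandra-connector-assembly-*.jar' | head -n 1)
    if [ -z "$jar" ]; then
        echo "no assembly jar under $1" >&2
        return 1
    fi
    echo "$jar"
}

# Example, run from the cloned repository root after the sbt build:
#   jar=$(find_assembly_jar .) && cp "$jar" spark-cassandra-connector-assembly-2.0.7.jar
```

Copying instead of moving leaves the build output in place, which is convenient if the build is repeated.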
Connect Spark with Cassandra through Spark-Shell
1. Copy spark-cassandra-connector-assembly-2.0.7.jar to the Spark jars directory
$ cp spark-cassandra-connector-assembly-2.0.7.jar $SPARK_HOME/jars
2. Start spark-shell
$ spark-shell --jars $SPARK_HOME/jars/spark-cassandra-connector-assembly-2.0.7.jar
3. Stop the current Spark context, so that a new one can be created with the Cassandra connection setting
scala> sc.stop
4. Read the Cassandra table from Spark
scala> import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf
import com.datastax.spark.connector._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
scala> val conf = new SparkConf(true).set("spark.cassandra.connection.host", "localhost")
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@2c2a7d53
scala> val sc = new SparkContext(conf)
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@15914bb5
scala> val test_spark_rdd = sc.cassandraTable("people", "users")
test_spark_rdd: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:19
scala> test_spark_rdd.first
res1: com.datastax.spark.connector.CassandraRow = CassandraRow{id: 1, city: colombo, emails: dhanuka.priyanath@gmail.com, first_name: dhanuka, last_name: ranasinghe}
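As an alternative to stopping and recreating the context, the Cassandra host can be passed to spark-shell as a Spark property with --conf, so that the shell's built-in sc already carries it. The sketch below only assembles and prints that command line; CASSANDRA_HOST, CONNECTOR_JAR and spark_shell_cmd are illustrative names, not part of the original steps.

```shell
#!/bin/sh
# Defaults match the setup above; both variables are illustrative and
# can be overridden from the environment.
CASSANDRA_HOST=${CASSANDRA_HOST:-localhost}
CONNECTOR_JAR=${CONNECTOR_JAR:-${SPARK_HOME:-}/jars/spark-cassandra-connector-assembly-2.0.7.jar}

# spark_shell_cmd: print a spark-shell invocation that sets
# spark.cassandra.connection.host up front and adds the connector jar.
spark_shell_cmd() {
    echo "spark-shell --conf spark.cassandra.connection.host=$CASSANDRA_HOST --jars $CONNECTOR_JAR"
}

# Print the command; to launch it, run: eval "$(spark_shell_cmd)"
spark_shell_cmd
```

In a shell started this way, importing com.datastax.spark.connector._ and calling sc.cassandraTable("people", "users") should work without the sc.stop step, assuming the connector picks up the property from the shell's configuration.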
4/28/2018
Spark Cassandra Integration with spark-cassandra-connector
4/15/2018
Set up a vnode Cassandra cluster on a local machine
Here I am using an Ubuntu Linux machine and a tool called Cassandra Cluster Manager (ccm).
Steps
1. :~/opensource$ mkdir cassandra
2. :~/opensource$ cd cassandra/
3. :~/opensource/cassandra$ git clone https://github.com/riptano/ccm.git
4. :~/opensource/cassandra$ cd ccm/
5. :~/opensource/cassandra/ccm$ ls
ccm ccmlib license.txt MANIFEST.in misc NETWORK_ALIASES.md README.md setup.py ssl tests
6. :~/opensource/cassandra/ccm$ sudo ./setup.py install
7. :~/opensource/cassandra/ccm$ ccm create -v 3.11.2 -n 3 my_cluster --vnodes
10:22:00,960 ccm INFO Downloading http://archive.apache.org/dist/cassandra/3.11.2/apache-cassandra-3.11.2-bin.tar.gz to /tmp/ccm-Wo1DBu.tar.gz (36.656MB)
38436262 [100.00%]10:24:09,353 ccm INFO Extracting /tmp/ccm-Wo1DBu.tar.gz as version 3.11.2 ...
Current cluster is now: my_cluster
cluster name: my_cluster
cassandra version: 3.11.2
number of nodes: 3
8. :~/opensource/cassandra/ccm$ ccm list
*my_cluster
9. :~/opensource/cassandra/ccm$ ccm status
Cluster: 'my_cluster'
---------------------
node1: DOWN (Not initialized)
node3: DOWN (Not initialized)
node2: DOWN (Not initialized)
10. :~/opensource/cassandra/ccm$ ccm start
:~/opensource/cassandra/ccm$ ccm status
Cluster: 'my_cluster'
---------------------
node1: UP
node3: UP
node2: UP
11. :~/opensource/cassandra/ccm$ ccm node1 status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- Address Load Tokens Owns (effective) Host ID Rack
UN 127.0.0.1 88.93 KiB 256 64.1% 5e9d0ad1-d7d0-4023-b71c-2f57375c2573 rack1
UN 127.0.0.2 69.9 KiB 256 67.3% e8ea9b63-7846-4494-bd0d-2c8dfcb9274b rack1
UN 127.0.0.3 103.62 KiB 256 68.7% 301887a5-b386-47c6-8a59-3f71ea09b9fc rack1
12. :~/opensource/cassandra/ccm$ ccm node1 ring
Datacenter: datacenter1
==========
Address Rack Status State Load Owns Token
9184122387525172037
127.0.0.3 rack1 Up Normal 103.62 KiB 68.67% -9188398778489235111
127.0.0.2 rack1 Up Normal 69.9 KiB 67.27% -9176942363448371450
127.0.0.3 rack1 Up Normal 103.62 KiB 68.67% -9165278195783829162
127.0.0.3 rack1 Up Normal 103.62 KiB 68.67% -9154226470359649220
127.0.0.3 rack1 Up Normal 103.62 KiB 68.67% -9105345187931701041
127.0.0.1 rack1 Up Normal 88.93 KiB 64.06% -9059861789902146091
127.0.0.1 rack1 Up Normal 88.93 KiB 64.06% -9006955999491952161
127.0.0.1 rack1 Up Normal 88.93 KiB 64.06% -8991976771179386553
127.0.0.1 rack1 Up Normal 88.93 KiB 64.06% -8974379714699893637
127.0.0.1 rack1 Up Normal 88.93 KiB 64.06% -8964003200974837827
127.0.0.2 rack1 Up Normal 69.9 KiB 67.27% -8885086536332616726
127.0.0.3 rack1 Up Normal 103.62 KiB 68.67% -8882333637674950203
127.0.0.2 rack1 Up Normal 69.9 KiB 67.27% -8866726432815074132
13. :~/opensource/cassandra/ccm$ ccm stop
14. :~/opensource/cassandra/ccm$ ccm -h
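When scripting these steps, the output of ccm status can be checked automatically before running anything against the cluster. The following sketch assumes the output format shown in steps 9 and 10 (one "nodeN: STATE" line per node); count_down_nodes is a hypothetical helper name.

```shell
#!/bin/sh
# count_down_nodes: read `ccm status` output on stdin and print how many
# node lines report a state other than UP.
count_down_nodes() {
    awk '/^node[0-9]+:/ && $2 != "UP" { n++ } END { print n + 0 }'
}

# Example, with a cluster created as above:
#   ccm status | count_down_nodes
```

A result of 0 means every node listed is UP; right after ccm create, before ccm start, it would report 3 for this cluster.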