Spark Sql Cassandra Example Java

How To Start with Apache Spark and Apache Cassandra

Apache Cassandra is a specialized database that scales linearly. This comes at a price: query-driven table modelling, configurable consistency and limited analytics. Apple performs millions of operations per second on over 160,000 Cassandra instances while storing over 100 PB of data. You can work around the limited analytics with Apache Spark and the DataStax connector, and that is what this story is about.

Setup

I used a single Apache Cassandra node running in Docker, defined in docker-compose:

version: '3'

services:
  cassandra:
    image: cassandra:latest
    ports:
      - "9042:9042"

Apache Spark 3.0 is launched as a shell together with the connector and Cassandra's client library, which will come in handy for the timeuuid type conversion.

./spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.0-beta,com.datastax.cassandra:cassandra-driver-core:3.9.0

If Cassandra is not running locally, you need to configure its address.

          spark.conf.set("spark.cassandra.connection.host", "127.0.0.1")        

Data

To test the Spark + Cassandra combination, I generated some data using mockaroo.com. It's a list of sensors and a list of measurements from those sensors. You can find them in the repository on GitHub.

          maciej@ubuntu:~/Desktop/spark_and_cassandra$ head sensor_reads.csv            
date,sensor_id,temperature,wind_speed,wind_direction
2020-02-20 13:00:57,11,90.42,72.91,153
2020-05-28 21:31:03,9,51.62,20.07,255
2020-06-04 16:32:02,3,6.68,89.31,309
...

Creating tables in Apache Cassandra

Apache Cassandra comes with a dedicated command-line tool called cqlsh. When Cassandra runs in Docker, you start it with the following command.

sudo docker exec -it container_name cqlsh

First we have to create a keyspace, which is a container for our tables.

CREATE KEYSPACE spark_playground WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };

Information about the sensors will be kept in the sensors table. Nothing unusual: id, location, group id. Note that the primary key is sensor_id. This means that a WHERE on any other column requires a complete table scan (did I mention specific data modeling?), but filtering by key will be super fast.

CREATE TABLE sensors (
  sensor_id int,
  location text,
  group_id int,
  PRIMARY KEY (sensor_id)
);

The measurements will be in the sensors_reads table. The key consists of the sensor id (partition key) and the date as a timeuuid (clustering key). The partition key determines where the record is located (on which node). Using a timeuuid as the clustering key keeps the records within a partition sorted by time. If you are lost, look at the explanation on Stack Overflow. You have to be careful when selecting the partition key: a wrong choice can lead to uneven load on the nodes.

CREATE TABLE sensors_reads (
  sensor_id int,
  date timeuuid,
  temp double,
  humidity double,
  wind_speed double,
  wind_direction double,
  PRIMARY KEY (sensor_id, date)
);
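The roles of the two key parts can be illustrated with a toy model in plain Scala. This is only a sketch under loud assumptions: real Cassandra hashes the partition key with Murmur3 and maps the token onto token ranges, while the hypothetical nodeFor helper below simply takes the key modulo the node count. The Read class, the 3-node cluster and the fourth row are made up for illustration; the first three rows come from the CSV sample above.

```scala
// Toy model, NOT Cassandra's real placement algorithm (assumption for illustration).
final case class Read(sensorId: Int, date: String, temp: Double)

// Hypothetical helper: picks a node by taking the partition key modulo the node count.
def nodeFor(partitionKey: Int, nodeCount: Int): Int =
  Math.floorMod(partitionKey, nodeCount)

val reads = Seq(
  Read(11, "2020-02-20 13:00:57", 90.42),
  Read(9, "2020-05-28 21:31:03", 51.62),
  Read(3, "2020-06-04 16:32:02", 6.68),
  Read(3, "2020-01-15 08:00:00", 12.5) // made-up extra row for sensor 3
)

// Partition key: all rows of one sensor end up in the same partition.
// Clustering key: rows inside a partition are kept sorted by date
// (this date format sorts chronologically as a plain string).
val partitions: Map[Int, Seq[Read]] =
  reads.groupBy(_.sensorId).map { case (id, rows) => id -> rows.sortBy(_.date) }

// Rows per node in a 3-node toy cluster: unevenly distributed keys mean uneven load.
val rowsPerNode: Map[Int, Int] =
  reads.groupBy(r => nodeFor(r.sensorId, 3)).map { case (node, rows) => node -> rows.size }
```

In this toy cluster sensors 9 and 3 land on the same node, which is the kind of skew a poorly chosen partition key produces at scale.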

Adding data to Cassandra using Spark

Loading CSV files

// Both files have a header row; without schema inference every column is read as a string
val sensors = spark.read.format("csv").option("header", "true").load("/home/maciej/Desktop/spark_and_cassandra/sensors.csv")
val sensorsReads = spark.read.format("csv").option("header", "true").load("/home/maciej/Desktop/spark_and_cassandra/sensor_reads.csv")

Write to the sensors table

Both types and names of columns agree, so the operation is simple.

          sensors.write
.format("org.apache.spark.sql.cassandra")
.option("keyspace","spark_playground")
.option("table","sensors")
.mode("append")
.save()

Write to the sensors_reads table

It gets more complicated here:

  • Inconsistent column names: the CSV column temperature has to become temp.
  • Inconsistent types in the date column. We read the date as a string from the CSV, while the column type in Cassandra is timeuuid. We need Cassandra's client library to do the conversion.

The type inconsistency can be solved with an appropriate UDF (User Defined Function). While we are at it, we will check whether the reverse function returns the same date. Normally I would write a test in IntelliJ, but let's do it the spark-shell way 😊.

import spark.implicits._
import com.datastax.driver.core.utils.UUIDs
import org.apache.spark.sql.functions.{to_timestamp, udf}

// Timestamp -> timeuuid (as a string) and back, using Cassandra's client library
val toTimeuuid: java.sql.Timestamp => String = x => UUIDs.startOf(x.getTime()).toString()
val fromTimeuuid: String => java.sql.Timestamp = x => new java.sql.Timestamp(UUIDs.unixTimestamp(java.util.UUID.fromString(x)))

val toTimeuuidUDF = udf(toTimeuuid)
val fromTimeuuidUDF = udf(fromTimeuuid)

// Round trip: the last column should show the same moment as date_as_timestamp
sensorsReads
  .withColumn("date_as_timestamp", to_timestamp($"date"))
  .withColumn("date_as_timeuuid", toTimeuuidUDF($"date_as_timestamp"))
  .withColumn("timestamp_from_timeuuid", fromTimeuuidUDF($"date_as_timeuuid"))
  .show(false)

Looks good.
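To see why the round trip works, here is a minimal sketch of how a version 1 (time-based) UUID carries a timestamp, using only java.util.UUID. It is not the driver's code: timeuuidFromMillis and millisFromTimeuuid are hypothetical helpers, and the clock-sequence/node half of the UUID is simply set to a fixed value with the variant bits, which is an assumption made for illustration.

```scala
import java.util.UUID

// Number of 100-nanosecond intervals between the UUID epoch (1582-10-15)
// and the Unix epoch (1970-01-01).
val UuidEpochOffset = 0x01B21DD213814000L

// Hypothetical helper: packs a Unix timestamp in milliseconds into a version 1 UUID.
def timeuuidFromMillis(millis: Long): UUID = {
  val ts = millis * 10000L + UuidEpochOffset  // 60-bit timestamp in 100 ns units
  val timeLow = (ts & 0xFFFFFFFFL) << 32      // lowest 32 bits go to the top of the MSB half
  val timeMid = ((ts >>> 32) & 0xFFFFL) << 16 // next 16 bits
  val timeHi = (ts >>> 48) & 0x0FFFL          // highest 12 bits
  val version1 = 0x1000L                      // version nibble = 1 (time-based)
  // Long.MinValue sets only the top bit: IETF variant, clock sequence and node left at zero.
  new UUID(timeLow | timeMid | version1 | timeHi, Long.MinValue)
}

// Hypothetical helper: reads the timestamp back as Unix milliseconds.
def millisFromTimeuuid(uuid: UUID): Long =
  (uuid.timestamp() - UuidEpochOffset) / 10000L
```

Because the timestamp is stored in 100 ns units, a millisecond value survives the round trip exactly. Note that a UUID built only from a timestamp is not unique: two reads of the same sensor in the same millisecond would map to the same key and overwrite each other. The driver's documentation makes a similar point about UUIDs.startOf, so for real data a generator that also fills in the clock sequence and node may be a safer choice.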

Now we get rid of the unnecessary columns, rename the temperature column and save the data in Cassandra.

          val sensorsReads_fixed = sensorsReads
.withColumn("date_as_timestamp", to_timestamp($"date"))
.withColumn("date_as_timeuuid", toTimeuuidUDF($"date_as_timestamp"))
.drop("date").drop("date_as_timestamp")
.withColumnRenamed("date_as_timeuuid","date")
.withColumnRenamed("temperature","temp")
sensorsReads_fixed.write
.format("org.apache.spark.sql.cassandra")
.option("keyspace","spark_playground")
.option("table","sensors_reads")
.mode("append")
.save()

Read from Cassandra

It is possible to simplify references to tables in Cassandra. You just have to register a catalog in the Spark session configuration.

          spark.conf.set("spark.sql.catalog.casscatalog","com.datastax.spark.connector.datasource.CassandraCatalog")        

Now we can refer to the tables using casscatalog.keyspace_name.table_name notation.

scala> spark.read.table("casscatalog.spark_playground.sensors").show()
+---------+--------+--------------------+
|sensor_id|group_id|            location|
+---------+--------+--------------------+
|        2|       4|       027 Heath Way|
|       13|       3|  42585 Ramsey Alley|
|        4|       1|     676 Marcy Point|
|        5|       3|260 Steensland Cr...|
|        9|       3|9385 Comanche Ter...|
|        8|       2|291 Meadow Ridge ...|
|        7|       2|     716 Randy Point|
|       14|       2|    331 Mcbride Road|
|       15|       3|     91 Gateway Hill|
|        1|       3|      66 Vera Avenue|
|        6|       4|87212 Lake View S...|
|       12|       2|    12 Montana Place|
|       10|       3|      60 Spohn Plaza|
|       11|       1|    48 Redwing Court|
|        3|       3|        930 Almo Way|
+---------+--------+--------------------+

val sensors_table = spark.read.table("casscatalog.spark_playground.sensors")
val sensors_reads_table = spark.read.table("casscatalog.spark_playground.sensors_reads")

Cost of the operation

With Spark, everything seems easy and enjoyable. We have to remember, though, that we are operating on Cassandra, which has its own way of handling queries. I'm mainly interested in how fast records can be retrieved by their key compared with retrieving them by the value of another column.

scala> sensors_table.filter("sensor_id in (1,2,3)").select("sensor_id","location").explain
20/07/21 11:09:50 INFO V2ScanRelationPushDown:
Pushing operators to sensors
Pushed Filters: In(sensor_id, [1,2,3])
Post-Scan Filters:
Output: sensor_id#749, location#751

== Physical Plan ==
*(1) Project [sensor_id#749, location#751]
+- BatchScan[sensor_id#749, location#751] Cassandra Scan: spark_playground.sensors
 - Cassandra Filters: [["sensor_id" IN (?, ?, ?), 1]]
 - Requested Columns: [sensor_id,location]

The filter on sensor_id is pushed down to Cassandra and applied while the data is being retrieved. The filter on group_id, on the other hand, requires Spark to scan the entire table.

scala> sensors_table.filter("group_id in (1,2,3)").select("sensor_id","location").explain
20/07/21 11:14:34 INFO V2ScanRelationPushDown:
Pushing operators to sensors
Pushed Filters:
Post-Scan Filters: group_id#750 IN (1,2,3)
Output: sensor_id#749, group_id#750, location#751

== Physical Plan ==
*(1) Project [sensor_id#749, location#751]
+- *(1) Filter group_id#750 IN (1,2,3)
   +- BatchScan[sensor_id#749, group_id#750, location#751] Cassandra Scan: spark_playground.sensors
 - Cassandra Filters: []
 - Requested Columns: [sensor_id,group_id,location]

Simple aggregations

Let's assume that we need to count the number of sensors in each group. We did not anticipate this when designing the database, so the CQL query will not be executed.

          cqlsh:spark_playground> SELECT group_id, count(1) FROM sensors GROUP BY group_id;
InvalidRequest: Error from server: code=2200 [Invalid query] message="Group by is currently only supported on the columns of the PRIMARY KEY, got group_id"

So either we do it on the client application side or we use Apache Spark.

scala> sensors_table.groupBy("group_id").count.show
+--------+-----+
|group_id|count|
+--------+-----+
|       1|    2|
|       3|    7|
|       4|    2|
|       2|    4|
+--------+-----+
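For comparison, the "client application side" alternative could look roughly like the sketch below: fetch the rows and count them in memory. The (sensor_id, group_id) pairs are copied from the sensors table output shown earlier; the names sensorGroups and sensorsPerGroup are made up, and a real client would of course read the rows through a driver instead of hard-coding them.

```scala
// (sensor_id, group_id) pairs copied from the sensors table output shown earlier.
val sensorGroups: Seq[(Int, Int)] = Seq(
  2 -> 4, 13 -> 3, 4 -> 1, 5 -> 3, 9 -> 3, 8 -> 2, 7 -> 2, 14 -> 2,
  15 -> 3, 1 -> 3, 6 -> 4, 12 -> 2, 10 -> 3, 11 -> 1, 3 -> 3
)

// Group by the second element (group_id) and count the rows in each group.
val sensorsPerGroup: Map[Int, Int] =
  sensorGroups.groupBy(_._2).map { case (groupId, rows) => groupId -> rows.size }
```

This is fine for 15 sensors, but every row has to travel to the client first, which is exactly the work Spark distributes for us on larger tables.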

Joins — joinWithCassandraTable

The naive way to join two data sets is to build their Cartesian product. However, the Cassandra connector provides a faster solution. The RDD API has a joinWithCassandraTable method, while the DataFrame API has Direct Join, which is poorly documented. joinWithCassandraTable performs one query for each partition required by the source RDD. With DataFrames this is meant to happen automatically.

scala> sensors_reads_table.join(sensors_table).explain
...
== Physical Plan ==
CartesianProduct
:- *(1) Project [date#755, sensor_id#756, humidity#757, temp#758, wind_direction#759, wind_speed#760]
:  +- BatchScan[date#755, sensor_id#756, humidity#757, temp#758, wind_direction#759, wind_speed#760] Cassandra Scan: spark_playground.sensors_reads
 - Cassandra Filters: []
 - Requested Columns: [date,sensor_id,humidity,temp,wind_direction,wind_speed]
+- *(2) Project [sensor_id#749, group_id#750, location#751]
   +- BatchScan[sensor_id#749, group_id#750, location#751] Cassandra Scan: spark_playground.sensors
 - Cassandra Filters: []
 - Requested Columns: [sensor_id,group_id,location]

Without a join condition, Spark pairs every read with every sensor. Joining on the key expresses what we actually want and generates a much more efficient execution plan.

scala> sensors_reads_table.join(sensors_table, sensors_reads_table("sensor_id") === sensors_table("sensor_id"), "inner").explain
...
== Physical Plan ==
*(5) SortMergeJoin [sensor_id#756], [sensor_id#749], Inner
:- *(2) Sort [sensor_id#756 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(sensor_id#756, 200), true, [id=#812]
:     +- *(1) Project [date#755, sensor_id#756, humidity#757, temp#758, wind_direction#759, wind_speed#760]
:        +- BatchScan[date#755, sensor_id#756, humidity#757, temp#758, wind_direction#759, wind_speed#760] Cassandra Scan: spark_playground.sensors_reads
 - Cassandra Filters: []
 - Requested Columns: [date,sensor_id,humidity,temp,wind_direction,wind_speed]
+- *(4) Sort [sensor_id#749 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(sensor_id#749, 200), true, [id=#820]
      +- *(3) Project [sensor_id#749, group_id#750, location#751]
         +- BatchScan[sensor_id#749, group_id#750, location#751] Cassandra Scan: spark_playground.sensors
 - Cassandra Filters: []
 - Requested Columns: [sensor_id,group_id,location]
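The difference between the two approaches, and the idea of fetching only the keys the source side needs, can be sketched with plain Scala collections. This is an in-memory toy, not the connector's implementation: the Map stands in for a Cassandra table keyed by sensor_id, and SensorRead, sensorsByKey, sensorReadings, keysToFetch and joined are names invented for this example. The values are taken from the sample data above.

```scala
final case class SensorRead(sensorId: Int, temp: Double)

// Stand-in for the sensors table, keyed by its partition key (sensor_id).
val sensorsByKey: Map[Int, String] =
  Map(2 -> "027 Heath Way", 3 -> "930 Almo Way", 11 -> "48 Redwing Court")

val sensorReadings = Seq(SensorRead(11, 90.42), SensorRead(9, 51.62), SensorRead(3, 6.68))

// Cartesian product: every read is paired with every sensor, related or not.
val cartesian =
  for (r <- sensorReadings; (id, loc) <- sensorsByKey.toSeq) yield (r, id, loc)

// Keyed join: one lookup per distinct key required by the source side,
// keeping only the reads whose sensor exists (inner join semantics).
val keysToFetch = sensorReadings.map(_.sensorId).distinct
val fetched = keysToFetch.flatMap(k => sensorsByKey.get(k).map(k -> _)).toMap
val joined = sensorReadings.flatMap(r => fetched.get(r.sensorId).map(loc => (r, loc)))
```

With three reads and three sensors the Cartesian product already has nine rows, while the keyed version performs three lookups and returns only the two matching rows (sensor 9 is deliberately missing from the toy table).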

EDIT (30.07.2020)

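Alex Ott from DataStax told me in the comments on LinkedIn that there is no Direct Join in this case. It turns out that you need to configure spark.sql.extensions. More details can be found here. Because spark.sql.extensions is a static setting, it has to be supplied when the session is created, for example when launching the shell:

./spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.0-beta,com.datastax.cassandra:cassandra-driver-core:3.9.0 --conf spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions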

Repository

https://github.com/zorteran/wiadro-danych-spark-cassandra-101 — docker-compose and CSVs

Summary

This entry barely scratches the surface of how Spark and Cassandra work together. Cassandra is an interesting alternative and/or complement to the Hadoop ecosystem. We can use Spark for analysis, but also for maintaining data in Cassandra and moving it in and out. After all, it is hard to find the ideal data model on the first attempt 😉.


Source: https://itnext.io/how-to-start-with-apache-spark-and-apache-cassandra-886a648bd2fb
