Data linkage with Spark and Cassandra

Introduction

In data analysis work, combining Apache Spark with Cassandra is often worth considering. This article walks through a small sample that links the two.

What is Apache Spark?

Apache Spark is a widely used engine for large-scale data analysis. From the project site:

Access data in HDFS, Alluxio, Apache Cassandra, Apache HBase, Apache Hive, and hundreds of other data sources.

It provides abstractions such as RDD (Resilient Distributed Dataset), DataFrame, and Dataset.

Source: https://spark.apache.org/

What is Cassandra?

Cassandra is a NoSQL wide-column database. The project site sums it up as:

Manage massive amounts of data, fast, without losing sleep

Source: http://cassandra.apache.org/

In particular, it was designed for scalability from the beginning, so building a cluster is easy.
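Scaling out is largely a matter of the keyspace's replication settings. A hedged CQL sketch, assuming a two-datacenter cluster (the keyspace and datacenter names here are hypothetical):

```sql
-- Hypothetical example: keep 3 replicas in each datacenter.
-- 'dc1' and 'dc2' are placeholder datacenter names.
CREATE KEYSPACE sample_cluster
  WITH replication = {
    'class': 'NetworkTopologyStrategy',
    'dc1': 3,
    'dc2': 3
  };
```

The sample later in this article uses SimpleStrategy with replication_factor 1, which is fine for a single-node setup.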

Sample: saving CSV file data in Cassandra

Spark has many capabilities, but here we build a small sample that saves a CSV file into Cassandra.

Create a sample file called users.csv. The header row must match the column names referenced in the code below; for example:

User ID,Full name,mail address,Remarks
A000001,Yamada Taro,[email protected],1st year after joining the company
A000002,Saburo Yamada,[email protected],10 years after joining the company
A000003,Ichiro Tanaka,[email protected],5th year after joining the company
A000004,Jiro Tanaka,[email protected],3rd year after joining the company

Add the libraries to the Gradle project. Note that the Scala version suffix (_2.11 here) must match across all the Spark artifacts and the scala-library dependency.

build.gradle


dependencies {
	// https://mvnrepository.com/artifact/org.scala-lang/scala-library
	compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.12'

	// https://mvnrepository.com/artifact/org.apache.spark/spark-core
	compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.3.4'

	// https://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector
	compile group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.11', version: '2.4.1'
	
	// https://mvnrepository.com/artifact/org.apache.spark/spark-sql
	compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.3.4'

	// https://mvnrepository.com/artifact/org.apache.spark/spark-streaming
	compile group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.3.4'

}
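To run the sample from Gradle, one option is the application plugin; a minimal sketch (the main class matches the sample below, but this plugin setup is an assumption, not part of the original project):

```groovy
// Assumed addition to build.gradle, not in the original project
apply plugin: 'application'
mainClassName = 'com.test.spark.CsvReader'
```

With this in build.gradle, `gradle run` launches the job.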

Next, save the CSV data to Cassandra and then read it back from the database.

CsvReader.java


package com.test.spark;

import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;

import java.util.List;

import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class CsvReader {

	private static final Logger logger = Logger.getLogger(CsvReader.class);

	public static void main(String[] args) {

		//Spark settings
		SparkConf conf = new SparkConf();
		conf.setAppName("CSVReader");
		conf.setMaster("local[*]");
		conf.set("spark.cassandra.connection.host", "192.168.10.248");
		conf.set("spark.cassandra.connection.port", "9042");

		//Cassandra keyspace and table name
		String keyspace = "sample";
		String tableUser = "user";
		String userCsv = "C:\\data\\spark\\users.csv";

		JavaSparkContext sc = new JavaSparkContext(conf);
		try {
			SparkSession sparkSession = SparkSession.builder().master("local").appName("CSVReader")
					.config("spark.sql.warehouse.dir", "file:///C:/data/spark").getOrCreate();

			//Cassandra connection
			CassandraConnector connector = CassandraConnector.apply(sc.getConf());
			
			try (Session session = connector.openSession()) {
				//Delete keyspace if it exists
				session.execute("DROP KEYSPACE IF EXISTS " + keyspace);
				
				//Create keyspace
				session.execute("CREATE KEYSPACE " + keyspace
						+ " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
				
				//Create a table
				session.execute("CREATE TABLE " + keyspace + "." + tableUser
						+ "(user_id TEXT PRIMARY KEY, user_name TEXT, email_address TEXT, memo TEXT)");
			}

			//Read the CSV; the AS aliases map the CSV header names to the table's column names
			Dataset<Row> csv = sparkSession.read().format("csv")
					.option("header", "true")
					.option("encoding", "UTF-8")
					.load(userCsv)
					.select(new Column("User ID").as("user_id"),
							new Column("Full name").as("user_name"),
							new Column("mail address").as("email_address"),
							new Column("Remarks").as("memo"));

			//Save to Cassandra; columns are matched to the table by name,
			//so no per-column options are needed
			csv.write().format("org.apache.spark.sql.cassandra")
					.option("keyspace", keyspace)
					.option("table", tableUser)
					.mode(SaveMode.Append)
					.save();

			//Read data from Cassandra
			Dataset<Row> dataset = sparkSession.read().format("org.apache.spark.sql.cassandra")
					.option("keyspace", keyspace)
					.option("table", tableUser).load();
			
			//Get an array from a dataset
			List<Row> asList = dataset.collectAsList();
			for (Row r : asList) {
				logger.info(r);
			}
		} catch (Exception e) {
			logger.error(e);
		} finally {
			sc.stop();
			sc.close();
		}
	}
}

Cassandra data

(Screenshot: the contents of the user table after the write.)

User data retrieved by the Java program:

19/10/11 23:18:27 INFO CsvReader: [A000002,[email protected],10 years after joining the company,Saburo Yamada]
19/10/11 23:18:27 INFO CsvReader: [A000004,[email protected],3rd year after joining the company,Jiro Tanaka]
19/10/11 23:18:27 INFO CsvReader: [A000003,[email protected],5th year after joining the company,Ichiro Tanaka]
19/10/11 23:18:27 INFO CsvReader: [A000001,[email protected],1st year after joining the company,Yamada Taro]
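The AS aliases are the one place where the CSV headers and the table definition must line up. Stripped of Spark, the mapping the select performs is simply a header-to-column rename; a plain-Java sketch (HeaderMapping is a hypothetical helper, not part of the sample):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderMapping {

    // Mirrors the Column("...").as("...") calls in CsvReader:
    // CSV header name -> Cassandra column name.
    static Map<String, String> csvToCassandra() {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("User ID", "user_id");
        m.put("Full name", "user_name");
        m.put("mail address", "email_address");
        m.put("Remarks", "memo");
        return m;
    }

    public static void main(String[] args) {
        csvToCassandra().forEach((csvHeader, column) ->
                System.out.println(csvHeader + " -> " + column));
    }
}
```

If a header is misspelled on either side, the select fails at runtime or the write has no matching table column, so keeping this mapping in one place is worthwhile.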

Details on basic operations and more can be found in the Spark Programming Guide: https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html

That's all.
