In the previous post we created a simple Spark job and executed it locally on a single machine. However, the main goal of the Spark framework is to utilize the resources of a cluster consisting of multiple servers and thereby increase data processing throughput. In real life, the amount of data processed by a production-grade cluster is measured at least in terabytes.
The big advantage of a Spark application is that it is ready for distributed cluster execution by design. Still, there are a few details that should be taken into account given the distributed nature of the application.
Let's revisit our application and highlight three main concerns that should be addressed before executing the job on a Spark cluster:
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import static org.apache.spark.sql.functions.*;

public class ClusterDeployment {

    public static void main(String[] args) {
        // TODO Concern 1: how to setup master?
        SparkConf conf = new SparkConf().setMaster("local[*]");
        SparkSession spark = SparkSession.builder()
                .config(conf)
                .getOrCreate();

        Dataset<Row> dataframe = spark.read()
                .option("multiline", "true")
                // TODO Concern 2: how to read file?
                .json("invoice.json")
                .select(
                        col("item"),
                        expr("price * quantity").as("item_total"));

        // TODO Concern 3: what to do with output?
        dataframe.show();
    }
}
Concern 1 - How to set up the master?
In our sample we set the master to local[*], which means: run the job locally using all available CPUs. For a cluster deployment the setMaster statement should be omitted, as hardware resources will be managed automatically by the cluster environment.
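In code this simply means creating the SparkConf without a hard-coded master; a minimal sketch (the actual master URL is supplied later via spark-submit, see the Deployment section below):

// No setMaster() call here: the master URL is provided externally
// by the --master option of spark-submit
SparkConf conf = new SparkConf();
SparkSession spark = SparkSession.builder()
        .config(conf)
        .getOrCreate();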
Concern 2 - How to read the source file?
Our naive test used the local file invoice.json as a data source. But as you already know, a real big data application is expected to read files which could be too large to be stored on your local hard drive.
To store such big files you need a special file system - HDFS (Hadoop Distributed File System). It is deployed on top of multiple servers and stores data in a distributed way. Alternatively, you can use Amazon S3 storage, which is conceptually similar to HDFS but hosted in the cloud.
In our case what matters is to set the proper path to the file system where the actual data is stored, e.g.:
| File system | File path sample |
|---|---|
| HDFS | hdfs://namenode:8020/user/data/invoice.json |
| Amazon S3 | s3a://bucket-name/user/data/invoice.json |
A production-grade Spark cluster supports HDFS and Amazon S3 out of the box, so setting the correct path is all you need.
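For example, the read call from our job can point directly at the HDFS path from the table above (a sketch; namenode:8020 stands for your HDFS namenode address):

Dataset<Row> dataframe = spark.read()
        .option("multiline", "true")
        .json("hdfs://namenode:8020/user/data/invoice.json");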
Concern 3 - What to do with the output?
In our test app we used the show() method to print the first 20 rows of the dataframe to standard output. That's fine for development purposes, but for a real application we need something more practical.
There are two main options for consuming the underlying data: collectAsList or write.
| Method | Description |
|---|---|
| collectAsList | Returns a Java list that contains all rows in this Dataset. Running collect requires moving all the data into the application's driver process, and doing so on a very large dataset can crash the driver process with OutOfMemoryError. |
| write | Interface for saving the content of the non-streaming Dataset out into external storage. |
One important aspect of collectAsList is that it returns all the data to your local JVM instance. Since the dataframe might contain gigabytes of data, this operation could crash your application with an OutOfMemoryError. Still, it can be safe if you filter heavily first and make sure the resulting data set is small enough to be processed in a single JVM, as sketched below.
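A minimal sketch of that pattern, assuming java.util.List is imported (the item_total threshold is purely illustrative and not part of the original job):

// Filter aggressively first so that only a small result set reaches the driver
List<Row> expensiveItems = dataframe
        .filter(col("item_total").gt(1000))
        .collectAsList();
expensiveItems.forEach(row -> System.out.println(row));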
Nevertheless, most of the time you will want to write the results of the data transformation to persistent storage. For example, you can write the result set back to HDFS in JSON format as follows:
dataframe.write()
.json("hdfs:///user/data/invoice_report");
So here is the final version of our application with all concerns addressed:
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.*;
import static org.apache.spark.sql.functions.*;

public class ClusterDeployment {

    public static void main(String[] args) {
        // Concern 1: no setMaster() - the master URL is supplied by spark-submit
        SparkConf conf = new SparkConf();
        SparkSession spark = SparkSession.builder()
                .config(conf)
                .getOrCreate();

        // Point the Hadoop client at the HDFS namenode so that hdfs:/// paths resolve
        Configuration hconf = spark.sparkContext().hadoopConfiguration();
        hconf.set("fs.defaultFS", "hdfs://namenode:8020");
        // connect to datanodes by hostname rather than by their internal IPs
        hconf.set("dfs.client.use.datanode.hostname", "true");

        // Concern 2: read the source file from HDFS instead of the local disk
        Dataset<Row> dataframe = spark.read()
                .option("multiline", "true")
                .json("hdfs:///user/data/invoice.json")
                .select(
                        col("item"),
                        expr("price * quantity").as("item_total"));

        // Concern 3: write the result back to HDFS instead of printing it
        dataframe.write().json("hdfs:///user/data/invoice_report");
    }
}
Deployment
To test the cluster deployment, you will need a real Spark cluster up and running.
Once your application is ready, it is time to build a jar file and deploy it to the cluster.
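Assuming the project is built with Maven (a Maven build is an assumption here, not something covered in this post), the jar can be produced with:

mvn clean package

The resulting jar is then submitted to the cluster with spark-submit: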
~/spark/bin/spark-submit \
--master spark://spark-master:6066 \
--deploy-mode client \
--class ClusterDeployment \
./java-spark-job-0.1-SNAPSHOT.jar
Let's go through the most important parameters one by one:
- --master is the master URL of the Spark cluster
- --deploy-mode defines where the Spark driver is hosted: client means the driver runs on the same machine where spark-submit was executed, cluster deploys the driver to one of the cluster workers
- --class is the entry-point class of the Spark job
- the last parameter is the path to the application jar file
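For completeness, the same job can be submitted with the driver hosted on the cluster itself by switching the deploy mode (a sketch; note that in cluster mode the application jar must be reachable from the worker nodes, for example from HDFS or a shared volume):

~/spark/bin/spark-submit \
--master spark://spark-master:6066 \
--deploy-mode cluster \
--class ClusterDeployment \
./java-spark-job-0.1-SNAPSHOT.jar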
Summary
We have prepared our application to process data in a production environment and successfully deployed it to a Spark cluster.