How to write Big Data application with Java and Spark

Spark is modern Big Data framework to build highly scalable and feature rich data transformation pipelines.

Spark's main advantages are simplicity and high performance compared to its predecessor - Hadoop. You can write Spark applications in main 3 languages: Scala, Java and Python.

In this guide I will show you how to write simple Spark application in Java.

Writing Spark application

To create Spark job, as a first step you will need to add Spark library dependency into your maven project:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.7</version>
</dependency>

In this guide we will try to read and transform list of invoices provided in invoice.json file:

[
  {
    "item": "iphone X",
    "price": 1000.00,
    "quantity": 2
  },
  {
    "item": "airpods",
    "price": 150.00,
    "quantity": 1
  },
  {
    "item": "macbook pro 13",
    "price": 1500.00,
    "quantity": 3
  }
]

Now we are ready to start writing application

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.*;
import org.apache.spark.SparkConf;

public class SparkJob {
  public static void main(String[] args) {
    // setup spark session
    SparkConf conf = new SparkConf().setMaster("local[*]");
    SparkSession spark = SparkSession.builder()
        .config(conf)
        .getOrCreate();

    // read original json file
    Dataset<Row> dataframe = spark.read()
        .option("multiline", "true")
        .json("invoice.json");
    dataframe.show();
  }
}

Here we have created a SparkSession object which is entry point to build data jobs. And then created DataFrame, represented as java type Dataset<Row>.

DataFrame

DataFrame is distributed data structure representing underlying data similar to a table from relational database. Executing dataframe.show() generates following output:

+--------------+------+--------+
|          item| price|quantity|
+--------------+------+--------+
|      iphone X|1000.0|       2|
|       airpods| 150.0|       1|
|macbook pro 13|1500.0|       3|
+--------------+------+--------+

Beside data representation, DataFrame provides API methods to transform underlying data like following:

  • select
  • where
  • sort
  • groupBy
  • etc.

Now lets try to do some data transformations.

Task 1 - Filter items based on a price

Imagine that we need to build an invoice report which includes expsensive items only, with a price >= $1000. It can be implemented as following:

// filter items with a price greater than or equal to 1000
Dataset<Row> expensiveItems = dataframe
    .filter("price >= 1000");
expensiveItems.show();

+--------------+------+--------+
|          item| price|quantity|
+--------------+------+--------+
|      iphone X|1000.0|       2|
|macbook pro 13|1500.0|       3|
+--------------+------+--------+

Alternatively this code could be improved by applying statically typed functions:

import static org.apache.spark.sql.functions.*;

Dataset<Row> expensiveItems = dataframe
        .filter(col("price").geq(1000));

Here we define column price and apply function geq (Greater than or equal to an expression) with a parameter 1000. It provides exactly same result but provides little bit less space to make a typo.

Task 2 - Calculate total price for each item

Having 2 columns item price and item quantity we can calculate a total amount for that item as following:

// multiply item price by item quantity in invoice
Dataset<Row> sumPerRow = dataframe
    .select(
        col("*"),
        expr("price * quantity").as("sum_per_row"));
sumPerRow.show();
    
+--------------+------+--------+-----------+
|          item| price|quantity|sum_per_row|
+--------------+------+--------+-----------+
|      iphone X|1000.0|       2|     2000.0|
|       airpods| 150.0|       1|      150.0|
|macbook pro 13|1500.0|       3|     4500.0|
+--------------+------+--------+-----------+

And statically typed alternative:

Dataset<Row> sumPerRow = dataframe
    .select(
        col("*"),
        col("price").multiply(col("quantity")).as("sum_per_row"));

Task 3 - Calculate total amount for all items in invoice

As a last task lets aggregate all rows and calculate total invoice amount:

// calculate total amount of all items
Dataset<Row> total = dataframe
    .select(sum(expr("price * quantity")).as("total"));
total.show();
    
+------+
| total|
+------+
|6650.0|
+------+

Summary

We have written java application with a spark library and created simple data transformations jobs. You can run application locally and see results in place.

The great power of Spark is that you can deploy exactly same application to cluster of multiple nodes and scale performance of your application as much as needed to process arbitrary amount of data.