User Tools

Site Tools


Parallel Processing with Spark on GWDG’s Scientific Compute Cluster

Apache Spark

Apache Spark is a cluster computing framework for processing large data sets, based on Scala, an object oriented functional programming language. Parallel processing in Spark uses the Resilient Distributed Database, RDD. Within an RDD, a given data set is decomposed into a number of partitions, which then are processed in parallel on the available resources in the cluster. The attribute resilient refers to the ability to recover in case of an error occurring for an individual partition and to reprocess this work on a different resource in the cluster.

The following sections will describe the Spark environment on GWDG’s compute cluster and the use of Spark’s RDD’s for parallel processing. The Scala programming language will be explained only so far as needed for the examples. Programming guides covering all of Scala are e.g. the online books Leanring Scala by Jason Swartz and Programming in Scala by Martin Odersky, Lex Spoon and Bill Venners.

The Interactive Spark Environment

GWDG provides version 2.3.1 of the Apache Spark framework. On all frontend nodes for the cluster (gwdu101, gwdu102, gwdu103) the Spark environment is loaded by the module command:

module load JAVA/jdk1.8.0_31 spark

(Because spark depends on JAVA 8, the module JAVA/jdk1.8.0_31 has to be loaded before the spark module.)

Spark provides an interactive Spark Shell for executing single Spark commands or scripts with collections of Spark commands. This shell can be loaded locally on a frontend node by executing the command


which produces after a number of comment lines the prompt


Spark expressions entered in this shell will be evaluated by using as many cores of the node in parallel as needed and the result will be returned on the screen.

Introducing the Spark Shell

The interactive Spark Shell is an extension of the **Scala REPL** (Read-Execute-Print-Loop) shell. Any input to the shell completed by the return key will be executed and will be followed by a printout of the expressions result, if the input was a valid Spark expression, or else by an error message. The available commands in this shell are listed by executing :help

scala> :help
All commands can be abbreviated, e.g., :he instead of :help.
:edit <id>|<line>        edit historyreset
:help [command]          print this summary or command-specific help
:history [num]           show the history (optional num is commands to show)
:h? <string>             search the history
:imports [name name ...] show import history, identifying sources of names
:implicits [-v]          show the implicits in scope
:javap <path|class>      disassemble a file or class name
:line <id>|<line>        place line(s) at the end of history
:load <path>             interpret lines in a file
:paste [-raw] [path]     enter paste mode or paste a file
:power                   enable power user mode
:quit                    exit the interpreter
:replay [options]        reset the repl and replay all previous commands
:require <path>          add a jar to the classpath
:reset [options]         reset the repl to its initial state, forgetting all session entries
:save <path>             save replayable session to a file
:sh <command line>       run a shell command (result is implicitly => List[String])
:settings <options>      update compiler options, if possible; see reset
:silent                  disable/enable automatic printing of results
:type [-v] <expr>        display the type of an expression without evaluating it
:kind [-v] <expr>        display the kind of expression's type
:warnings                show the suppressed warnings from the most recent line which had any

To close the Spark shell, you press Ctrl+D or type in :q (or any subset of :quit) Other commands will be desribed in turn when they will be used in the context of specific examples.

Apache Spark is based on the object oriented functional programming language Scala. Basic elements of the language are classes, which contain fields (values of specific types), and methods (functions applied to the values in the fields) and possibly input parameters. The syntax is

class <identifier> ([val|var] <identifier>: <type>[, ... ]) [{ fields and methods }]

The following simple example shows how to define a class in the Spark Shell. The second line is the message from Spark Shell indicating the succesfull definition of a class:

scala> class User (n: String = “none”) { val name = n; def greet =  s"Hello from $name" } 
defined class User 

The class User is defined with a field name and a method greet, and the field name has the default value “none”.

Instances of a class can be generated by expressions using the keyword new. Appending the identifiers with .name gives the value of its field name, with .greet results in applying its method greet:

scala> val userdefault = new User
userdefault: User = User@22495f2f
res0: String = none
scala> userdefault.greet
res1: String = Hello from none

scala> val userhaan = new User("haan")
userhaan: User = User@2ded3code1cc
res2: String = haan
scala> userhaan.greet
res3: String = Hello from haan 

Every value defined by executing an expression in the Spark Shell is stored and available for later use, as are the results res[x] returned from executing some expression. The shell can be cleared by executing the command : reset, which removes all previous entries in the shell.

The Spark Shell can execute commands from a script file using the command :load <scriptfilename>. As an example, create a file User.script in the directory, from which the Spark Shell was opened:

class User( n: String = "none") {
        val name = n
        def greet = s"Hello from $name"
val userdefault = new User
val userhaan = new User("haan")

Loading this script in the Spark Shell gives the following result:

scala> :load User.script
Loading User.script...
defined class User
userdefault: User = User@60a6564c
res0: String = none
res1: String = Hello from none
userhaan: User = User@41cfcbb5
res2: String = haan
res3: String = Hello from haan

Executing System Commands from the Spark Shell

The Spark Shell command :sh allows to execute system commands and to display their results. E.g. the content of the directory from which the Spark Shell was started can be examined in the following way

scala> :sh ls -l
res0: = `ls -l` (29 lines, exit 0)

scala> res0.lines foreach println
total 2709248
-rw------- 1 ohaan GGST        727 Jan 28 12:07 derby.log
drwx------ 2 ohaan GGST          0 Aug 31 17:06 files

Unfortunately, wild cards or redirections are not handled by :sh, a command like :sh ls core.* will result in an error.

An alternative posibility for executing system commands is the use of Scala's ! or !! methods (see e.g. this link). These are available after importing the sys.process._ package of Scala. The ! methods executes the content of a string as a command, displays the result of the system command as output in the Spark Shell and returns the return value of the system command. The !! method returns the output from the system command in a string. As an example the execution of the ls -l command is shown:

scala> import sys.process._
import sys.process._

scala> val content = "ls -l".!
total 864
-rw------- 1 ohaan GGST    727 Jan 28 12:07 derby.log
drwx------ 2 ohaan GGST      0 Aug 31 17:06 files
content: Int = 0

scala> println(content)

scala> val content_string = "ls -l".!!
content_string: String =
"total 864
-rw------- 1 ohaan GGST    727 Jan 28 12:07 derby.log
drwx------ 2 ohaan GGST      0 Aug 31 17:06 files
scala> println(content_string)
total 864
-rw------- 1 ohaan GGST    727 Jan 28 12:07 derby.log
drwx------ 2 ohaan GGST      0 Aug 31 17:06 files

For system shell commands using wildcards, the input string has to be evaluated by a system shell, which can be realized by applying the ! and !! methods to the string list Seq(“/bin/sh”, “-c”, “command-string”) (see Scalacookbook, Recipe 12.17) :

scala> Seq("/bin/sh","-c","ls *.script").!
res68: Int = 0

Creating Spark RDDs

The main entry point for Spark functionality is the class SparkContext. (See documentation for Apache Spark 2.3.1 following API Docs->Scala->org.apache.spark->SparkContext). This class represents the connection to a Spark cluster and it provides the methods to create RDDs, to process data within the partitions of a RDD and to communicate data between the different partitions of a RDD.

Invoking the Spark Shell automatically generates the instance sc of the SparkContext class, which is used to access all of Sparks functionalities from the Spark Shell.

There are three ways to create RDDs:

  • copying existing Scala collections
  • reading data from external storage
  • creating new RDDs from existing RDDs

Copying Collections

An existing Scala collection is transformed into an RDD with the method parallelize. This method needs as the first argument the collection to be transformed into a RDD. As an optional second argument the number of partitions of the RDD to be created can be prescribed. If no second argument is provided, a system dependend choice for this number will be made. The following example in the Spark Shell shows how to generate a RDD with 3 partitioins containing the integers 1 to 10 :

scala> val nums = 1 to 10
nums: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val nums_RDD = sc.parallelize(nums,3)
nums_RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

The first command creates the Scala collection nums of type scala.collection.immutable.Range.Inclusive, the second command uses the method parallelize for Spark Shell's instantiation sc of the class SparkContext to convert this collection into the RRD nums_RRD of type org.apache.spark.rdd.RDD[Int].

Since the expression 1 to 10 in Scala is a class instance, it can be inserted directly into the expression for creating the RDD:

scala> val nums_RDD = sc.parallelize(1 to 10,3)
nums_RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

Reading Data

SparkSession provides methods to read data in several formats from storage into RDDs. The method textFile will read an asci text as a collection of lines into a RDD. For example the file lines.txt

line1 : Hello
line2 : World
line3 : to
line4 : everyone

in the directory, from where the Spark Shell was started, will be loaded as follows:

scala> val lines_RDD = sc.textFile("lines.txt")
lines_RDD: org.apache.spark.rdd.RDD[String] = lines.txt MapPartitionsRDD[18] at textFile at <console>:24

The argument in the method textFile can be the path to a file, as in the previous example, or a path to a directory. In this case, the created RDD will have as many elements as the total number of lines of all files in he directory, each element containing one line.

Another method to create a RDD by reading text files is wholeTextFile, with a directory path as argument. This method creates a RDD with as many elements as files in the directory, each element containing the full pathname for a file, and the string representation of the whole content of the file.

Modifying Existing RDDs

The RDD class allows the application of various methods, which result in the creation of new instances of RDD. Some of them will be described in more detail later on. An example is the method map(s ⇒ s.length), which replaces each string in the RDD by its length:

scala> val linelength = => s.length)
linelength: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:25

Whereas lines_RDD is a RDD-collection of Strings - the lines of the input file lines.txt -, the newly created linelength is a RDD-collection of Ints - the number of characters in each line of the input file -.

Working with RDDs

Once a RDD is created, the methods defined for this class can be used to do work on its content in order to produce the required results. RDDs support two types of operations: transformations, which create one or more new RDDs from existing ones, and actions, which return values to the driver program after running a computation on the input RDD.

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently.

Transformations are called narrow if they don't mix data from different partitions of the input RDD, otherwise they are called wide.

Because the partitions of a RDD can be distributed over a cluster of computing resources, wide transformations and actions may involve data communication over the clusters network, whereas narrow transformations can be executed locally.

A list of all methods available for RDDs is displayed in the Spark Shell by completing the identifyer for the RDD followed by a full-stop with the tab-key:

scala> nums_RDD. [tab-key]
++                    countByValue            groupBy                  name                 sampleStdev        toDF
aggregate             countByValueApprox      histogram                partitioner          sampleVariance     toDS
cache                 dependencies            id                       partitions           saveAsObjectFile   toDebugString
canEqual              distinct                intersection             persist              saveAsTextFile     toJavaRDD
cartesian             filter                  isCheckpointed           pipe                 setName            toLocalIterator
checkpoint            first                   isEmpty                  popStdev             sortBy             toString
coalesce              flatMap                 iterator                 popVariance          sparkContext       top
collect               fold                    keyBy                    preferredLocations   stats              treeAggregate
collectAsync          foreach                 localCheckpoint          productArity         stdev              treeReduce
compute               foreachAsync            map                      productElement       subtract           union
context               foreachPartition        mapPartitions            productIterator      sum                unpersist
copy                  foreachPartitionAsync   mapPartitionsWithIndex   productPrefix        sumApprox          variance
count                 getCheckpointFile       max                      randomSplit          take               zip
countApprox           getNumPartitions        mean                     reduce               takeAsync          zipPartitions
countApproxDistinct   getStorageLevel         meanApprox               repartition          takeOrdered        zipWithIndex
countAsync            glom                    min                      sample               takeSample         zipWithUniqueId

A description for the methods can be found in the documentation for the Spark RDD class

Displaying Properties of the RDD

Methods for displaying the size and content of the RDDs :

method description
count() returns the number of elements
first() returns the first element
take(num) returns an array with the first num elements
takeSample(withReplacement, num, [seed]) returns an array with a random sample of size num taken from the RDD, with or without replacement, optionally pre-specifying a random number generator seed.
collect() returns an array with all elements

Applying the method count to the previously defined RDDs returns their numbers of elements:

scala> val numscount =  nums_RDD.count()
numscount: Long = 10

scala> val linescount =  lines_RDD.count()
linescount: Long = 4

The count method for RDDs is similar to the size method for Scala collections and different from the count method for Scala collections. The method size is not available for RDDs:

scala> val numscount =  nums_RDD.size()
<console>:25: error: value size is not a member of org.apache.spark.rdd.RDD[Int]
       val numscount =  nums_RDD.size()

Selections of the content of a RDD can be accessed by the first, take, or takeSample methods:

scala> val content = lines_RDD.first()
content: String = line1 : Hello

scala> val content = nums_RDD.take(5)
content: Array[Int] = Array(1, 2, 3, 4, 5)

The takeSample method randomly selects a number of elements of the RDD. The sampling is done with or without replacement of the chosen elements depending on the value of the first argument in the method beeing true or false. With true the size of the sample can be arbitrary, even larger than the count of the RDD, and elements of the RDD can appear repeatedly, with false the size of the sample is limited to the count of the RDD and each element of the RDD can appear at most once.

scala> nums_RDD.takeSample(false,4)
res30: Array[Int] = Array(5, 10, 6, 8)

scala> nums_RDD.takeSample(false,4)
res31: Array[Int] = Array(8, 3, 2, 6)

scala> nums_RDD.takeSample(true,14)
res32: Array[Int] = Array(4, 6, 2, 8, 5, 5, 3, 3, 1, 1, 8, 9, 1, 6)

scala> nums_RDD.takeSample(true,14)
res33: Array[Int] = Array(9, 8, 3, 9, 5, 1, 10, 9, 10, 7, 2, 1, 3, 5)

The collect method returns the full content ot the RDD, it gives the same result as calling the take method with argument RDD_identifier.count.toInt.

scala> val content = lines_RDD.collect()
content: Array[String] = Array(line1 : Hello, line2 : World, line3 : to, line4 : everyone)
scala> val content = nums_RDD.collect()
content: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val content = nums_RDD.take(nums_RDD.count.toInt)
content: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

The collect method tries to transfers the content of the whole RDD, which can be distributed over the main memories of several compute nodes to the main memory of the master node. For large RDDs this could lead to an out-of-memory exeption on the master node, therefore the collect method should not be used for large distributed RDDs.

Displaying Properties of RDD Partitions

Methods for displaying the number, sizes and contents of the partitions of the RDD are:

method description
getNumPartitions returns the number of partitions
mapPartitions(f) returns a new RDD by applying a function f: Iterator ⇒ Iterator to each partition
glom() returns a new RDD created by coalescing all elements within each partition into an array

An example for getting the number of partitions is

scala> val nums_RDD = sc.parallelize(1 to 50)
nums_RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at <console>:24
scala> nums_RDD.collect
res5: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50)
scala> nums_RDD.getNumPartitions
res0: Int = 24

which shows that the method parallelize, invoked without the second parameter for prescibing the number of partitions created a RDD with 50 elements in 24 partitions. 24 is the number of cores avaible on the frontend gwdu103, which was used for these examples running Spark locally . It is the default for parallelize to split a collection into as many partitions as cores are available.

The next example shows the result if the second parameter is set:

scala> sc.parallelize(1 to 50,3).getNumPartitions
res22: Int = 3

The sizes of the individual partitions can be obtained using the mapPartitions method, which will be described in more detail in the next section. This method allows to access the elements of every partition separately by an iterator. A function using these iterators to perform operations on the elements of the partitions and storing the result into an output iterator must be given as argument for this method. Using the sizes of the input iterators as output iterators then produces a RDD with 24 elements, each element containing the number of elements of the 24 partitions of the original RDD. Applying the same method to the new RDD produces a second RDD with 24 Elements containing the the value 1.

scala> val part = nums_RDD.mapPartitions((x: Iterator[Int]) => Iterator(x.size))
part: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[29] at mapPartitions at <console>:25

scala> part.collect
res6: Array[Int] = Array(2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3)

scala> val partpart = part.mapPartitions(x => Iterator(x.size))
partpart: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[30] at mapPartitions at <console>:25

scala> partpart.collect
res7: Array[Int] = Array(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1)

The type assignment for the input iterator is redundant, because its type can be inferred from the type of elements in the RRD. This has been used in the expression for the value partpart.

The glom method allows to display the content of a RDD grouped by partitions. glom creates a RDD with as many partitions as partitions in the original RDD, each partition containing one element, the array of elements from the corresponding partition of the original RDD. This is shown applying glom to the RDD from the previuos example:

scala> nums_RDD.glom()
res2: org.apache.spark.rdd.RDD[Array[Int]] = MapPartitionsRDD[52] at glom at <console>:26

scala> nums_RDD.glom().getNumPartitions
res3: Int = 24

scala> nums_RDD.glom().collect()
res4: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4), Array(5, 6), Array(7, 8), Array(9, 10), Array(11, 12), Array(13, 14), Array(15, 16), Array(17, 18), Array(19, 20), Array(21, 22), Array(23, 24, 25), Array(26, 27), Array(28, 29), Array(30, 31), Array(32, 33), Array(34, 35), Array(36, 37), Array(38, 39), Array(40, 41), Array(42, 43), Array(44, 45), Array(46, 47), Array(48, 49, 50))

scala> sc.parallelize(1 to 50,3).glom.getNumPartitions
res27: Int = 3

scala> sc.parallelize(1 to 50,3).glom.collect
res28: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16), Array(17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33), Array(34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50))

Saving RDDs to Disk

method description
saveAsTextFile(path) save the RDD as a text file, using string representations of elements.

This methods accepts a pathname to a directory, into which it saves the RDDs partitions in separate files, named part-00000, part-00001, etc. . This directory must not yet exist, it will be created by the method. The path may be absolut or relative to the working directory, from which the method is invoked.

As an example, the directory files has three elements:

gwdu103:35 17:55:00 ~/Spark_Intro > ls files
file1  file2  file3

Using textFile creates a RDD, whose properties can be examined:

val textfile_RDD = sc.textFile("files")
textfile_RDD: org.apache.spark.rdd.RDD[String] = files MapPartitionsRDD[102] at textFile at <console>:24

scala> textfile_RDD.count
res67: Long = 12

scala> textfile_RDD.getNumPartitions
res68: Int = 3

scala> textfile_RDD.glom().collect
res69: Array[Array[String]] = Array(Array(b1, b2, b3, b4, b5), Array(a1, a2, a3, a4), Array(c1, c2, c3))

textfile_RDD.saveAsTextFile(“newdir”) creates the directory newdir with three files:

gwdu103:35 18:07:29 ~/Spark_Intro > ls newdir
part-00000  part-00001  part-00002  _SUCCESS
gwdu103:35 18:08:28 ~/Spark_Intro > more newdir*

*** newdir: directory ***

gwdu103:35 18:08:57 ~/Spark_Intro > more newdir/*

Using instead the method wholeTextFiles:

scala> val wholetextfiles_RDD = sc.wholeTextFiles("files")
wholetextfiles_RDD: org.apache.spark.rdd.RDD[(String, String)] = files MapPartitionsRDD[111] at wholeTextFiles at <console>:24

scala> wholetextfiles_RDD.count
res77: Long = 3

scala> wholetextfiles_RDD.getNumPartitions
res78: Int = 2

scala> wholetextfiles_RDD.glom().collect
res79: Array[Array[(String, String)]] =
"), (file:/home/uni05/ohaan/Spark_Intro/files/file1,"a1
")), Array((file:/home/uni05/ohaan/Spark_Intro/files/file3,"c1

Saving wholetextfiles_RDD with wholetextfiles_RDD.saveAsTextFile(“newdir1”) produces the directory newdir1:

gwdu103:35 18:09:11 ~/Spark_Intro > ls newdir1
part-00000  part-00001  _SUCCESS
gwdu103:35 18:36:56 ~/Spark_Intro > more newdir1/*

More Transformations for RDDs

method description
map(f) returns a new RDD by applying a function f to each element
flatMap(f) returns a new RDD by first applying a function f to all elements, and then flattening the results
mapPartitions(f) returns a new RDD by applying a function f: Iterator ⇒ Iterator to each partition
mapPartitionsWithIndex(f)returns a new RDD by applying a function (Int, Iterator) ⇒ Iterator to each partition, while tracking the index of the original partition
filter(f)returns a new RDD containing only the elements that satisfy a predicate f: (T) ⇒ Boolean


The map method applies a function to every element of a RDD and generates a new RDD containing the results of the function evaluation. The function must have a single input and the new RDD will have the same number of elements as the old one. In the following example, a collection of integers from 1 to 10 is distributed into a RDD collection with 3 partitions. A function double is defined which doubles its input and applied to the RDD with the map method:

scala> val ints = sc.parallelize(1 to 10,3)
ints: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at <console>:27

scala> def double(x: Int): Int = 2*x
double: (x: Int)Int

scala> val ints_2 =
ints_2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[27] at map at <console>:30

scala> ints.collect
res32: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> ints_2.collect
res33: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

scala> ints.getNumPartitions
res34: Int = 3

scala> ints_2.getNumPartitions
res35: Int = 3

The output in the Spark Shell displays the expected result of applying the map method with the double function and the fact, that the number of partitions is equal in the old and new RDD.

The argument in the map method can be an identifier of a previously defined function as in the previous example. Also a function literal (anonymous function) can be used:

scala> val Int) => 2*x)
ints_2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:25

scala> ints_2.collect
res1: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

The placeholder syntax of scala can be used for specifying the function literal:

scala> val*2)


If the function, that transforms the elements of the RDD, returns a collection of several elements, the result of map will be a RDD containing as many collections as the number of elements in the original RDD:

scala> => List(2*x-1,2*x)).collect
res45: Array[List[Int]] = Array(List(1, 2), List(3, 4), List(5, 6), List(7, 8), List(9, 10), List(11, 12), List(13, 14), List(15, 16), List(17, 18), List(19, 20))

The flatMap method will “flatten” the RDD collection of generated collections into a RDD collection containing all elements of all generated collections:

scala> ints.flatMap(x => List(2*x-1,2*x)).collect
res46: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)

Applying the glom method to the transformed RDD shows that the RDD generated with flatMap has the same number of partitions as the original RDD, each partition containing all elements generated from the elements of the corresponding partition in the original RDD.

scala> ints.flatMap(x => List(2*x-1,2*x)).glom.collect
res47: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6), Array(7, 8, 9, 10, 11, 12), Array(13, 14, 15, 16, 17, 18, 19, 20))

mapPartitions, mapPartitionsWithIndex

The mapPartition method applies a function and returns the result separately to every partition of a RDD. The elements of the partition are fed into the function as an iterator and the result of the funtion has to be provided as an iterator, i.e. the function has to be defined as fun(x: Iterator[Type]): Iterator[Type] = . . .. As an example here is an application of the mapPartition method, which returns a RDD containing the number of elements in the original RDD's partitions:

scala> def ff(it: Iterator[Int]): Iterator[Int] = List(it.size).iterator
ff: (it: Iterator[Int])Iterator[Int]

scala> val ints = sc.parallelize(1 to 10,3)
ints: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[63] at parallelize at <console>:24

scala> ints.glom.collect
res127: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))
scala> ints.mapPartitions(ff).collect
res126: Array[Int] = Array(3, 3, 4)

The same result can be acchieved by giving the funtion argument for the mapPartitions method as a function literal

scala> ints.mapPartitions(it => List(it.size).iterator).collect
res129: Array[Int] = Array(3, 3, 4)

With the method mapPartitionsWithIndex, the transformation of the elements in a partition can be made dependend on the partition number, running from zero to number of partitions minus one. The function defining the transformation therefore has two input parameters, the partition index and the partition iterator. A function to be used as parameter in the mapPartitionsWithIndex method therefore has to be defined as fun(index: Int, x: Iterator[Type]): Iterator[Type] = . . .. In the following example, to every element of a partition the value 100*(index+1) is added:

scala> val ints = sc.parallelize(1 to 10,3)
ints: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> ints.collect
res5: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> def fun(index: Int, it: Iterator[Int]): Iterator[Int] = =>x + 100*(index+1)).iterator
fun: (index: Int, it: Iterator[Int])Iterator[Int]

scala> ints.mapPartitionsWithIndex(fun).collect
res6: Array[Int] = Array(101, 102, 103, 204, 205, 206, 307, 308, 309, 310)

scala> ints.mapPartitionsWithIndex((index,it) => =>x + 100*(index+1)).iterator).collect
res7: Array[Int] = Array(101, 102, 103, 204, 205, 206, 307, 308, 309, 310)

In the example the function defining the transformatioon is first provided by its identifier and then as a function literal.


With the filter method a new RDD is created containing all elements of the original RDD fulfilling a condition, which is specified in the function given as argument to the method. The function value must be of Boolean type, and all elements, for which the function returns true, are collected in the new RDD. The new RDD has the same number of partitions as the original one, and the retained values in the new RDD are placed in the partition with the same number as the partition they had occupied in the original RDD. Here are two examples for the filter method, one selecting all even values, the other selecting all values smaller than 5:

scala> val ints = sc.parallelize(1 to 10,3)
ints: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24

scala> ints.glom.collect
res16: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))

scala> def fun(a: Int): Boolean = (a%2 == 0)
fun: (a: Int)Boolean

scala> ints.filter(fun).glom.collect
res17: Array[Array[Int]] = Array(Array(2), Array(4, 6), Array(8, 10))

scala> ints.filter(a => (a < 5)).glom.collect
res18: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4), Array())

Some Actions on RDDs

method description
reduce(func) Aggregate the elements of the dataset using a function func (which takes two arguments and returns one).
The function should be commutative and associative so that it can be computed correctly in parallel.
collect() Return all the elements of the dataset as an array at the driver program.
count() Return the number of elements in the dataset.
first() Return the first element of the dataset (similar to take(1)).
take(n) Return an array with the first n elements of the dataset.


The reduce method produces a single result from all elements of a RDD. The result depends on the operation, which is defined in the function passed as paramter for the reduce method. For example, the function literal (x,y)⇒ x+y will produce the sum of all elements in the RDD, the function literal (x,y)⇒ x max y the largest element in the RDD. Of course all elements in the RDD should have the same type, and the method + resp. max should be applicable to this type. Here are examples using the reduce method for a collection of Int values:

scala> val rdd1 = sc.parallelize(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[61] at parallelize at <console>:24

scala> rdd1.glom.collect
res146: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5), Array(6, 7, 8, 9, 10))

scala> rdd1.reduce((x,y)=>x+y)
res147: Int = 55

scala> rdd1.reduce((x,y)=>x.max(y))
res148: Int = 10

scala> rdd1.reduce((x,y)=>x max y)
res149: Int = 10

The last two commands express the same operation: The first shows, that max is a method applicable to objects of type Int, the second is an example of Scalas syntax rule for methods with only one input. In the same way the addition of two values 2 and 3 is realized by the method + applicable to objects od type Int and therefore the result of the additions is returned form the statement 2.+(3) . The above mentioned syntax rule allows to write this expression in the familiar form 2 + 3.

The next example shows the action of the reduce method with the + operation for a collection of string values. Of course the + operator for string valued data is not communicative as is required for a valid operation for the reduce method. But this example is useful later on for examining the reduction process for RDDs with multiple partitions.

scala> val chars = ('a' to 'j') => x.toString)
chars: List[String] = List(a, b, c, d, e, f, g, h, i, j)

scala> val rdd2 = sc.parallelize(chars,2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[65] at parallelize at <console>:26

scala> rdd2.glom.collect
res151: Array[Array[String]] = Array(Array(a, b, c, d, e), Array(f, g, h, i, j))

scala> rdd2.reduce((x,y)=>x+y)
res152: String = abcdefghij

In this example the collection of Char values has been transformed into a collection of String values, because Strings, but not Chars allow the application of the + method.

The mechanisme working behind the reduce method can be shown by the following recursive Scala function reducing the elements of a List-collection:

scala>  def red[A](li: List[A],op: (A,A)=> A): A = {
     |        val s = op(li(0),li(1))
     |        val lid = li.drop(2)
     |        if (lid.size == 0) return s
     |        red(s::lid,op)
     |       }
red: [A](li: List[A], op: (A, A) => A)A

The function red receives as first input in square brackets a type value, which is to be used for input and output vlaues. Following in round brackets are two input values: the list object to be reduced and the function defining the operation to be employed for the reduction. In the function body, the first two elements of the list are used by the function to produce an result. A new list is created in two steps: first dropping the first two elemants using the drop(2) method and then prepending the result s of the operation using the :: operator. Then the new list, which has one element less than the old one is used as input for calling recursively the function red. The recursion loop is broken, when the length of the list is two and the operation of the function on the remaining two element is output as the end result for the reduction.

This function now is used to reduce the List-collection of strings chars from the previous example:

scala> red[String](chars,(x,y)=>x+y)
res2: String = abcdefghij

and produces the expected result.

It will be interesting to follow the separate steps in the reduction process by including a print statement in the function literal:

scala> red[String](chars,(x,y)=>{println(x,y);x+y})
res4: String = abcdefghij

As expected, in each reduction step the next character is appended to the partial string result obtained so far.

Now the reduction steps will be examined for RDD-collections, built with different numbers of partitions. With one partition, the reduction of the RDD-collection results from the same succession of steps as have been seen in the reduction of the List-collection:

scala> val rdd_1p = sc.parallelize(chars,1)
rdd_1p: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:26

scala> red[String](chars,(x,y)=>{println(x,y);x+y})
res8: String = abcdefghij

With 2 partitions, a different pattern for the reduction is observed:

scala> val rdd_2p = sc.parallelize(chars,2)
rdd_2p: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:26

scala> rdd_2p.glom.collect
res9: Array[Array[String]] = Array(Array(a, b, c, d, e), Array(f, g, h, i, j))

scala> rdd_2p.reduce((x,y)=>{println(x,y);x+y})
res10: String = fghijabcde

The succession of reduction steps shows, that the characters in each partitions are concatenated separately, and only at the end the two partial strings from the two partitions are combined to the final complete string. The partial reduction in the partitions is done simultaneously, the resulting ordering in the final string therefore is nondeterministic, dependimg on the which partion is the first to finish its partial reduction. In order to obtain a unique result, the reduction operation must be commutative, a condition, wich is not fulfilled by the concatination of characters.

The parallel processing of the partial reductions in the different partitions can again be observed with three partitions:

scala> val rdd_3p = sc.parallelize(chars,3)
rdd_3p: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:26

scala> rdd_3p.glom.collect
res4: Array[Array[String]] = Array(Array(a, b, c), Array(d, e, f), Array(g, h, i, j))

scala> rdd_3p.reduce((x,y)=>{println(x,y);x+y})
res6: String = abcghijdef

Building and Submitting Spark Applications

The Spark Shell is appropriate for experimenting with Spark's features and for developing and testing small programs. Applications for production purposes will consist of program files, which have to be compiled and linked into an executable object, that can be submitted for execution on a cluster. The components involved in running a spark application are displayed schematically in the following picture:

The client process submits the application, including specifications for the execution environment and eventually parameters for the application.

A driver process is started, which handles the statements contained in the program file for the spark application. Every Spark application program has to specify a SparkContex, in which the details for executing the appliction - e.g. type of cluster, number of executors, cores and memory of executors - are defined. According to these requirements the driver initiates one or more executor processes with the required number of cores to be run on the cluster. Of course the cluster to be used must provide enough computing resoures for the requirements of the SparkContext.

The execution of the statements in the Spark application program then will be shared between driver and executors. Statements involving RDDs will be executed on the executors. An operation for a RDD is split into a number of tasks acccording to the number of partitions in the RDD, each task responsible for executing the operation on the data set in the respective partition. The driver process includes a scheduler, which sends these tasks to the executors for computing the required operations on the data of the partition on one of the executor cores. The degree of parallelisme for the Spark application therefore is determined by the total number of cores of all executors, because every core can work in parallel on a different partition of a RDD.

In addition to distribute the work to be done on the partitions of RDDs to the excutors, the driver collects and combines the results the executors produce for action operations on RDD partitions and processes all statements in the program, which do not involve RDDs.

The next sections will describe:

  1. defining the SparkContext in a scala program for a Spark application,
  2. producing an executable jar-file from this program using the sbt (simple built tool)
  3. running the application with spark-submit
  4. running the application on a Spark standalone cluster
  5. submitting Spark application job to the scheduling system SLURM for execution on GWDG's scientific compute cluster.

Setting the Spark Context

Establishing the connction to the SparkContext in a Scala programm for a Spark application is demontrated by the following code:

import org.apache.spark.SparkContext

object SparkConnect {
  def main(args: Array[String] = Array("")) {
    val sp = new SparkContext()

In the first line of the program the org.apache.spark.SparkContext package is imported to provide access to the SparkContext class.

Then the object SparkConnect is created, containing a single method main. In the main method the instance sp of the class SparkContext is created. This instance has access to the methods provided in the SparkContext class for generating and manipulating RDD collections. In this example, the instance of the SparkContext is generated without specifying values for its properties. The configuration of the context's properties will be taken from defaults defined for the execution environment to which the application is submitted.

The printstatement gives the details of the context's configuration, which in this example uses the default values from the environment.

The last statement deactivates the connection to the created SparkContext instance. A Spark application can be connected to only one single SparkContext. Trying to create a second connection will result in an error and terminate the application. Therefore the stop method has to be applied to an existing SparkContext instance, before a new one can be established.

If it is not known, wether a connection to a SparkContext allready exists, the new connection should be established with the getOrCreate method for the SparkContext class:

val sp = SparkContext.getOrCreate()

If an instance of SparkContext already exists, this will be used for sp, if not, sp will be a new instance of SparkContext.

From the file containing this program code an executabel jar file can be generated as will be explained in the next section. Invoking the executable with the spark-submit command will then call the main method defined for the object in the program file. Arguments for the main method and the settings for the configuration of the SparkContext can be passed with parameters and options, as will described in detail in a later section.

The program, stored in a file SparkConnect.scala can be tested in the Spark Shell by loading it with the :load command and then calling the main method for the object defined in the program file:

scala> :load SparkConnect.scala
Loading SparkConnect.scala...
import org.apache.spark.SparkContext
defined object SparkConnect

scala> SparkConnect.main()
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). 

scala> sc.stop()

scala> SparkConnect.main() shell

As was explained in previous sections, invoking the Spark Shell automatically provides a connection to a SparkContext with the value sc. The creation of an additional instance of SparkConnect in the main method is not allowed and leads to an execption. Only after stopping the Spark shell instance sc the main method can create the new instance sp and print out the properties of its configuration. The new instance gets its properties from the Spark Shell environment. In particular the application name is set to Spark shell and the master is set to local[*].

The properties of the SparkContext instance can be defined locally in the main definition by creating an instance of the class SparkConf with prescibed values for properties and creating the SparkContext with the value of the SparkConf instance as argument :

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.{ Logger, Level}

object SparkConnect {
  def main(args: Array[String] = Array("")) {
    val co = new SparkConf(false).setMaster("local[2]").setAppName("SparkApp")
    val sp = new SparkContext(co)

The argument false for the SparConf class prevents the setting of default properties from the environment, setting this argument to true or setting no argument leads to taking over the environments properties. The setMaster and setAppName methods then set the chosen custom values for these properties. The setting of master and appname is obligatory for a valid SparkContext, without setting these two properties the creation of the SparkContext's instance will fail.

There is a large list of properties to be set, see the corresponding documentation for Spark Configuration. Only for a small set of properties a separate set-method is provided. The majority of properties has to be set with the method set(“property-name”,“property-value”).

A further addition in this example is the setting of the log level to WARN, which prevents the printing of a lot of INFO messages.

Building the Spark Application Executable with SBT

There exist many different tools for compiling Scala code and build an executable jar-file. One of them is sbt, which stands for “simple build tool” or “scala build tool”. The sbt tool is not simple at all, as a look into the sbt reference manual shows. But for the Spark applications treated in these sections only a very restricted set of sbt features will be needed and explained in the following.

In order to use the sbt tool on GWDG's cluster, the following module has to be loaded:

module load scala/sbt

The current working directory, from which sbt tasks are startet, is called the base directory in the sbt documentation. In the base directory at least one file with the suffix .sbt has to be provided, containing information concerning the tasks sbt shall execute. Usually the name build.sbt is chosen for this file, but sbt uses information from all files in the base directory with the suffix .sbt.

The following file build.sbt placed in the base directory will be the starting point for building a Spark application:

/* build.sbt */

name := "SparkApp"
scalaVersion := "2.11.8"

logLevel := Level.Error
libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "2.3.1")

The build file defines a name for the application, which will be a part of the name for the jar-file generated by sbt. Then the scalaVersion used for compiling the the program code is specified and finally the version 2.3.1 of the spark-core package is added to the library dependency, such that the import of Spark packages in the program code can be resolved. The Loglevel setting Level.Error prevents the printing of many WARN messages concerning version conflicts resulting from automatically included libraries.

The executable jar file for a Spark application is built with the package task.

sbt looks for source code files with suffix .scala in the base directory and in some default directories relative to the base directory, e.g. src/main/scala. The use of sbt will be demonstrated with a source file SparkConnect.scala, containing the code generating the SparkConnect from the previous section.

An interactive sbt shell can be invoked in the base directory with the command sbt

gwdu102:54 11:16:02 ~/SparkApp > sbt
[info] Updated file /home/uni05/ohaan/SparkApp/project/ set sbt.version to 1.1.6
[info] Loading project definition from /home/uni05/ohaan/SparkApp/project
[info] Loading settings from build.sbt ...
[info] Set current project to SparkApp (in build file:/home/uni05/ohaan/SparkApp/)
[info] sbt server started at local:///usr/users/ohaan/.sbt/1.0/server/bab34b8d3cad9a03bcdd/sock
sbt:SparkApp> show sources
[info] * /home/uni05/ohaan/SparkApp/SparkConnect.scala
[success] Total time: 0 s, completed Feb 20, 2019 11:17:43 AM

Invoking sbt for the first time generates a complex hierarchy of directories, which will only be described as far as relevant for the current purpose of generating a Spark application. The first [info] line states, that the newly generated directory projects has a file containing the used version of sbt, 1.1.6 in this example. The sbt-shell prompt sbt:SparkApp> takes the extension SparkApp from the name value set in build.sbt With the task show sources all files ending with .scala from the default source directories will be displayed.

The follwing list shows some sbt tasks, wich can be executed in the sbt-shell.

tasks description
tasks displays all tasks available in the sbt-shell
compile compiles the code from source files, generating class files
package produces binary jar files from the compiled sources ready for execution
run runs the main class from the executable jar file
clean deletes files produced by the sbt tasks, such as compiled classes and binary jar files
reload reloads the project in the base directory (after changes of settings in the .sbt file)
exit ends the sbt-shell

These tasks can also be executed as batch commands sbt <task> issued from the base directory. The sbt-shell offers in addition the possibility of continuous commands by prepending the task name by ~, as e.g.~compile. In the continous command mode the shell executes the tasks repeatedly after changes in files, on which the task depends.

The executable jar files for Spark applications are generated with the package task.

sbt:SparkApp> package
[success] Total time: 4 s, completed Feb 20, 2019 3:43:39 PM
sbt:SparkApp> show package
[info] /home/uni05/ohaan/SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar
[success] Total time: 0 s, completed Feb 20, 2019 3:43:43 PM

The second command show package returns the location /home/uni05/ohaan/SparkApp/target/scala-2.11/ and name sparkapp_2.11-0.1.0-SNAPSHOT.jar of the generated jar file. This name includes the project property settings name, version and scalaVersion from the build.sbt file. Since no version value for the project was set in the example, the default value 0.1.0-SNAPSHOT was chosen.

The executable jar file can be submitted by the run task from sbt:

sbt:SparkApp> run
Using Spark's default log4j profile: org/apache/spark/
19/02/20 16:03:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[success] Total time: 3 s, completed Feb 20, 2019 4:03:27 PM

Running a Spark Application with spark-submit

The spark-submit command starts the client process for a Spark Application and allows to set at run time configuration properties for the SparkContext.

spark-submit \
  --master <master-url> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \   

Some properties can be set using a reserved option name, for example the --master option determines the compute environment to be used for the Spark application. Some possible values for the master-url are

Master URL Meaning
local Run Spark locally with one worker thread (i.e. no parallelism at all)
local[K] Run Spark locally with K worker threads
local[*] Run Spark locally with as many worker threads as logical cores on your machine
spark://HOST:PORT Connect to the given Spark standalone cluster master

The complete list of possible options with their default settings can be inspected by calling spark-submit --help

spark-submit may be invoked without any option, only the absolute or relative path to the jar file containing the executable spark Application must be provided. Then the configuration properties will be set to their default values.

A Spark Application creating a SparkContext without specifying any properties as in

import org.apache.spark.SparkContext

object SparkConnect {
  def main(args: Array[String] = Array("")) {
    val sp = new SparkContext()

with a corresponding application-jar file target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar produced by the sbt package command as described in the previous section, will produce the following output by submitting it without explicit settings for the options

 ~/SparkApp > $SPARK_HOME/bin/spark-submit target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar

Ways to Configure the SparkContext

The properties, which can be configured for the SparkContext are listed in the section Spark Configuration of the Spark documantation.

Here is a short collection of properties from this list

property namedefaultdescription
spark.master (none) the cluster manager to connect to
spark.cores.max (not set) the maximum amount of CPU cores to request for the application from across the cluster
spark.executor.cores all the available cores on the worker in standalone mode the number of cores to use on each executor
spark.default.parallelism for distributed shuffle operations the largest number of partitions in a parent RDD; for operations like parallelize with no parent RDDs total number of cores on all executor nodesdefault number of partitions in RDDs

There are three different ways to set a configuration property for a SparkContext, which are in order od precedence:

With the method set for the SparkConf object in the application program


with the option --conf in the spark-submit command

--conf <property_name>=<property_value>

with a line in a the spark configuration file

<property_name>    <property_value>

with the name spark-defaults.conf in the directory specified in the environmment variable SPARK_CONF_DIR or with a name specified in the option --properties-file in the spark-submit command

Some frequently used properties can be set with a simplified syntax. Examples are the method setMaster(“<master-url>”) for the SparkConfig class or the option --master <master-url> for the spark-submit command.

Setting up a Spark Standalone Cluster

Starting an application with spark-submit without specifying the spark.master property for the SparkContext or with the value local[*] for the option --master will use all the cores of the local node for executing the application. Multiple nodes for a distributed execution of a Spark application can be used on a Spark standalone cluster. The standalone mode consists of a master and one or more workers. The Spark installation provides in the directory $SPARK_HOME/sbin the following scripts for managing the standolone mode:

sript name purpose Starts a master instance on the machine the script is executed on Stops the master that was started via the script Starts a worker instance on the machine the script is executed on Stops the worker that was started via the script Starts worker instances on all nodes listed in the file slaves Stops the workers that were started via the script Combines starting master and workers Combines stopping master and workers
Starting the Master

The following lines show the steps for starting a master process for a Spark standalone cluster

gwdu102 > module load JAVA/jdk1.8.0_31 spark
gwdu102 > export SPARK_LOG_DIR=~/SparkLog

gwdu102 > ${SPARK_HOME}/sbin/
starting org.apache.spark.deploy.master.Master, logging to /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.master.Master-1-gwdu102.out

gwdu102 > grep Starting /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.master.Master-1-gwdu102.out
2019-07-22 15:45:52 INFO  Master:54 - Starting Spark master at spark://

gwdu102 > jps
31663 Jps
29761 Master

Before starting the master by executing $SPARK_HOME/sbin/, the environment for JAVA and Spark must be initialized by loading the appropriate modules. Furthermore the environment variable SPARK_LOG_DIR has to be set to the name of a log directory, which will be created if it does not yet exist. The invocation of produces a message, which includes the name of the log file in this directory.

Among many other messages printed in this log file is a line with the master-url spark://, which includes the name of the host, on which the master has been started and the number of the port, on which the master listens. If not specified with the option --port <port number> , the port number is 7077 by default. The workers in the Spark stanalone cluster will use this master-url for the connection to the master, and the spark-submit command will need the master-url to start the Spark application in the standolane mode.

With the command jps the running of the master process in a java virtual machine can be verified.

Starting a Worker

Now a worker process can be started as follows:

gwdu102 > export SPARK_WORKER_DIR=~/SparkWorker

gwdu102 > ${SPARK_HOME}/sbin/ spark://gwdu102:7077
starting org.apache.spark.deploy.worker.Worker, logging to /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.worker.Worker-1-gwdu102.out

gwdu102 > grep Starting /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.worker.Worker-1-gwdu102.out
2019-07-22 15:47:45 INFO  Worker:54 - Starting Spark worker with 16 cores, 61.9 GB RAM

gwdu102 > grep registered /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.worker.Worker-1-gwdu102.out
2019-07-22 15:47:45 INFO  Worker:54 - Successfully registered with master spark://

gwdu102 > grep Registering /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.master.Master-1-gwdu102.out
2019-07-22 15:47:45 INFO  Master:54 - Registering worker with 16 cores, 61.9 GB RAM

gwdu102 > jps
18861 Jps
3081 Worker
29761 Master

For starting a worker calling ${SPARK_HOME}/sbin/ with the master-url as an obligatory argument, the name of a worker-directory has to be supplied, into which output from running applications on the worker will be stored. This name can be provided by setting the environment variable SPARK_WORKER_DIR. Alternatively this name can be set in the option --work-dir following the master-url argument in the calling sequence for starting the worker. This directory will be created if it does not yet exist. .

The message from starting the worker shows, that a log file is produced in the directory indicated by the environment variable SPAR_LOG_DIR, which was set before starting the master. This logfile contains a line stating the start of the worker process , the number of cores and the amount of memory allocated to the worker. A further line announces the registering of this worker with the master. The same event is listed in the master log file as well.

As can be seen from the jps command, master and worker processes are now running on the same host.

By default the number of available cores and all available memory of a node is allocated to the worker startet with the script. Different amount of hardware ressources for the worker can be specified by setting the environment variables SPARK_WORKER_CORES to the number of required cores and SPARK_WORKER_MEMORY to the size of required memory (e.g. to 1000M or 2G). Instead of setting the environment variables, these ressources can be required by setting the options --core and --memory in the calling sequence for starting the worker.

Starting a Worker on a Remote Node

The standalone cluster can include workers running on different nodes by starting the script on different nodes.

gwdu102 > ssh gwdu103
Last login: Wed Jun 26 11:36:28 2019 from
gwdu103 > module load JAVA/jdk1.8.0_31 spark
gwdu103 > export SPARK_LOG_DIR=~/SparkLog

gwdu103 > ${SPARK_HOME}/sbin/ spark://gwdu102:7077 --cores 4 --work-dir ~/SparkWorker
starting org.apache.spark.deploy.worker.Worker, logging to /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.worker.Worker-1-gwdu103.out
gwdu103 > jps
20784 Worker
21588 Jps
gwdu103 > exit
Connection to gwdu103 closed.

gwdu102 > grep Registering /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.master.Master-1-gwdu102.out
2019-07-22 15:47:45 INFO  Master:54 - Registering worker with 16 cores, 61.9 GB RAM
2019-07-22 16:28:08 INFO  Master:54 - Registering worker with 4 cores, 61.8 GB RAM

Of course after login with ssh from node gwdu102 to the node gwdu103 the modules for using Spark have to be loaded and the variable SPARK_LOG_DIR has to be set before the start of a worker on this node is possible. In this example worker directory and number of worker cores are specified by setting values for the options --work-dir and --core.

All activities for setting up the standalone cluster are documented in the logfile ~/SparkLog/spark-ohaan-org.apache.spark.deploy.master.Master-1-gwdu102.out. The two lines displayed with the grep command show, that now two workers on two different nodes are available for the application.

Starting Multiple Workers with ''''

The script sbin/ will start multiple workers on different nodes using remote commands via ssh. The node names must be provided as separate lines in a file slaves. This file has to be created in the directory specified in the environment variable SPARK_CONF_DIR. The default value for this variable is $SPARK_HOME/conf. Because this default is a read only directory in the Spark installation on GWDG's compute cluster, SPARK_CONF_DIR has to be explicitely set to a name of a local directory with permission for creating files. The Spark standalone scripts inspect this directory also for the existence of a file, from which values for environment variables can be set, e.g. SPARK_LOG_DIR and SPARK_WORKER_DIR for the directories needed for the start of master and worker, and SPARK_WORKER_CORES and SPARK_WORKER_MEMORY for the amount of hardware ressources to be allocated the workers.

The environment created in the local shell from which the script sbin/ is startet will not be passed to the bash shell created on the remote nodes by the ssh commands. But in the remote shell the file ~/.bashrsc is sourced before the required commands are executed. Putting export statements for the necessary environment variables into the file ~/.bashrsc on the remote node therefore will provide the environment needed to execute the standalone scripts on the remote nodes. Since the file system in GWDG's cluster is global, the ~/.bashrsc, set up in the local shell, will be available on all nodes. The following screen shots show how to set up the standalone cluster with the sbin/ script:

Setting environment variables SPARK_HOME, JAVA_HOME by loading the modules for Spark and Java:

gwdu102 > module load JAVA spark
gwdu102 > echo $JAVA_HOME
gwdu102 > echo $SPARK_HOME

Setting the environment variable SPARK_CONF_DIR and creating the directory to which it points:

gwdu102 > export SPARK_CONF_DIR=~/SparkConf
gwdu102 >  mkdir -p $SPARK_CONF_DIR

Generating a file which sets the environment variables SPARK_LOG_DIR and SPARK_WORKER_DIR and, optionally additional varables, e.g. SPARK_WORKER_CORES and SPARK_WORKER_MEMORY:

gwdu102 >  echo "export SPARK_LOG_DIR=~/SparkLog" > $SPARK_CONF_DIR/
gwdu102 >  echo "export SPARK_WORKER_DIR=~/SparkWorker" >> $SPARK_CONF_DIR/
gwdu102 >  echo "export SPARK_WORKER_CORES=4" >> $SPARK_CONF_DIR/
gwdu102 >  echo "export SPARK_WORKER_MEMORY=4048M" >> $SPARK_CONF_DIR/

gwdu102 > cat $SPARK_CONF_DIR/
export SPARK_LOG_DIR=~/SparkLog
export SPARK_WORKER_DIR=~/SparkWorker

Generating a file slaves containing the names of the nodes on which a worker shall be started:

gwdu102 > echo gwdu102 > $SPARK_CONF_DIR/slaves
gwdu102 > echo gwdu101 >> $SPARK_CONF_DIR/slaves
gwdu102 > echo gwdu103 >> $SPARK_CONF_DIR/slaves
gwdu102 > cat $SPARK_CONF_DIR/slaves

Generating the file ~/.bashrc to set the environment for the shell started by the ssh commands on the remote nodes:

gwdu102 > echo "export SPARK_HOME=$SPARK_HOME" > ~/.bashrc
gwdu102 > echo "export JAVA_HOME=$JAVA_HOME" >>  ~/.bashrc
gwdu102 > echo "export SPARK_CONF_DIR=$SPARK_CONF_DIR" >>  ~/.bashrc       
gwdu102 > cat  ~/.bashrc
export SPARK_HOME=/usr/product/parallel/spark/2.3.1/spark-2.3.1-bin-hadoop2.7
export JAVA_HOME=/usr/product/bioinfo/JAVA/jdk1.8.0_31/jre
export SPARK_CONF_DIR=/usr/users/ohaan/SparkConf

Now the scripts for starting master and worker can be run:

gwdu102 > $SPARK_HOME/sbin/
starting org.apache.spark.deploy.master.Master, logging to /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.master.Master-1-gwdu102.out

gwdu102 > $SPARK_HOME/sbin/
gwdu103: starting org.apache.spark.deploy.worker.Worker, logging to /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.worker.Worker-1-gwdu103.out
gwdu102: starting org.apache.spark.deploy.worker.Worker, logging to /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.worker.Worker-1-gwdu102.out
gwdu101: starting org.apache.spark.deploy.worker.Worker, logging to /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.worker.Worker-1-gwdu101.out

The last lines of the logfile for the master report the successfull starts of the three workers:

gwdu102 >  more /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.master.Master-1-gwdu102.out
2019-07-23 15:44:19 INFO  Master:54 - Registering worker with 4 cores, 4.0 GB RAM
2019-07-23 15:44:21 INFO  Master:54 - Registering worker with 4 cores, 4.0 GB RAM
2019-07-23 15:44:21 INFO  Master:54 - Registering worker with 4 cores, 4.0 GB RAM

These lines also show, that the workers provide the number of cores and amount of local memory specified in the environment variables SPARK_WORKER_CORES and SPARK_WORKER_MEMORY via the .bashrc file.

The standalone cluster can be started in one step by running the command sbin/ which combines sbin/ and The workers are stopped by sbin/, the master ist stopped by sbin/stop-master, master and workers are stopped in one step by sbin/

Submitting to the Spark Standalone Cluster

The Spark standalone cluster provides cores and memory on different worker nodes. The Spark application uses the resources from the standalone cluster in a way specified by configuration properties for the SparkContext. These properties must include the master_url, furthermore number of cores and amount of memory for the executors can be prescribed and the default number of partitions in a RDD collection can be fixed. Different ways to set SparkContext properties programmatically or at execution time have been described in the section Ways to Configure the SparkContext. The following script sets the property spark.default.parallelism to the total number of cores of all worker nodes and choses for spark.executor.cores and spark.executor.memory fixed values by generating the corresponding configuration file $SPARK_CONF_DIR/spark-defaults.conf.

nodelist=`cat $SPARK_CONF_DIR/slaves`
for host in $nodelist
  cores=$(ssh $host '. $SPARK_CONF_DIR/;echo $SPARK_WORKER_CORES')
  total_cores=$(( $total_cores + $cores ))

echo "spark.default.parallelism" $total_cores > $SPARK_CONF_DIR/spark-defaults.conf
echo "spark.submit.deployMode" client >> $SPARK_CONF_DIR/spark-defaults.conf
echo "spark.master" spark://`hostname`:7077 >> $SPARK_CONF_DIR/spark-defaults.conf
echo "spark.executor.cores" 2 >> $SPARK_CONF_DIR/spark-defaults.conf
echo "spark.executor.memory" 1024M >> $SPARK_CONF_DIR/spark-defaults.conf

A toy application, showing the distribution of partitions onto executors, is generated by the following scala program:

/* SparkApp.scala */

import org.apache.spark._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.{ Logger, Level}
import sys.process._

object SparkApp {
  def work(i: Int): (String,String,String) = {
    val str1 = "date +%T".!!
    val time = str1.take(str1.length-1)
    val str2 = "hostname".!!
    val host = str2.take(str2.length-1)
    val id = SparkEnv.get.executorId
    "sleep 10".!
    return (host,id,time)

  def main(args: Array[String] = Array("")) {
    val sp = new SparkContext()
         val st = System.nanoTime
    val rdd = sp.parallelize(1 to sp.defaultParallelism).cache
    println("\nInformation from partitions:")
    println("host,executor_id,start_time")>work(x)).collect.foreach(x=>println(x+" "))
         val dt = (System.nanoTime - st)/1e9d
    println("\n total time for application: "+dt)


This program defines a function work, which simulates some work by calling sleep 10 and which returns three strings: the hostname on which the function was executed, the id of the executor executing the function and the time of starting the function execution.

The main program, after printing the configuration of the SparkContext, generates a RRD with a number of elements equal to the defaultParallelism value of the SparkContext, which is split into the same number of partitions, such that every partition of the RDD cantains just one element. Then the result from calling the map method to execute the work function for every element of the RDD is printed. From registering the system time before and after the execution of the RDD operations, the total execution time is calcluated.

As described in the section Building the Spark Application Executable with SBT an executable jar file ~/SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar can be generated, which in turn can be executed with the spark-submit command.

For a Spark standalone cluster running three workers with two cores, and setting the number of cores for the executors to two, the following output results:

gwdu102 > spark-submit ~/SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar
2019-07-29 17:43:42 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Information from partitions:

 total time for application: 14.401789086

The SparkContext configuration has the expected properties for the defaultParallelism (equal to 6, the total number of cores in the standalone cluster) and for the executor cores (equal to 2). The RDD collection was created with 6 elements, spread into 6 partition, so every partion contains just 1 element. Every partition of the RDD is send by the driver to an executor for performing the action on the partitions elements as defined in the function given as argument to the map method. The output shows, that three executors have been working (executor ids 0, 1, 2), each executor on a different host, and every executor acting on 2 partitions. Since the total core number for running the application is 6, the computation for every partition is scheduled to a different core and all 6 computations can start nearly simultaneously, delayed only by the time needed for the scheduling itself. Comparing the total time for the application of 14.4 sec to the time of 10 sec used in a single work function shows, that this overhead is aprox. 4 sec.

Running Spark Applications on GWDG's Scientific Compute Cluster

Jobs on GWDG's compute cluster are managed by the SLURM workload manager as described in the section Running Jobs with SLURM of GWDG's HPC documentation.

A Spark standalone cluster can be started on nodes of the compute cluster by submitting the SLURM sbatch command with a script specifying the required resources, followed by the commands for setting up the standalone cluster as described in the previous two sections. The resources to be allocated by the SLURM system should include as many slurm tasks as spark executors needed for the spark application, and each task should provide the number of cores and the amount of memory that an executor needs for the spark application. Therefore the sbatch script must include the options --ntasks, --cpus-per-task and --mem-per-cpu, whereas an option --nodes is not neeeded, because the SLURM manager can decide on the number of nodes for providing the resources for the required number of tasks.

An example is the following start-standalone.script :


#SBATCH --partition=medium
#SBATCH --ntasks=4
#SBATCH --cpus-per-task=2
#SBATCH --mem-per-cpu=2048
#SBATCH --time=60:00
#SBATCH --output=outfile-%J

sleep infinity

Submitting this script from the login host with the command sbatch start-standalone.script will start a Spark standalone cluster, which will be alive for the time required by the option --time=60:00, or until the command scancel <slurm_job_id> is launched, where <slurm_job_id> is the job-id returned from the sbatch command.

The commands in the batch script start with sourcing the script to prepare the environment for starting the standalone cluster:
module load JAVA/jdk1.8.0_31 spark

export SPARK_CONF_DIR=~/SparkConf
mkdir -p $SPARK_CONF_DIR

echo "export SPARK_LOG_DIR=~/SparkLog" > $env
echo "export SPARK_WORKER_DIR=~/SparkWorker" >> $env
echo "export SLURM_MEM_PER_CPU=$SLURM_MEM_PER_CPU" >> $env
echo 'export SPARK_WORKER_CORES=`nproc`' >> $env

echo "export SPARK_HOME=$SPARK_HOME" > ~/.bashrc
echo "export JAVA_HOME=$JAVA_HOME" >> ~/.bashrc
echo "export SPARK_CONF_DIR=$SPARK_CONF_DIR" >> ~/.bashrc

scontrol show hostname $SLURM_JOB_NODELIST > $SPARK_CONF_DIR/slaves

echo "spark.default.parallelism" $(( $SLURM_CPUS_PER_TASK * $SLURM_NTASKS ))> $conf
echo "spark.submit.deployMode" client >> $conf
echo "spark.master" spark://`hostname`:7077 >> $conf
echo "spark.executor.cores" $SLURM_CPUS_PER_TASK >> $conf
echo "spark.executor.memory" $(( $SLURM_CPUS_PER_TASK*$SLURM_MEM_PER_CPU ))M >> $conf

The role of the commands in this file have been explained in detail in the two previous sections. The number of cores for a Spark standalone worker is determined dynamically by the command `nproc`, which evaluates on each node to the number of cores allocated by the SLURM manager on the node. This number is equal to the number of tasks allocated to the node times the value specified in the SBATCH option --cpus-per-task. The amount of memory for a worker is determined by the product of this number of cores times the memory setting in the SBATCH option --mem-per-cpu, which is available as the environment variable SLURM_MEM_PER_CPU.

The nodes on which workers are to be started are listed in a special format in the environment variable SLURM_JOB_NODELIST, which is expanded to the node names with the scontrol command, the result of which is written to the slaves file.

In the last group of commands the properties for the SparkContext are set, which are needed for running the Spark application on the started standalone cluster. In particular, the spark.default.parallelism is set to the product of allocated SLURM tasks times the number of allocated cpus per tasks, i.e. to the total number of allocated cores. The property spark.executor.cores sets the numer of cores for every executor to the number of cpus allocated to every task in $SLURM_CPUS_PER_TASK, and spark.executor.memory sets the amount of memory for every executor to the amount of memory allocated the a SLURM task.

After preparing the environment for the standalone cluster, the next command $SPARK_HOME/sbin/ will start the cluster as described in the section Setting up a Spark Standalone Cluster.

The command returns after starting the master und worker processes. The standalone cluster will not be ready for executing the Spark application, until a connection between master and worker has been established and the workers processes have been registerd with the master process. Sourcing the script will wait until the registering of all workers has succeeded. This script looks repeatedly into the logfiles for the started workers and returns, if the line with the text Successfully registered with master has been included in the logfiles for all started workers:
num_workers=`cat $SPARK_CONF_DIR/slaves|wc -l`
echo number of workers to be registered: $num_workers
master_logfile=`ls -tr ${SPARK_LOG_DIR}/*master* |tail -1`
worker_logfiles=`ls -tr ${SPARK_LOG_DIR}/*worker* |tail -$num_workers`
for i in {1..100}
  sleep $steptime
  num_reg=` grep 'registered' $worker_logfiles|wc -l`
  if [ $num_reg -eq $num_workers ]
echo registered workers after $((i * steptime)) seconds  :
for file in $worker_logfiles
  grep 'registered' $file
grep 'Starting Spark master' $master_logfile
grep 'Registering worker' $master_logfile|tail -$num_workers

The output of the script will be written to the output file specified by the --output option in the sbatch script start-standalone.script. Only after finding lines like

number of workers to be registered: 3
registered workers after 114 seconds :
19/08/11 12:26:25 INFO Worker: Successfully registered with master spark://
19/08/11 12:27:07 INFO Worker: Successfully registered with master spark://
19/08/11 12:26:29 INFO Worker: Successfully registered with master spark://
19/08/11 12:26:24 INFO Master: Starting Spark master at spark://
19/08/11 12:26:25 INFO Master: Registering worker with 4 cores, 8.0 GB RAM
19/08/11 12:26:29 INFO Master: Registering worker with 2 cores, 4.0 GB RAM
19/08/11 12:27:07 INFO Master: Registering worker with 2 cores, 4.0 GB RAM 

in the batch output file, the Spark cluster is ready for executing Spark applications.

Now Spark applications can be submitted from the same login node, from which the sbatch script had been submitted. All necessary setting of the SparkContext properties have been placed into the spark-defauls.conf file by sourcing the file in the sbatch script. On the login node only the modules JAVA and spark have to be loaded and the variable SPARK_CONF_DIR has to be set to the directory into which the spark-defauls.conf file has been stored. The following screen shot shows the execution of a Spark application started from the login node.

gwdu102 > module load JAVA spark
gwdu102 > SPARK_CONF_DIR=~/SparkConf
gwdu102 > spark-submit  ~/SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar
19/08/11 12:41:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/

Information from partitions:

 total time for application: 13.789137319

For executig the Spark application as a batch job, the batch sript has to be modified by replacing the command sleep infinity by

spark-submit ~/SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar


Scientific Computing

wiki/hpc/parallel_processing_with_spark_on_gwdg_s_scientific_compute_cluster.txt · Last modified: 2019/08/11 15:34 by ohaan