Parallel Processing with Spark on GWDG’s Scientific Compute Cluster
Apache Spark
Apache Spark is a cluster computing framework for processing large data sets. It is written in Scala, an object-oriented functional programming language. Parallel processing in Spark is built on the Resilient Distributed Dataset (RDD). Within an RDD, a data set is decomposed into a number of partitions, which are then processed in parallel on the available resources in the cluster. The attribute resilient refers to the ability to recover from an error affecting an individual partition by reprocessing that partition's work on a different resource in the cluster.
The following sections describe the Spark environment on GWDG’s compute cluster and the use of Spark’s RDDs for parallel processing. The Scala programming language is explained only as far as needed for the examples. Programming guides covering all of Scala include the books Learning Scala by Jason Swartz and Programming in Scala by Martin Odersky, Lex Spoon and Bill Venners.
The Interactive Spark Environment
GWDG provides version 2.3.1 of the Apache Spark framework. On all frontend nodes for the cluster (gwdu101, gwdu102, gwdu103) the Spark environment is loaded by the module command:
module load JAVA/jdk1.8.0_31 spark
(Because spark depends on JAVA 8, the module JAVA/jdk1.8.0_31
has to be loaded before the spark module.)
Spark provides an interactive Spark Shell for executing single Spark commands or scripts with collections of Spark commands. This shell can be loaded locally on a frontend node by executing the command
spark-shell
which produces after a number of comment lines the prompt
scala>
Spark expressions entered in this shell will be evaluated by using as many cores of the node in parallel as needed and the result will be returned on the screen.
Introducing the Spark Shell
The interactive Spark Shell is an extension of the **Scala REPL** (Read-Eval-Print Loop) shell. Any input completed with the return key is executed and followed by a printout of the expression's result if the input was a valid expression, or by an error message otherwise.
The available commands in this shell are listed by executing :help
scala> :help
All commands can be abbreviated, e.g., :he instead of :help.
:edit <id>|<line>        edit history
:help [command]          print this summary or command-specific help
:history [num]           show the history (optional num is commands to show)
:h? <string>             search the history
:imports [name name ...] show import history, identifying sources of names
:implicits [-v]          show the implicits in scope
:javap <path|class>      disassemble a file or class name
:line <id>|<line>        place line(s) at the end of history
:load <path>             interpret lines in a file
:paste [-raw] [path]     enter paste mode or paste a file
:power                   enable power user mode
:quit                    exit the interpreter
:replay [options]        reset the repl and replay all previous commands
:require <path>          add a jar to the classpath
:reset [options]         reset the repl to its initial state, forgetting all session entries
:save <path>             save replayable session to a file
:sh <command line>       run a shell command (result is implicitly => List[String])
:settings <options>      update compiler options, if possible; see reset
:silent                  disable/enable automatic printing of results
:type [-v] <expr>        display the type of an expression without evaluating it
:kind [-v] <expr>        display the kind of expression's type
:warnings                show the suppressed warnings from the most recent line which had any
To close the Spark Shell, press Ctrl+D or type :q (or any other abbreviation of :quit). Other commands will be described as they are used in specific examples.
Apache Spark is based on the object oriented functional programming language Scala. Basic elements of the language are classes
, which contain fields
(values of specific types), and methods
(functions applied to the values in the fields) and possibly input parameters. The syntax is
class <identifier> ([val|var] <identifier>: <type>[, ... ]) [{ fields and methods }]
The following simple example shows how to define a class in the Spark Shell. The second line is the message from the Spark Shell indicating the successful definition of the class:
scala> class User (n: String = "none") { val name = n; def greet = s"Hello from $name" }
defined class User
The class User
is defined with a field name
and a method greet
, and the field name
has the default value "none"
.
Instances of a class can be generated by expressions using the keyword new
. Suffixing the identifier of an instance with .name
gives the value of its field name
, and suffixing it with .greet
applies its method greet
:
scala> val userdefault = new User
userdefault: User = User@22495f2f
scala> userdefault.name
res0: String = none
scala> userdefault.greet
res1: String = Hello from none
scala> val userhaan = new User("haan")
userhaan: User = User@2ded31cc
scala> userhaan.name
res2: String = haan
scala> userhaan.greet
res3: String = Hello from haan
Every value defined by executing an expression in the Spark Shell is stored and available for later use, as are the results res[x] returned from executing some expression. The shell can be cleared by executing the command :reset
, which removes all previous entries in the shell.
The Spark Shell can execute commands from a script file using the command :load <scriptfilename>
. As an example, create a file User.script
in the directory, from which the Spark Shell was opened:
class User( n: String = "none") {
  val name = n
  def greet = s"Hello from $name"
}
val userdefault = new User
userdefault.name
userdefault.greet
val userhaan = new User("haan")
userhaan.name
userhaan.greet
Loading this script in the Spark Shell gives the following result:
scala> :load User.script
Loading User.script...
defined class User
userdefault: User = User@60a6564c
res0: String = none
res1: String = Hello from none
userhaan: User = User@41cfcbb5
res2: String = haan
res3: String = Hello from haan
Executing System Commands from the Spark Shell
The Spark Shell command :sh
allows system commands to be executed and their results to be displayed. For example, the content of the directory from which the Spark Shell was started can be examined in the following way:
scala> :sh ls -l
res0: scala.tools.nsc.interpreter.ProcessResult = `ls -l` (29 lines, exit 0)
scala> res0.lines foreach println
total 2709248
-rw------- 1 ohaan GGST 727 Jan 28 12:07 derby.log
drwx------ 2 ohaan GGST 0 Aug 31 17:06 files
...
scala>
Unfortunately, wildcards and redirections are not handled by :sh
, a command like :sh ls core.*
will result in an error.
An alternative possibility for executing system commands is the use of Scala's !
or !!
methods (see e.g. this link). These are available after importing the sys.process._
package of Scala. The !
method executes the content of a string as a command, displays the output of the system command in the Spark Shell and returns the exit code of the system command. The !!
method returns the output from the system command in a string. As an example the execution of the ls -l
command is shown:
scala> import sys.process._
import sys.process._
scala> val content = "ls -l".!
total 864
-rw------- 1 ohaan GGST 727 Jan 28 12:07 derby.log
drwx------ 2 ohaan GGST 0 Aug 31 17:06 files
...
content: Int = 0
scala> println(content)
0
scala> val content_string = "ls -l".!!
content_string: String =
"total 864
-rw------- 1 ohaan GGST 727 Jan 28 12:07 derby.log
drwx------ 2 ohaan GGST 0 Aug 31 17:06 files
...
scala> println(content_string)
total 864
-rw------- 1 ohaan GGST 727 Jan 28 12:07 derby.log
drwx------ 2 ohaan GGST 0 Aug 31 17:06 files
...
For system shell commands using wildcards, the input string has to be evaluated by a system shell, which can be realized by applying the !
and !!
methods to the sequence of strings Seq("/bin/sh", "-c", "command-string")
(see Scala Cookbook, Recipe 12.17):
scala> Seq("/bin/sh","-c","ls *.script").!
map1.script
map.script
nums.script
pi.script
st.script
User.script
while.script
res68: Int = 0
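The shell-wrapping technique described above also works with the !! method when the output is needed as a value. The following is a minimal sketch; the command strings and the names out, wordCount and lines are illustrative assumptions and do not come from the cluster. Note that !! throws an exception when the command exits with a non-zero status.

```scala
import sys.process._

// Hand the command line to /bin/sh so that pipes (and likewise wildcards
// and redirections) are interpreted by the shell.
// The command itself is only an illustration.
val out: String = Seq("/bin/sh", "-c", "echo one two three | wc -w").!!

// !! returns the complete standard output as one string;
// trim removes surrounding whitespace and the trailing newline.
val wordCount: Int = out.trim.toInt

// Multi-line output can be split into a list of lines.
val raw: String = Seq("/bin/sh", "-c", "printf 'a\\nb\\n'").!!
val lines: List[String] = raw.split("\n").toList
```

Splitting the string returned by !! is convenient for short outputs; for very long outputs it holds everything in memory at once.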
Creating Spark RDDs
The main entry point for Spark functionality is the class SparkContext
. (See the documentation for Apache Spark 2.3.1, following API Docs->Scala->org.apache.spark->SparkContext.) This class represents the connection to a Spark cluster and provides the methods to create RDDs, to process data within the partitions of an RDD and to communicate data between the different partitions of an RDD.
Invoking the Spark Shell automatically generates the instance sc
of the SparkContext class, which is used to access all of Spark's functionality from the Spark Shell.
There are three ways to create RDDs:
- copying existing Scala collections
- reading data from external storage
- creating new RDDs from existing RDDs
Copying Collections
An existing Scala collection is transformed into an RDD with the method parallelize
. This method needs as its first argument the collection to be transformed into an RDD. As an optional second argument, the number of partitions of the RDD to be created can be prescribed. If no second argument is provided, a system-dependent choice for this number will be made. The following example in the Spark Shell shows how to generate an RDD with 3 partitions containing the integers 1 to 10:
scala> val nums = 1 to 10
nums: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> val nums_RDD = sc.parallelize(nums,3)
nums_RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
The first command creates the Scala collection nums
of type scala.collection.immutable.Range.Inclusive
, the second command uses the method parallelize
for Spark Shell's instantiation sc
of the class SparkContext to convert this collection into the RDD nums_RDD
of type org.apache.spark.rdd.RDD[Int]
.
Since the expression 1 to 10
in Scala is a class instance, it can be inserted directly into the expression for creating the RDD:
scala> val nums_RDD = sc.parallelize(1 to 10,3)
nums_RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
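The effect of the optional second argument can be checked directly. The sketch below is meant to be pasted into the Spark Shell, where sc is predefined; the value names are hypothetical. It relies on the fact that parallelize without a second argument uses sc.defaultParallelism as the number of partitions.

```scala
// Assumes the Spark Shell, where `sc` (the SparkContext) already exists.

// Without a second argument, the number of partitions is chosen by Spark.
val defaultParts: Int = sc.parallelize(1 to 10).getNumPartitions

// This choice is the context's default parallelism.
val sameAsDefault: Boolean = defaultParts == sc.defaultParallelism

// With a second argument, the number of partitions is prescribed.
val threeParts: Int = sc.parallelize(1 to 10, 3).getNumPartitions
```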
Reading Data
SparkContext provides methods to read data in several formats from storage into RDDs. The method textFile
reads an ASCII text file as a collection of lines into an RDD. For example, the file lines.txt
line1 : Hello
line2 : World
line3 : to
line4 : everyone
in the directory, from where the Spark Shell was started, will be loaded as follows:
scala> val lines_RDD = sc.textFile("lines.txt")
lines_RDD: org.apache.spark.rdd.RDD[String] = lines.txt MapPartitionsRDD[18] at textFile at <console>:24
The argument in the method textFile
can be the path to a file, as in the previous example, or a path to a directory. In the latter case, the created RDD will have as many elements as the total number of lines of all files in the directory, each element containing one line.
Another method to create an RDD by reading text files is wholeTextFiles
, with a directory path as argument. This method creates an RDD with as many elements as there are files in the directory, each element being a pair of the full pathname of a file and the whole content of that file as a string.
Modifying Existing RDDs
The RDD class allows the application of various methods, which result in the creation of new instances of RDD. Some of them will be described in more detail later on. An example is the method map(s ⇒ s.length)
, which replaces each string in the RDD by its length:
scala> val linelength = lines_RDD.map(s => s.length)
linelength: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:25
Whereas lines_RDD
is a RDD-collection of Strings - the lines of the input file lines.txt
-, the newly created linelength
is a RDD-collection of Ints - the number of characters in each line of the input file -.
Working with RDDs
Once a RDD is created, the methods defined for this class can be used to do work on its content in order to produce the required results. RDDs support two types of operations: transformations, which create one or more new RDDs from existing ones, and actions, which return values to the driver program after running a computation on the input RDD.
All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently.
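The lazy evaluation described above can be made visible with an accumulator that counts how often the mapped function is called. This is a minimal sketch for the Spark Shell (sc predefined), run locally; the names calls and doubled are hypothetical. Accumulator updates made inside transformations may be repeated if tasks are re-executed, so the counts hold for a run without task retries.

```scala
// Assumes the Spark Shell, where `sc` (the SparkContext) already exists.

// Counter that tasks can add to and the driver can read.
val calls = sc.longAccumulator("map calls")

// Transformation: only recorded, the function is not yet applied.
val doubled = sc.parallelize(1 to 10, 3).map { x => calls.add(1); 2 * x }
val callsBeforeAction: Long = calls.sum   // still 0, nothing was computed

// Action: triggers the computation of the recorded transformation.
val total: Int = doubled.reduce(_ + _)
val callsAfterAction: Long = calls.sum    // one call per element
```

Because doubled is not cached, a second action on it would apply the function to all elements again.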
Transformations are called narrow if they don't mix data from different partitions of the input RDD, otherwise they are called wide.
Because the partitions of an RDD can be distributed over a cluster of computing resources, wide transformations and actions may involve data communication over the cluster's network, whereas narrow transformations can be executed locally.
A list of all methods available for RDDs is displayed in the Spark Shell by completing the identifier for the RDD followed by a full stop with the tab key:
scala> nums_RDD. [tab-key]
++ countByValue groupBy name sampleStdev toDF
aggregate countByValueApprox histogram partitioner sampleVariance toDS
cache dependencies id partitions saveAsObjectFile toDebugString
canEqual distinct intersection persist saveAsTextFile toJavaRDD
cartesian filter isCheckpointed pipe setName toLocalIterator
checkpoint first isEmpty popStdev sortBy toString
coalesce flatMap iterator popVariance sparkContext top
collect fold keyBy preferredLocations stats treeAggregate
collectAsync foreach localCheckpoint productArity stdev treeReduce
compute foreachAsync map productElement subtract union
context foreachPartition mapPartitions productIterator sum unpersist
copy foreachPartitionAsync mapPartitionsWithIndex productPrefix sumApprox variance
count getCheckpointFile max randomSplit take zip
countApprox getNumPartitions mean reduce takeAsync zipPartitions
countApproxDistinct getStorageLevel meanApprox repartition takeOrdered zipWithIndex
countAsync glom min sample takeSample zipWithUniqueId
A description for the methods can be found in the documentation for the Spark RDD class
Displaying Properties of the RDD
Methods for displaying the size and content of the RDDs :
method | description |
---|---|
count() | returns the number of elements |
first() | returns the first element |
take(num) | returns an array with the first num elements |
takeSample(withReplacement, num, [seed]) | returns an array with a random sample of size num taken from the RDD, with or without replacement, optionally pre-specifying a random number generator seed. |
collect() | returns an array with all elements |
Applying the method count
to the previously defined RDDs returns their numbers of elements:
scala> val numscount = nums_RDD.count()
numscount: Long = 10
scala> val linescount = lines_RDD.count()
linescount: Long = 4
The count
method for RDDs is similar to the size
method for Scala collections and different from the count
method for Scala collections. The method size
is not available for RDDs:
scala> val numscount = nums_RDD.size()
<console>:25: error: value size is not a member of org.apache.spark.rdd.RDD[Int]
       val numscount = nums_RDD.size()
                                ^
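The difference between the two count methods can be illustrated with an ordinary Scala collection, for which count takes a predicate and returns the number of elements satisfying it. The following sketch uses plain Scala only; the value names are hypothetical.

```scala
val nums = 1 to 10

// size: the number of elements (what count() returns for an RDD).
val n: Int = nums.size

// count on a Scala collection: the number of elements satisfying a predicate.
val evens: Int = nums.count(_ % 2 == 0)

// For an RDD the same number is obtained by filtering first, e.g.
//   nums_RDD.filter(_ % 2 == 0).count()
// (shown as a comment because it needs the Spark Shell).
```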
Selections of the content of a RDD can be accessed by the first
, take
, or takeSample
methods:
scala> val content = lines_RDD.first()
content: String = line1 : Hello
scala> val content = nums_RDD.take(5)
content: Array[Int] = Array(1, 2, 3, 4, 5)
The takeSample
method randomly selects a number of elements of the RDD. The sampling is done with or without replacement of the chosen elements, depending on whether the first argument of the method is true
or false
. With true
the size of the sample can be arbitrary, even larger than the count
of the RDD, and elements of the RDD can appear repeatedly, with false
the size of the sample is limited to the count of the RDD and each element of the RDD can appear at most once.
scala> nums_RDD.takeSample(false,4)
res30: Array[Int] = Array(5, 10, 6, 8)
scala> nums_RDD.takeSample(false,4)
res31: Array[Int] = Array(8, 3, 2, 6)
scala> nums_RDD.takeSample(true,14)
res32: Array[Int] = Array(4, 6, 2, 8, 5, 5, 3, 3, 1, 1, 8, 9, 1, 6)
scala> nums_RDD.takeSample(true,14)
res33: Array[Int] = Array(9, 8, 3, 9, 5, 1, 10, 9, 10, 7, 2, 1, 3, 5)
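The optional third argument of takeSample fixes the seed of the random number generator, which makes a sample reproducible. The sketch below is intended for the Spark Shell (sc predefined); the seed value 42 and the value names are arbitrary assumptions.

```scala
// Assumes the Spark Shell, where `sc` (the SparkContext) already exists.
val rdd = sc.parallelize(1 to 10, 3)

// Two samples without replacement, drawn with the same seed from the same RDD.
val sample1: Array[Int] = rdd.takeSample(false, 4, 42L)
val sample2: Array[Int] = rdd.takeSample(false, 4, 42L)

// With identical seeds both calls select the same elements.
val reproducible: Boolean = sample1.sameElements(sample2)
```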
The collect
method returns the full content of the RDD; it gives the same result as calling the take
method with argument RDD_identifier.count.toInt
.
scala> val content = lines_RDD.collect()
content: Array[String] = Array(line1 : Hello, line2 : World, line3 : to, line4 : everyone)
scala> val content = nums_RDD.collect()
content: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> val content = nums_RDD.take(nums_RDD.count.toInt)
content: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
The collect
method tries to transfer the content of the whole RDD, which can be distributed over the main memories of several compute nodes, to the main memory of the node running the driver program. For large RDDs this can lead to an out-of-memory exception on that node; therefore the collect
method should not be used for large distributed RDDs.
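Two ways of inspecting a large RDD without collecting all of it are sketched below for the Spark Shell (sc predefined). The RDD big and its size are illustrative assumptions. take transfers only the requested elements, and toLocalIterator brings the partitions to the driver one after the other, so that at most one partition has to fit into the driver's memory at a time.

```scala
// Assumes the Spark Shell, where `sc` (the SparkContext) already exists.
val big = sc.parallelize(1 to 1000000, 8)

// Only the first five elements are transferred to the driver.
val firstFive: Array[Int] = big.take(5)

// Iterator over all elements; partitions are fetched one at a time.
val it: Iterator[Int] = big.toLocalIterator

// Consuming just the beginning of the iterator.
val firstThree: List[Int] = it.take(3).toList
```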
Displaying Properties of RDD Partitions
Methods for displaying the number, sizes and contents of the partitions of the RDD are:
method | description |
---|---|
getNumPartitions | returns the number of partitions |
mapPartitions(f) | returns a new RDD by applying a function f: Iterator ⇒ Iterator to each partition |
glom() | returns a new RDD created by coalescing all elements within each partition into an array |
An example for getting the number of partitions is
scala> val nums_RDD = sc.parallelize(1 to 50)
nums_RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at <console>:24
scala> nums_RDD.collect
res5: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50)
scala> nums_RDD.getNumPartitions
res0: Int = 24
which shows that the method parallelize
, invoked without the second parameter for prescribing the number of partitions, created an RDD with 50 elements in 24 partitions. 24 is the number of cores available on the frontend gwdu103, which was used for these examples running Spark locally. It is the default for parallelize
to split a collection into as many partitions as cores are available.
The next example shows the result if the second parameter is set:
scala> sc.parallelize(1 to 50,3).getNumPartitions
res22: Int = 3
The sizes of the individual partitions can be obtained using the mapPartitions
method, which will be described in more detail in the next section. This method gives access to the elements of each partition separately through an iterator. Its argument must be a function that uses this iterator to operate on the elements of a partition and returns the result as an output iterator. Returning an iterator over the size of the input iterator produces an RDD with 24 elements, each element containing the number of elements in one of the 24 partitions of the original RDD. Applying the same method to the new RDD produces a second RDD with 24 elements, each containing the value 1.
scala> val part = nums_RDD.mapPartitions((x: Iterator[Int]) => Iterator(x.size))
part: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[29] at mapPartitions at <console>:25
scala> part.collect
res6: Array[Int] = Array(2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3)
scala> val partpart = part.mapPartitions(x => Iterator(x.size))
partpart: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[30] at mapPartitions at <console>:25
scala> partpart.collect
res7: Array[Int] = Array(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1)
The type assignment for the input iterator is redundant, because its type can be inferred from the type of elements in the RDD. This has been used in the expression for the value partpart
.
The glom
method allows to display the content of a RDD grouped by partitions. glom
creates a RDD with as many partitions as partitions in the original RDD, each partition containing one element, the array of elements from the corresponding partition of the original RDD. This is shown applying glom
to the RDD from the previous example:
scala> nums_RDD.glom()
res2: org.apache.spark.rdd.RDD[Array[Int]] = MapPartitionsRDD[52] at glom at <console>:26
scala> nums_RDD.glom().getNumPartitions
res3: Int = 24
scala> nums_RDD.glom().collect()
res4: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4), Array(5, 6), Array(7, 8), Array(9, 10), Array(11, 12), Array(13, 14), Array(15, 16), Array(17, 18), Array(19, 20), Array(21, 22), Array(23, 24, 25), Array(26, 27), Array(28, 29), Array(30, 31), Array(32, 33), Array(34, 35), Array(36, 37), Array(38, 39), Array(40, 41), Array(42, 43), Array(44, 45), Array(46, 47), Array(48, 49, 50))
scala> sc.parallelize(1 to 50,3).glom.getNumPartitions
res27: Int = 3
scala> sc.parallelize(1 to 50,3).glom.collect
res28: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16), Array(17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33), Array(34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50))
Saving RDDs to Disk
method | description |
---|---|
saveAsTextFile(path) | save the RDD as a text file, using string representations of elements. |
This method accepts a pathname to a directory, into which it saves the RDD's partitions in separate files, named part-00000
, part-00001
, etc. This directory must not yet exist; it will be created by the method. The path may be absolute or relative to the working directory from which the method is invoked.
As an example, the directory files
contains three files:
gwdu103:35 17:55:00 ~/Spark_Intro > ls files
file1 file2 file3
Using textFile
creates a RDD, whose properties can be examined:
scala> val textfile_RDD = sc.textFile("files")
textfile_RDD: org.apache.spark.rdd.RDD[String] = files MapPartitionsRDD[102] at textFile at <console>:24
scala> textfile_RDD.count
res67: Long = 12
scala> textfile_RDD.getNumPartitions
res68: Int = 3
scala> textfile_RDD.glom().collect
res69: Array[Array[String]] = Array(Array(b1, b2, b3, b4, b5), Array(a1, a2, a3, a4), Array(c1, c2, c3))
textfile_RDD.saveAsTextFile("newdir")
creates the directory newdir
with three files:
gwdu103:35 18:07:29 ~/Spark_Intro > ls newdir
part-00000 part-00001 part-00002 _SUCCESS
gwdu103:35 18:08:28 ~/Spark_Intro > more newdir*
*** newdir: directory ***
gwdu103:35 18:08:57 ~/Spark_Intro > more newdir/*
::::::::::::::
newdir/part-00000
::::::::::::::
b1
b2
b3
b4
b5
::::::::::::::
newdir/part-00001
::::::::::::::
a1
a2
a3
a4
::::::::::::::
newdir/part-00002
::::::::::::::
c1
c2
c3
::::::::::::::
newdir/_SUCCESS
::::::::::::::
Using instead the method wholeTextFiles
:
scala> val wholetextfiles_RDD = sc.wholeTextFiles("files")
wholetextfiles_RDD: org.apache.spark.rdd.RDD[(String, String)] = files MapPartitionsRDD[111] at wholeTextFiles at <console>:24
scala> wholetextfiles_RDD.count
res77: Long = 3
scala> wholetextfiles_RDD.getNumPartitions
res78: Int = 2
scala> wholetextfiles_RDD.glom().collect
res79: Array[Array[(String, String)]] = Array(Array((file:/home/uni05/ohaan/Spark_Intro/files/file2,"b1 b2 b3 b4 b5 "), (file:/home/uni05/ohaan/Spark_Intro/files/file1,"a1 a2 a3 a4 ")), Array((file:/home/uni05/ohaan/Spark_Intro/files/file3,"c1 c2 c3 ")))
Saving wholetextfiles_RDD
with wholetextfiles_RDD.saveAsTextFile("newdir1")
produces the directory newdir1
:
gwdu103:35 18:09:11 ~/Spark_Intro > ls newdir1
part-00000 part-00001 _SUCCESS
gwdu103:35 18:36:56 ~/Spark_Intro > more newdir1/*
::::::::::::::
newdir1/part-00000
::::::::::::::
(file:/home/uni05/ohaan/Spark_Intro/files/file2,b1 b2 b3 b4 b5 )
(file:/home/uni05/ohaan/Spark_Intro/files/file1,a1 a2 a3 a4 )
::::::::::::::
newdir1/part-00001
::::::::::::::
(file:/home/uni05/ohaan/Spark_Intro/files/file3,c1 c2 c3 )
::::::::::::::
newdir1/_SUCCESS
::::::::::::::
More Transformations for RDDs
method | description |
---|---|
map(f) | returns a new RDD by applying a function f to each element |
flatMap(f) | returns a new RDD by first applying a function f to all elements, and then flattening the results |
mapPartitions(f) | returns a new RDD by applying a function f: Iterator ⇒ Iterator to each partition |
mapPartitionsWithIndex(f) | returns a new RDD by applying a function (Int, Iterator) ⇒ Iterator to each partition, while tracking the index of the original partition |
filter(f) | returns a new RDD containing only the elements that satisfy a predicate f: (T) ⇒ Boolean |
map
The map
method applies a function to every element of a RDD and generates a new RDD containing the results of the function evaluation. The function must have a single input and the new RDD will have the same number of elements as the old one. In the following example, a collection of integers from 1 to 10 is distributed into a RDD collection with 3 partitions. A function double
is defined which doubles its input and applied to the RDD with the map method:
scala> val ints = sc.parallelize(1 to 10,3)
ints: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at <console>:27
scala> def double(x: Int): Int = 2*x
double: (x: Int)Int
scala> val ints_2 = ints.map(double)
ints_2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[27] at map at <console>:30
scala> ints.collect
res32: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> ints_2.collect
res33: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
scala> ints.getNumPartitions
res34: Int = 3
scala> ints_2.getNumPartitions
res35: Int = 3
The output in the Spark Shell displays the expected result of applying the map
method with the double
function and the fact, that the number of partitions is equal in the old and new RDD.
The argument in the map method can be an identifier of a previously defined function as in the previous example. Also a function literal (anonymous function) can be used:
scala> val ints_2=ints.map((x: Int) => 2*x)
ints_2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:25
scala> ints_2.collect
res1: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
The placeholder syntax of Scala can be used for specifying the function literal:
scala> val ints_2=ints.map(_*2)
flatMap
If the function, that transforms the elements of the RDD, returns a collection of several elements, the result of map
will be a RDD containing as many collections as the number of elements in the original RDD:
scala> ints.map(x => List(2*x-1,2*x)).collect
res45: Array[List[Int]] = Array(List(1, 2), List(3, 4), List(5, 6), List(7, 8), List(9, 10), List(11, 12), List(13, 14), List(15, 16), List(17, 18), List(19, 20))
The flatMap
method will “flatten” the RDD collection of generated collections into a RDD collection containing all elements of all generated collections:
scala> ints.flatMap(x => List(2*x-1,2*x)).collect
res46: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
Applying the glom
method to the transformed RDD shows that the RDD generated with flatMap
has the same number of partitions as the original RDD, each partition containing all elements generated from the elements of the corresponding partition in the original RDD.
scala> ints.flatMap(x => List(2*x-1,2*x)).glom.collect
res47: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6), Array(7, 8, 9, 10, 11, 12), Array(13, 14, 15, 16, 17, 18, 19, 20))
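A typical use of flatMap is splitting lines of text into words, where each input element yields a variable number of output elements. The following sketch is intended for the Spark Shell (sc predefined); the two input strings are taken from the lines.txt example, while the RDD is created here with parallelize for illustration.

```scala
// Assumes the Spark Shell, where `sc` (the SparkContext) already exists.
val lines = sc.parallelize(Seq("line1 : Hello", "line2 : World"), 2)

// map keeps one result per line: here the number of words in each line.
val wordsPerLine: Array[Int] = lines.map(_.split(" ").length).collect()

// flatMap flattens the arrays produced by split into one RDD of words.
val words: Array[String] = lines.flatMap(_.split(" ")).collect()
```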
mapPartitions, mapPartitionsWithIndex
The mapPartitions
method applies a function separately to every partition of an RDD and returns the results in a new RDD. The elements of the partition are passed to the function as an iterator, and the result of the function has to be provided as an iterator as well, i.e. the function has to be defined as fun(x: Iterator[Type]): Iterator[Type] = . . .
. As an example, here is an application of the mapPartitions
method, which returns a RDD containing the number of elements in the original RDD's partitions:
scala> def ff(it: Iterator[Int]): Iterator[Int] = List(it.size).iterator
ff: (it: Iterator[Int])Iterator[Int]
scala> val ints = sc.parallelize(1 to 10,3)
ints: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[63] at parallelize at <console>:24
scala> ints.glom.collect
res127: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))
scala> ints.mapPartitions(ff).collect
res126: Array[Int] = Array(3, 3, 4)
The same result can be achieved by giving the function argument for the mapPartitions method as a function literal:
scala> ints.mapPartitions(it => List(it.size).iterator).collect
res129: Array[Int] = Array(3, 3, 4)
With the method mapPartitionsWithIndex
, the transformation of the elements in a partition can be made dependent on the partition number, which runs from zero to the number of partitions minus one. The function defining the transformation therefore has two input parameters, the partition index and the partition iterator, and has to be defined as fun(index: Int, x: Iterator[Type]): Iterator[Type] = . . .
.
In the following example, to every element of a partition the value 100*(index+1) is added:
scala> val ints = sc.parallelize(1 to 10,3)
ints: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> ints.collect
res5: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> def fun(index: Int, it: Iterator[Int]): Iterator[Int] = it.toList.map(x =>x + 100*(index+1)).iterator
fun: (index: Int, it: Iterator[Int])Iterator[Int]
scala> ints.mapPartitionsWithIndex(fun).collect
res6: Array[Int] = Array(101, 102, 103, 204, 205, 206, 307, 308, 309, 310)
scala> ints.mapPartitionsWithIndex((index,it) => it.toList.map(x =>x + 100*(index+1)).iterator).collect
res7: Array[Int] = Array(101, 102, 103, 204, 205, 206, 307, 308, 309, 310)
In the example, the function defining the transformation is first provided by its identifier and then as a function literal.
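As a possible variation of the example above, the conversion to a list can be omitted, because an iterator has a map method of its own. This sketch for the Spark Shell (sc predefined) uses a hypothetical value name; mapping the iterator directly avoids holding all elements of a partition in a list at once.

```scala
// Assumes the Spark Shell, where `sc` (the SparkContext) already exists.
val ints = sc.parallelize(1 to 10, 3)

// The iterator is transformed element by element, without building a list.
val shifted: Array[Int] =
  ints.mapPartitionsWithIndex((index, it) => it.map(x => x + 100 * (index + 1))).collect()
```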
filter
With the filter
method a new RDD is created containing all elements of the original RDD fulfilling a condition, which is specified in the function given as argument to the method. The function value must be of Boolean
type, and all elements, for which the function returns true
, are collected in the new RDD. The new RDD has the same number of partitions as the original one, and the retained values in the new RDD are placed in the partition with the same number as the partition they had occupied in the original RDD. Here are two examples for the filter
method, one selecting all even values, the other selecting all values smaller than 5:
scala> val ints = sc.parallelize(1 to 10,3)
ints: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24
scala> ints.glom.collect
res16: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))
scala> def fun(a: Int): Boolean = (a%2 == 0)
fun: (a: Int)Boolean
scala> ints.filter(fun).glom.collect
res17: Array[Array[Int]] = Array(Array(2), Array(4, 6), Array(8, 10))
scala> ints.filter(a => (a < 5)).glom.collect
res18: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4), Array())
Some Actions on RDDs
method | description |
---|---|
reduce(func) | Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel. |
collect() | Return all the elements of the dataset as an array at the driver program. |
count() | Return the number of elements in the dataset. |
first() | Return the first element of the dataset (similar to take(1)). |
take(n) | Return an array with the first n elements of the dataset. |
reduce
The reduce
method produces a single result from all elements of an RDD. The result depends on the operation, which is defined in the function passed as parameter to the reduce method. For example, the function literal (x,y)⇒ x+y
will produce the sum of all elements in the RDD, the function literal (x,y)⇒ x max y
the largest element in the RDD. Of course all elements in the RDD should have the same type, and the method +
resp. max
should be applicable to this type. Here are examples using the reduce
method for a collection of Int
values:
scala> val rdd1 = sc.parallelize(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[61] at parallelize at <console>:24
scala> rdd1.glom.collect
res146: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5), Array(6, 7, 8, 9, 10))
scala> rdd1.reduce((x,y)=>x+y)
res147: Int = 55
scala> rdd1.reduce((x,y)=>x.max(y))
res148: Int = 10
scala> rdd1.reduce((x,y)=>x max y)
res149: Int = 10
The last two commands express the same operation: the first shows that max
is a method applicable to objects of type Int
, the second is an example of Scala's syntax rule that a method with a single argument may be written in infix notation. In the same way the addition of two values 2
and 3
is realized by the method +
applicable to objects of type Int
, and therefore the result of the addition is returned from the statement 2.+(3)
. The syntax rule mentioned above allows this expression to be written in the familiar form 2 + 3
.
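The equivalence of the two notations can be checked directly in plain Scala. The following short sketch uses an object name, InfixNotation, that is chosen only for this illustration; the receiver is put in parentheses merely to keep the dot notation unambiguous.

```scala
object InfixNotation {
  val dotSum: Int = (2).+(3)   // method call written with dot and parentheses
  val infixSum: Int = 2 + 3    // the same call in infix notation
  val dotMax: Int = (3).max(7) // max called like an ordinary method
  val infixMax: Int = 3 max 7  // max written in infix notation

  def main(args: Array[String]): Unit = {
    println(dotSum == infixSum)
    println(dotMax == infixMax)
  }
}
```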
The next example shows the action of the reduce
method with the +
operation for a collection of string values. Of course the +
operator for string valued data is not commutative, as is required of a valid operation for the reduce method. But this example will be useful later on for examining the reduction process for RDDs with multiple partitions.
scala> val chars = ('a' to 'j').toList.map(x => x.toString)
chars: List[String] = List(a, b, c, d, e, f, g, h, i, j)
scala> val rdd2 = sc.parallelize(chars,2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[65] at parallelize at <console>:26
scala> rdd2.glom.collect
res151: Array[Array[String]] = Array(Array(a, b, c, d, e), Array(f, g, h, i, j))
scala> rdd2.reduce((x,y)=>x+y)
res152: String = abcdefghij
In this example the collection of Char
values has been transformed into a collection of String
values, because applied to two Char values the + method adds their numeric character codes and returns an Int, whereas applied to String values the +
method concatenates them.
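The different behaviour of + for Char and String values can be seen in a short plain Scala sketch. The object name CharVersusString is hypothetical and used only here.

```scala
object CharVersusString {
  // Char + Char is an integer addition of the character codes (97 + 98).
  val charSum: Int = 'a' + 'b'
  // String + String is a concatenation.
  val stringSum: String = "a" + "b"

  def main(args: Array[String]): Unit = {
    println(charSum)
    println(stringSum)
  }
}
```

Since the result of adding two Char values is an Int and not a Char, such an operation also does not fit the type required by reduce for a collection of Char values.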
The mechanism behind the reduce method can be illustrated by the following recursive Scala function, which reduces the elements of a List collection:
scala> def red[A](li: List[A],op: (A,A)=> A): A = {
     |   val s = op(li(0),li(1))
     |   val lid = li.drop(2)
     |   if (lid.size == 0) return s
     |   red(s::lid,op)
     | }
red: [A](li: List[A], op: (A, A) => A)A
The function red
receives as first input, in square brackets, a type parameter, which is used for the input and output values. Following in round brackets are two input values: the list object to be reduced and the function defining the operation to be employed for the reduction. In the function body, the operation is applied to the first two elements of the list to produce a result. A new list is created in two steps: first dropping the first two elements using the drop(2)
method and then prepending the result s
of the operation using the ::
operator. Then the new list, which has one element less than the old one, is used as input for calling recursively the function red
. The recursion ends when the length of the list is two; the result of the operation on the remaining two elements is then returned as the end result of the reduction. The function assumes a list with at least two elements.
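The function red fails for lists with fewer than two elements, because the element li(1) does not exist in that case. A possible variant, given here only as a sketch, uses pattern matching and returns an Option. The names SafeReduce and redOption are hypothetical and chosen for this illustration.

```scala
import scala.annotation.tailrec

object SafeReduce {
  // Variant of red that returns None for an empty list instead of failing.
  @tailrec
  def redOption[A](li: List[A], op: (A, A) => A): Option[A] = li match {
    case Nil            => None                                  // nothing to reduce
    case x :: Nil       => Some(x)                               // a single element is the result
    case x :: y :: rest => redOption(op(x, y) :: rest, op)       // combine the first two, recurse
  }

  def main(args: Array[String]): Unit = {
    val chars = ('a' to 'j').toList.map(_.toString)
    println(redOption[String](chars, (x, y) => x + y))
    println(redOption[String](Nil, (x, y) => x + y))
  }
}
```

The order in which the elements are combined is the same as in red, only the handling of short lists differs.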
This function now is used to reduce the List-collection of strings chars
from the previous example:
scala> red[String](chars,(x,y)=>x+y)
res2: String = abcdefghij
and produces the expected result.
It will be interesting to follow the separate steps in the reduction process by including a print statement in the function literal:
scala> red[String](chars,(x,y)=>{println(x,y);x+y})
(a,b)
(ab,c)
(abc,d)
(abcd,e)
(abcde,f)
(abcdef,g)
(abcdefg,h)
(abcdefgh,i)
(abcdefghi,j)
res4: String = abcdefghij
As expected, in each reduction step the next character is appended to the partial string result obtained so far.
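The same left-to-right succession of steps is produced by the method reduceLeft of Scala's standard collections. The following sketch records the operand pairs in a buffer instead of printing them; the object name ReduceTrace and the function trace are assumptions made for this illustration.

```scala
import scala.collection.mutable.ListBuffer

object ReduceTrace {
  val chars: List[String] = ('a' to 'j').toList.map(_.toString)

  // Returns the reduced string together with the list of operand pairs seen by the operation.
  def trace(): (String, List[(String, String)]) = {
    val steps = ListBuffer.empty[(String, String)]
    val result = chars.reduceLeft { (x, y) =>
      steps += ((x, y)) // remember the operands of this reduction step
      x + y
    }
    (result, steps.toList)
  }

  def main(args: Array[String]): Unit = {
    val (result, steps) = trace()
    steps.foreach(println)
    println(result)
  }
}
```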
Now the reduction steps will be examined for RDD-collections, built with different numbers of partitions. With one partition, the reduction of the RDD-collection results from the same succession of steps as have been seen in the reduction of the List-collection:
scala> val rdd_1p = sc.parallelize(chars,1)
rdd_1p: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:26
scala> rdd_1p.reduce((x,y)=>{println(x,y);x+y})
(a,b)
(ab,c)
(abc,d)
(abcd,e)
(abcde,f)
(abcdef,g)
(abcdefg,h)
(abcdefgh,i)
(abcdefghi,j)
res8: String = abcdefghij
With 2 partitions, a different pattern for the reduction is observed:
scala> val rdd_2p = sc.parallelize(chars,2)
rdd_2p: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:26
scala> rdd_2p.glom.collect
res9: Array[Array[String]] = Array(Array(a, b, c, d, e), Array(f, g, h, i, j))
scala> rdd_2p.reduce((x,y)=>{println(x,y);x+y})
(a,b)
(f,g)
(ab,c)
(fg,h)
(fgh,i)
(fghi,j)
(abc,d)
(abcd,e)
(fghij,abcde)
res10: String = fghijabcde
The succession of reduction steps shows that the characters in each partition are concatenated separately, and only at the end are the two partial strings from the two partitions combined into the final complete string. The partial reductions in the partitions are done simultaneously; the ordering in the final string is therefore nondeterministic, depending on which partition is the first to finish its partial reduction. In order to obtain a unique result, the reduction operation must be commutative, a condition which is not fulfilled by the concatenation of strings.
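The dependence of the result on the order in which the partial results arrive can be imitated in plain Scala. The following sketch reduces each partition separately and then merges the partial results in a prescribed order of partition indices. The names PartitionedReduce and reduceInOrder are hypothetical, and the explicit order argument only stands in for the arrival order, which Spark does not fix.

```scala
object PartitionedReduce {
  val partitions: List[List[String]] =
    List(List("a", "b", "c", "d", "e"), List("f", "g", "h", "i", "j"))

  // Reduce every partition on its own, then merge the partial results
  // in the given order of partition indices.
  def reduceInOrder[A](parts: List[List[A]], order: List[Int])(op: (A, A) => A): A = {
    val partial = parts.map(_.reduceLeft(op))
    order.map(i => partial(i)).reduceLeft(op)
  }

  def main(args: Array[String]): Unit = {
    // Concatenation is not commutative: the merge order changes the result.
    println(reduceInOrder(partitions, List(0, 1))(_ + _))
    println(reduceInOrder(partitions, List(1, 0))(_ + _))
    // Integer addition is commutative and associative: the merge order is irrelevant.
    val ints = List(List(1, 2, 3, 4, 5), List(6, 7, 8, 9, 10))
    println(reduceInOrder(ints, List(0, 1))(_ + _))
    println(reduceInOrder(ints, List(1, 0))(_ + _))
  }
}
```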
The parallel processing of the partial reductions in the different partitions can again be observed with three partitions:
scala> val rdd_3p = sc.parallelize(chars,3)
rdd_3p: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:26
scala> rdd_3p.glom.collect
res4: Array[Array[String]] = Array(Array(a, b, c), Array(d, e, f), Array(g, h, i, j))
scala> rdd_3p.reduce((x,y)=>{println(x,y);x+y})
(a,b)
(d,e)
(g,h)
(ab,c)
(gh,i)
(de,f)
(ghi,j)
(abc,ghij)
(abcghij,def)
res6: String = abcghijdef
Building and Submitting Spark Applications
The Spark Shell is appropriate for experimenting with Spark's features and for developing and testing small programs. Applications for production purposes will consist of program files, which have to be compiled and linked into an executable object, that can be submitted for execution on a cluster. The components involved in running a spark application are displayed schematically in the following picture:
The client
process submits the application, including specifications for the execution environment and, if needed, parameters for the application.
A driver
process is started, which handles the statements contained in the program file for the Spark application. Every Spark application program has to specify a SparkContext
, in which the details for executing the application (e.g. type of cluster, number of executors, cores and memory of executors) are defined. According to these requirements the driver initiates one or more executor
processes with the required number of cores to be run on the cluster. Of course the cluster to be used must provide enough computing resources for the requirements of the SparkContext.
The execution of the statements in the Spark application program then will be shared between driver and executors. Statements involving RDDs will be executed on the executors. An operation on an RDD is split into a number of tasks according to the number of partitions in the RDD, each task being responsible for executing the operation on the data in the respective partition. The driver process includes a scheduler
, which sends these tasks to the executors, where each task is computed on one of the executor cores. The degree of parallelism of the Spark application is therefore determined by the total number of cores of all executors, because every core can work in parallel on a different partition of an RDD.
In addition to distributing the work to be done on the partitions of RDDs to the executors, the driver collects and combines the results the executors produce for action operations on RDD partitions and processes all statements in the program which do not involve RDDs.
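The relation between partitions, tasks and cores can be made concrete with a little arithmetic. The following plain Scala sketch rests on simplifying assumptions that are not taken from the text above: every task occupies exactly one core and all tasks take equally long. The names TaskSlots, slots and rounds are hypothetical.

```scala
// Sketch under simplifying assumptions: one core per task, all tasks take equally long.
object TaskSlots {
  // Number of tasks that can run simultaneously on all executors together.
  def slots(executors: Int, coresPerExecutor: Int): Int =
    executors * coresPerExecutor

  // Number of successive rounds needed to process all partitions of an RDD.
  def rounds(partitions: Int, slots: Int): Int =
    (partitions + slots - 1) / slots // integer division, rounded up

  def main(args: Array[String]): Unit = {
    val s = slots(executors = 2, coresPerExecutor = 4)
    println(s"slots = $s")
    println(s"rounds for 20 partitions = ${rounds(20, s)}")
  }
}
```

Under these assumptions an RDD with fewer partitions than available cores leaves some cores idle, whereas an RDD with many more partitions is processed in several successive rounds.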
The next sections will describe:
- defining the SparkContext in a Scala program for a Spark application,
- producing an executable jar file from this program using sbt (simple build tool),
- running the application with spark-submit,
- running the application on a Spark standalone cluster,
- submitting a Spark application job to the scheduling system SLURM for execution on GWDG's scientific compute cluster.
Setting the Spark Context
Establishing the connection to the SparkContext
in a Scala program for a Spark application is demonstrated by the following code:
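import org.apache.spark.SparkContext

object SparkConnect {
  def main(args: Array[String] = Array("")): Unit = {
    // No properties are specified: defaults of the execution environment are used
    val sp = new SparkContext()
    // Print the configuration of the context
    println(sp.getConf.toDebugString)
    // Release the context
    sp.stop()
  }
}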
In the first line of the program org.apache.spark.SparkContext
is imported to provide access to the SparkContext class.
Then the object SparkConnect
is created, containing a single method main
. In the main
method the instance sp
of the class SparkContext
is created. This instance has access to the methods provided in the SparkContext
class for generating and manipulating RDD
collections. In this example, the instance of the SparkContext is generated without specifying values for its properties. The configuration of the context's properties will be taken from defaults defined for the execution environment to which the application is submitted.
The print statement gives the details of the context's configuration, which in this example uses the default values from the environment.
The last statement deactivates the connection to the created SparkContext instance. A Spark application can be connected to only one single SparkContext. Trying to create a second connection will result in an error and terminate the application. Therefore the stop method has to be applied to an existing SparkContext instance, before a new one can be established.
If it is not known whether a connection to a SparkContext already exists, the new connection should be established with the getOrCreate
method for the SparkContext class:
val sp = SparkContext.getOrCreate()
If an instance of SparkContext
already exists, this will be used for sp
, if not, sp
will be a new instance of SparkContext
.
From the file containing this program code an executable jar
file can be generated as will be explained in the next section. Invoking the executable with the spark-submit
command will then call the main
method defined for the object in the program file. Arguments for the main method and the settings for the configuration of the SparkContext can be passed with parameters and options, as will be described in detail in a later section.
The program, stored in a file SparkConnect.scala
can be tested in the Spark Shell by loading it with the :load
command and then calling the main method for the object defined in the program file:
scala> :load SparkConnect.scala
Loading SparkConnect.scala...
import org.apache.spark.SparkContext
defined object SparkConnect
scala> SparkConnect.main()
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243).
...
scala> sc.stop()
scala> SparkConnect.main()
spark.app.id=local-1550248586719
spark.app.name=Spark shell
spark.driver.host=gwdu102.global.gwdg.cluster
spark.driver.port=36803
spark.executor.id=driver
spark.jars=
spark.master=local[*]
spark.submit.deployMode=client
spark.ui.showConsoleProgress=true
As was explained in previous sections, invoking the Spark Shell automatically provides a connection to a SparkContext with the value sc
. The creation of an additional instance of SparkContext
in the main method is not allowed and leads to an exception. Only after stopping the Spark Shell instance sc
can the main method create the new instance sp
and print out the properties of its configuration. The new instance gets its properties from the Spark Shell environment. In particular the application name is set to Spark shell
and the master is set to local[*]
.
The properties of the SparkContext instance can be defined locally in the main
definition by creating an instance of the class SparkConf
with prescribed values for properties and creating the SparkContext with the SparkConf instance as argument:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.{Logger, Level}

object SparkConnect {
  def main(args: Array[String] = Array("")): Unit = {
    // Suppress INFO messages
    Logger.getLogger("org").setLevel(Level.WARN)
    // false: do not take over default properties from the environment
    val co = new SparkConf(false).setMaster("local[2]").setAppName("SparkApp")
    val sp = new SparkContext(co)
    println(sp.getConf.toDebugString)
    sp.stop()
  }
}
The argument false
for the SparkConf class prevents the loading of default properties from the environment; setting this argument to true
or omitting it leads to taking over the environment's properties. The setMaster
and setAppName
methods then set the chosen custom values for these properties. Setting master and application name is obligatory for a valid SparkContext; without these two properties the creation of the SparkContext instance will fail.
There is a large list of properties that can be set, see the corresponding documentation for Spark Configuration. Only for a small set of properties is a separate set-method provided. The majority of properties have to be set with the method set("property-name","property-value")
.
A further addition in this example is the setting of the log level to WARN
, which prevents the printing of a lot of INFO
messages.
Building the Spark Application Executable with SBT
There exist many different tools for compiling Scala code and building an executable jar file. One of them is sbt
, which stands for “simple build tool” or “scala build tool”. The sbt tool is not simple at all, as a look into the sbt reference manual shows. But for the Spark applications treated in these sections only a very restricted set of sbt features will be needed and explained in the following.
In order to use the sbt tool on GWDG's cluster, the following module has to be loaded:
module load scala/sbt
The current working directory, from which sbt
tasks are started, is called the base directory
in the sbt documentation. In the base directory
at least one file with the suffix .sbt
has to be provided, containing information concerning the tasks sbt shall execute. Usually the name build.sbt
is chosen for this file, but sbt uses information from all files in the base directory
with the suffix .sbt
.
The following file build.sbt
placed in the base directory will be the starting point for building a Spark application:
/* build.sbt */
name := "SparkApp"
scalaVersion := "2.11.8"
logLevel := Level.Error
libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "2.3.1")
The build file defines a name
for the application, which will be a part of the name for the jar-file generated by sbt. Then the scalaVersion
used for compiling the program code is specified and finally the version 2.3.1
of the spark-core
package is added to the library dependencies, such that the import of Spark packages in the program code can be resolved. The logLevel setting Level.Error
prevents the printing of many WARN
messages concerning version conflicts resulting from automatically included libraries.
The executable jar file for a Spark application is built with the package
task.
sbt looks for source code files with suffix .scala
in the base directory and in some default directories relative to the base directory, e.g. src/main/scala
. The use of sbt will be demonstrated with a source file SparkConnect.scala
, containing the code of the object SparkConnect
from the previous section.
An interactive sbt shell can be invoked in the base directory with the command sbt
gwdu102:54 11:16:02 ~/SparkApp > sbt
[info] Updated file /home/uni05/ohaan/SparkApp/project/build.properties: set sbt.version to 1.1.6
[info] Loading project definition from /home/uni05/ohaan/SparkApp/project
[info] Loading settings from build.sbt ...
[info] Set current project to SparkApp (in build file:/home/uni05/ohaan/SparkApp/)
[info] sbt server started at local:///usr/users/ohaan/.sbt/1.0/server/bab34b8d3cad9a03bcdd/sock
sbt:SparkApp>
sbt:SparkApp> show sources
[info] * /home/uni05/ohaan/SparkApp/SparkConnect.scala
[success] Total time: 0 s, completed Feb 20, 2019 11:17:43 AM
Invoking sbt for the first time generates a complex hierarchy of directories, which will only be described as far as relevant for the current purpose of generating a Spark application. The first [info]
line states that the newly generated directory project
has a file build.properties
containing the used version of sbt, 1.1.6 in this example.
The sbt-shell prompt sbt:SparkApp>
takes the extension SparkApp
from the name
value set in build.sbt
With the task show sources
all files ending with .scala
from the default source directories will be displayed.
The following list shows some sbt tasks which can be executed in the sbt-shell.
task | description |
---|---|
tasks | displays all tasks available in the sbt-shell |
compile | compiles the code from source files, generating class files |
package | produces binary jar files from the compiled sources ready for execution |
run | runs the main class from the executable jar file |
clean | deletes files produced by the sbt tasks, such as compiled classes and binary jar files |
reload | reloads the project in the base directory (after changes of settings in the .sbt file) |
exit | ends the sbt-shell |
These tasks can also be executed as batch commands sbt <task>
issued from the base directory. In addition, the sbt-shell offers the possibility of continuous commands by prefixing the task name with ~
, e.g. ~compile
. In the continuous command mode the shell executes the task repeatedly after changes in files on which the task depends.
The executable jar files for Spark applications are generated with the package
task.
sbt:SparkApp> package
[success] Total time: 4 s, completed Feb 20, 2019 3:43:39 PM
sbt:SparkApp> show package
[info] /home/uni05/ohaan/SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar
[success] Total time: 0 s, completed Feb 20, 2019 3:43:43 PM
The second command show package
returns the location /home/uni05/ohaan/SparkApp/target/scala-2.11/
and name sparkapp_2.11-0.1.0-SNAPSHOT.jar
of the generated jar file. This name includes the project property settings name
, version
and scalaVersion
from the build.sbt file. Since no version
value for the project was set in the example, the default value 0.1.0-SNAPSHOT
was chosen.
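How the file name in this example is composed from the three settings can be sketched in plain Scala. The functions below only reproduce the name seen above; they are not sbt's implementation, whose normalization of the project name is assumed here to amount to lowercasing for a simple name like SparkApp. The names JarName, binaryVersion and jarName are hypothetical.

```scala
object JarName {
  // "2.11.8" -> "2.11": only the first two components of scalaVersion appear in the name.
  def binaryVersion(scalaVersion: String): String =
    scalaVersion.split('.').take(2).mkString(".")

  // Compose <name>_<scala binary version>-<version>.jar; the default version is the one
  // reported in the example above for a build file without a version setting.
  def jarName(name: String, scalaVersion: String, version: String = "0.1.0-SNAPSHOT"): String =
    s"${name.toLowerCase}_${binaryVersion(scalaVersion)}-$version.jar"

  def main(args: Array[String]): Unit =
    println(jarName("SparkApp", "2.11.8"))
}
```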
The executable jar file can be submitted by the run
task from sbt:
sbt:SparkApp> run
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/02/20 16:03:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
spark.app.id=local-1550675006775
spark.app.name=SparkApp
spark.driver.host=gwdu102.global.gwdg.cluster
spark.driver.port=39956
spark.executor.id=driver
spark.master=local[2]
[success] Total time: 3 s, completed Feb 20, 2019 4:03:27 PM
Running a Spark Application with spark-submit
The spark-submit
command starts the client process for a Spark application and allows configuration properties for the SparkContext to be set at run time:
spark-submit \
  --master <master-url> \
  --conf <key>=<value> \
  ... # other options
  <application-jar>
Some properties can be set using a reserved option name, for example the --master
option determines the compute environment to be used for the Spark application. Some possible values for the master-url are
Master URL | Meaning |
---|---|
local | Run Spark locally with one worker thread (i.e. no parallelism at all) |
local[K] | Run Spark locally with K worker threads |
local[*] | Run Spark locally with as many worker threads as logical cores on your machine |
spark://HOST:PORT | Connect to the given Spark standalone cluster master |
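The four forms of the master URL listed in the table can be told apart by simple pattern matching. The following plain Scala sketch is not Spark's own parser; it covers only the forms from the table, and all names (MasterUrl, Master, LocalSingle, LocalThreads, LocalAllCores, Standalone, parse) are hypothetical.

```scala
object MasterUrl {
  sealed trait Master
  case object LocalSingle extends Master                               // local
  final case class LocalThreads(k: Int) extends Master                 // local[K]
  case object LocalAllCores extends Master                             // local[*]
  final case class Standalone(host: String, port: Int) extends Master  // spark://HOST:PORT

  private val LocalK   = """local\[(\d+)\]""".r
  private val SparkUrl = """spark://([^:/]+):(\d+)""".r

  // Returns None for any form not listed in the table.
  def parse(url: String): Option[Master] = url match {
    case "local"        => Some(LocalSingle)
    case "local[*]"     => Some(LocalAllCores)
    case LocalK(k)      => Some(LocalThreads(k.toInt))
    case SparkUrl(h, p) => Some(Standalone(h, p.toInt))
    case _              => None
  }

  def main(args: Array[String]): Unit =
    List("local", "local[2]", "local[*]", "spark://gwdu102:7077").foreach(u => println(parse(u)))
}
```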
The complete list of possible options with their default settings can be inspected by calling spark-submit --help.
spark-submit
may be invoked without any option; only the absolute or relative path to the jar file containing the executable Spark application must be provided. The configuration properties will then be set to their default values.
A Spark Application creating a SparkContext without specifying any properties as in
import org.apache.spark.SparkContext

object SparkConnect {
  def main(args: Array[String] = Array("")): Unit = {
    val sp = new SparkContext()
    println(sp.getConf.toDebugString)
    sp.stop()
  }
}
with a corresponding application-jar file target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar
produced by the sbt package
command as described in the previous section, will produce the following output when submitted without explicit settings for the options:
~/SparkApp > $SPARK_HOME/bin/spark-submit target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar
...
spark.app.id=local-1557919036370
spark.app.name=SparkConnect
spark.driver.host=gwdu102.global.gwdg.cluster
spark.driver.port=41872
spark.executor.id=driver
spark.jars=file:/home/uni05/ohaan/SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar
spark.master=local[*]
spark.submit.deployMode=client
Ways to Configure the SparkContext
The properties which can be configured for the SparkContext are listed in the section Spark Configuration of the Spark documentation.
Here is a short collection of properties from this list:
property name | default | description |
---|---|---|
spark.master | (none) | the cluster manager to connect to |
spark.cores.max | (not set) | the maximum amount of CPU cores to request for the application from across the cluster |
spark.executor.cores | all the available cores on the worker in standalone mode | the number of cores to use on each executor |
spark.default.parallelism | for distributed shuffle operations the largest number of partitions in a parent RDD; for operations like parallelize with no parent RDDs total number of cores on all executor nodes | default number of partitions in RDDs |
There are three different ways to set a configuration property for a SparkContext, which are, in order of precedence (highest first):
- with the method set for the SparkConf object in the application program: SparkConf.set("<property_name>","<property_value>")
- with the option --conf in the spark-submit command: --conf <property_name>=<property_value>
- with a line <property_name> <property_value> in the Spark configuration file, which is either the file spark-defaults.conf in the directory specified in the environment variable SPARK_CONF_DIR or a file whose name is specified in the option --properties-file of the spark-submit command.
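The effect of this precedence can be pictured as a merge of three property maps in which later sources overwrite earlier ones. The following plain Scala sketch models only this rule and does not use Spark; the names ConfPrecedence, Props and effective as well as the example values are hypothetical.

```scala
object ConfPrecedence {
  type Props = Map[String, String]

  // Lowest precedence first: with ++ the entries of the right operand win.
  def effective(propertiesFile: Props, submitConf: Props, sparkConf: Props): Props =
    propertiesFile ++ submitConf ++ sparkConf

  def main(args: Array[String]): Unit = {
    val fromFile   = Map("spark.master" -> "spark://gwdu102:7077", "spark.cores.max" -> "8")
    val fromSubmit = Map("spark.master" -> "local[4]")
    val fromCode   = Map("spark.master" -> "local[2]")
    // spark.master is taken from the program, spark.cores.max from the configuration file.
    println(effective(fromFile, fromSubmit, fromCode))
  }
}
```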
Some frequently used properties can be set with a simplified syntax. Examples are the method setMaster("<master-url>")
for the SparkConf
class or the option --master <master-url>
for the spark-submit
command.
Setting up a Spark Standalone Cluster
Starting an application with spark-submit
without specifying the spark.master
property for the SparkContext
or with the value local[*]
for the option --master
will use all the cores of the local node for executing the application. Multiple nodes for a distributed execution of a Spark application can be used on a Spark standalone cluster. The standalone mode consists of a master and one or more workers. The Spark installation provides in the directory $SPARK_HOME/sbin
the following scripts for managing the standalone mode:
script name | purpose |
---|---|
start-master.sh | Starts a master instance on the machine the script is executed on |
stop-master.sh | Stops the master that was started via the start-master.sh script |
start-slave.sh | Starts a worker instance on the machine the script is executed on |
stop-slave.sh | Stops the worker that was started via the start-slave.sh script |
start-slaves.sh | Starts worker instances on all nodes listed in the file slaves |
stop-slaves.sh | Stops the workers that were started via the start-slaves.sh script |
start-all.sh | Combines starting master and workers |
stop-all.sh | Combines stopping master and workers |
Starting the Master
The following lines show the steps for starting a master process for a Spark standalone cluster
gwdu102 > module load JAVA/jdk1.8.0_31 spark
gwdu102 > export SPARK_LOG_DIR=~/SparkLog
gwdu102 > ${SPARK_HOME}/sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.master.Master-1-gwdu102.out
gwdu102 > grep Starting /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.master.Master-1-gwdu102.out
2019-07-22 15:45:52 INFO Master:54 - Starting Spark master at spark://gwdu102.global.gwdg.cluster:7077
gwdu102 > jps
31663 Jps
29761 Master
Before starting the master by executing $SPARK_HOME/sbin/start-master.sh
, the environment for JAVA and Spark must be initialized by loading the appropriate modules. Furthermore the environment variable SPARK_LOG_DIR
has to be set to the name of a log directory, which will be created if it does not yet exist.
The invocation of start-master.sh
produces a message, which includes the name of the log file in this directory.
Among many other messages printed in this log file is a line with the master-url
spark://gwdu102.global.gwdg.cluster:7077
, which includes the name of the host, on which the master has been started and the number of the port, on which the master listens. If not specified with the option --port <port number>
, the port number is 7077 by default.
The workers in the Spark standalone cluster will use this master-url for the connection to the master, and the spark-submit
command will need the master-url to start the Spark application in standalone mode.
With the command jps
the running of the master process in a java virtual machine can be verified.
Starting a Worker
Now a worker process can be started as follows:
gwdu102 > export SPARK_WORKER_DIR=~/SparkWorker
gwdu102 > ${SPARK_HOME}/sbin/start-slave.sh spark://gwdu102:7077
starting org.apache.spark.deploy.worker.Worker, logging to /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.worker.Worker-1-gwdu102.out
gwdu102 > grep Starting /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.worker.Worker-1-gwdu102.out
2019-07-22 15:47:45 INFO Worker:54 - Starting Spark worker 10.108.96.102:46672 with 16 cores, 61.9 GB RAM
gwdu102 > grep registered /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.worker.Worker-1-gwdu102.out
2019-07-22 15:47:45 INFO Worker:54 - Successfully registered with master spark://gwdu102.global.gwdg.cluster:7077
gwdu102 > grep Registering /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.master.Master-1-gwdu102.out
2019-07-22 15:47:45 INFO Master:54 - Registering worker 10.108.96.102:46672 with 16 cores, 61.9 GB RAM
gwdu102 > jps
18861 Jps
3081 Worker
29761 Master
A worker is started by calling ${SPARK_HOME}/sbin/start-slave.sh
with the master-url as an obligatory argument. In addition, the name of a worker directory has to be supplied, in which output from applications running on the worker will be stored. This name can be provided by setting the environment variable SPARK_WORKER_DIR
. Alternatively this name can be set with the option --work-dir
following the master-url argument in the calling sequence for starting the worker. This directory will be created if it does not yet exist.
The message from starting the worker shows that a log file is produced in the directory indicated by the environment variable SPARK_LOG_DIR
, which was set before starting the master. This log file contains a line stating the start of the worker process, the number of cores and the amount of memory
allocated to the worker. A further line announces the registering of this worker with the master. The same event is listed in the master log file as well.
As can be seen from the jps
command, master and worker processes are now running on the same host.
By default all available cores and all available memory of a node are allocated to the worker started with the start-slave.sh
script. A different amount of hardware resources for the worker can be specified by setting the environment variables SPARK_WORKER_CORES
to the number of required cores and SPARK_WORKER_MEMORY
to the size of required memory (e.g. to 1000M or 2G). Instead of setting the environment variables, these resources can be requested by setting the options --cores
and --memory
in the calling sequence for starting the worker.
Starting a Worker on a Remote Node
The standalone cluster can include workers running on different nodes by starting the start-slave.sh
script on different nodes.
gwdu102 > ssh gwdu103
Last login: Wed Jun 26 11:36:28 2019 from gwdu102.global.gwdg.cluster
gwdu103 > module load JAVA/jdk1.8.0_31 spark
gwdu103 > export SPARK_LOG_DIR=~/SparkLog
gwdu103 > ${SPARK_HOME}/sbin/start-slave.sh spark://gwdu102:7077 --cores 4 --work-dir ~/SparkWorker
starting org.apache.spark.deploy.worker.Worker, logging to /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.worker.Worker-1-gwdu103.out
gwdu103 > jps
20784 Worker
21588 Jps
gwdu103 > exit
logout
Connection to gwdu103 closed.
gwdu102 > grep Registering /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.master.Master-1-gwdu102.out
2019-07-22 15:47:45 INFO Master:54 - Registering worker 10.108.96.102:46672 with 16 cores, 61.9 GB RAM
2019-07-22 16:28:08 INFO Master:54 - Registering worker 10.108.96.103:39968 with 4 cores, 61.8 GB RAM
Of course after login with ssh
from node gwdu102
to the node gwdu103
the modules for using Spark have to be loaded and the variable SPARK_LOG_DIR
has to be set before the start of a worker on this node is possible. In this example worker directory and number of worker cores are specified by setting values for the options --work-dir
and --cores
.
All activities for setting up the standalone cluster are documented in the logfile ~/SparkLog/spark-ohaan-org.apache.spark.deploy.master.Master-1-gwdu102.out
. The two lines displayed with the grep
command show, that now two workers on two different nodes are available for the application.
Starting Multiple Workers with start-slaves.sh
The script sbin/start-slaves.sh
will start multiple workers on different nodes using remote commands via ssh
. The node names must be provided as separate lines in a file slaves
. This file has to be created in the directory specified in the environment variable SPARK_CONF_DIR
. The default value for this variable is $SPARK_HOME/conf
. Because this default is a read only directory in the Spark installation on GWDG's compute cluster, SPARK_CONF_DIR
has to be explicitly set to the name of a local directory with permission for creating files. The Spark standalone scripts also inspect this directory for the existence of a file spark-env.sh
, from which values for environment variables can be set, e.g. SPARK_LOG_DIR
and SPARK_WORKER_DIR
for the directories needed for the start of master and worker, and SPARK_WORKER_CORES
and SPARK_WORKER_MEMORY
for the amount of hardware resources to be allocated to the workers.
The environment created in the local shell from which the script sbin/start-slaves.sh
is started will not be passed to the bash shell created on the remote nodes by the ssh
commands. But in the remote shell the file ~/.bashrc
is sourced before the required commands are executed. Putting export
statements for the necessary environment variables into the file ~/.bashrc
on the remote node therefore will provide the environment needed to execute the standalone scripts on the remote nodes. Since the file system in GWDG's cluster is global, the ~/.bashrc
, set up in the local shell, will be available on all nodes.
The following screen shots show how to set up the standalone cluster with the sbin/start-slaves.sh
script:
Setting environment variables SPARK_HOME
, JAVA_HOME
by loading the modules for Spark and Java:
gwdu102 > module load JAVA spark
gwdu102 > echo $JAVA_HOME
/usr/product/bioinfo/JAVA/jdk1.8.0_31/jre
gwdu102 > echo $SPARK_HOME
/usr/product/parallel/spark/2.3.1/spark-2.3.1-bin-hadoop2.7
Setting the environment variable SPARK_CONF_DIR
and creating the directory to which it points:
gwdu102 > export SPARK_CONF_DIR=~/SparkConf
gwdu102 > mkdir -p $SPARK_CONF_DIR
Generating a file spark-env.sh
which sets the environment variables SPARK_LOG_DIR
and SPARK_WORKER_DIR
and, optionally, additional variables, e.g. SPARK_WORKER_CORES
and SPARK_WORKER_MEMORY
:
gwdu102 > echo "export SPARK_LOG_DIR=~/SparkLog" > $SPARK_CONF_DIR/spark-env.sh
gwdu102 > echo "export SPARK_WORKER_DIR=~/SparkWorker" >> $SPARK_CONF_DIR/spark-env.sh
gwdu102 > echo "export SPARK_WORKER_CORES=4" >> $SPARK_CONF_DIR/spark-env.sh
gwdu102 > echo "export SPARK_WORKER_MEMORY=4048M" >> $SPARK_CONF_DIR/spark-env.sh
gwdu102 > cat $SPARK_CONF_DIR/spark-env.sh
export SPARK_LOG_DIR=~/SparkLog
export SPARK_WORKER_DIR=~/SparkWorker
export SPARK_WORKER_CORES=4
export SPARK_WORKER_MEMORY=4048M
Generating a file slaves
containing the names of the nodes on which a worker shall be started:
gwdu102 > echo gwdu102 > $SPARK_CONF_DIR/slaves
gwdu102 > echo gwdu101 >> $SPARK_CONF_DIR/slaves
gwdu102 > echo gwdu103 >> $SPARK_CONF_DIR/slaves
gwdu102 > cat $SPARK_CONF_DIR/slaves
gwdu102
gwdu101
gwdu103
Generating the file ~/.bashrc
to set the environment for the shell started by the ssh
commands on the remote nodes:
gwdu102 > echo "export SPARK_HOME=$SPARK_HOME" > ~/.bashrc
gwdu102 > echo "export JAVA_HOME=$JAVA_HOME" >> ~/.bashrc
gwdu102 > echo "export SPARK_CONF_DIR=$SPARK_CONF_DIR" >> ~/.bashrc
gwdu102 > cat ~/.bashrc
export SPARK_HOME=/usr/product/parallel/spark/2.3.1/spark-2.3.1-bin-hadoop2.7
export JAVA_HOME=/usr/product/bioinfo/JAVA/jdk1.8.0_31/jre
export SPARK_CONF_DIR=/usr/users/ohaan/SparkConf
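The commands above create ~/.bashrc with the redirection operator >, which replaces any previous content of that file. Where a ~/.bashrc with other settings already exists, a more careful variant may be preferable. The following is only a minimal sketch under that assumption; the function name set_export and its arguments are hypothetical and not part of the Spark scripts.

```shell
# set_export FILE NAME VALUE   (hypothetical helper, bash)
# Ensures that FILE contains exactly one line "export NAME=VALUE"
# and leaves all other lines of FILE untouched.
set_export() {
    local file=$1 name=$2 value=$3 tmp
    touch "$file"
    tmp=$(mktemp) || return 1
    # drop an earlier export of the same variable, keep everything else
    grep -v "^export $name=" "$file" > "$tmp" || true
    printf 'export %s=%s\n' "$name" "$value" >> "$tmp"
    # write back through the original file to keep its permissions
    cat "$tmp" > "$file"
    rm -f "$tmp"
}

# Possible usage (commented out so that nothing is changed by accident):
# set_export ~/.bashrc SPARK_HOME "$SPARK_HOME"
# set_export ~/.bashrc JAVA_HOME "$JAVA_HOME"
# set_export ~/.bashrc SPARK_CONF_DIR "$SPARK_CONF_DIR"
```

Some ~/.bashrc files return early for non-interactive shells; export lines placed after such a return would not be seen by the shells started via ssh.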
Now the scripts for starting master and worker can be run:
gwdu102 > $SPARK_HOME/sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.master.Master-1-gwdu102.out
gwdu102 > $SPARK_HOME/sbin/start-slaves.sh
gwdu103: starting org.apache.spark.deploy.worker.Worker, logging to /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.worker.Worker-1-gwdu103.out
gwdu102: starting org.apache.spark.deploy.worker.Worker, logging to /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.worker.Worker-1-gwdu102.out
gwdu101: starting org.apache.spark.deploy.worker.Worker, logging to /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.worker.Worker-1-gwdu101.out
The last lines of the logfile for the master report the successful starts of the three workers:
gwdu102 > more /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.master.Master-1-gwdu102.out
...
2019-07-23 15:44:19 INFO Master:54 - Registering worker 10.108.96.102:38162 with 4 cores, 4.0 GB RAM
2019-07-23 15:44:21 INFO Master:54 - Registering worker 10.108.96.103:46111 with 4 cores, 4.0 GB RAM
2019-07-23 15:44:21 INFO Master:54 - Registering worker 10.108.96.101:40362 with 4 cores, 4.0 GB RAM
These lines also show that the workers provide the number of cores and the amount of local memory specified in the environment variables SPARK_WORKER_CORES
and SPARK_WORKER_MEMORY
in spark-env.sh, which the remote shells locate through the SPARK_CONF_DIR setting in the .bashrc
file.
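The registration lines of the master logfile can also be summarised automatically. The following sketch assumes the log line format shown above ("Registering worker ... with N cores, ..."); the function name registered_summary is hypothetical.

```shell
# registered_summary LOGFILE   (hypothetical helper)
# Counts the "Registering worker" lines of a master logfile and sums the
# number of cores announced in them (the field preceding the word "cores").
registered_summary() {
    awk '/Registering worker/ {
             workers++
             for (i = 1; i < NF; i++)
                 if ($(i + 1) ~ /^cores/) cores += $i
         }
         END { printf "%d workers, %d cores\n", workers, cores }' "$1"
}

# Possible usage:
# registered_summary ~/SparkLog/spark-ohaan-org.apache.spark.deploy.master.Master-1-gwdu102.out
```

A worker that registers more than once in the same logfile would be counted repeatedly, so the result is only meaningful for a freshly started cluster.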
The standalone cluster can be started in one step by running the command sbin/start-all.sh
which combines sbin/start-master.sh
and sbin/start-slaves.sh
. The workers are stopped by sbin/stop-slaves.sh
, the master is stopped by sbin/stop-master.sh
, master and workers are stopped in one step by sbin/stop-all.sh
.
Submitting to the Spark Standalone Cluster
The Spark standalone cluster provides cores and memory on different worker nodes. A Spark application uses these resources in a way specified by configuration properties for the SparkContext. These properties must include the master_url; in addition, the number of cores and the amount of memory for the executors can be prescribed, and the default number of partitions in an RDD can be fixed. Different ways to set SparkContext properties programmatically or at execution time have been described in the section Ways to Configure the SparkContext
. The following script setSparkConf.sh
sets the property spark.default.parallelism
to the total number of cores of all worker nodes and chooses fixed values for spark.executor.cores and spark.executor.memory by generating the corresponding configuration file $SPARK_CONF_DIR/spark-defaults.conf
.
nodelist=`cat $SPARK_CONF_DIR/slaves`
total_cores=0
for host in $nodelist
do
  cores=$(ssh $host '. $SPARK_CONF_DIR/spark-env.sh;echo $SPARK_WORKER_CORES')
  total_cores=$(( $total_cores + $cores ))
done
echo "spark.default.parallelism" $total_cores > $SPARK_CONF_DIR/spark-defaults.conf
echo "spark.submit.deployMode" client >> $SPARK_CONF_DIR/spark-defaults.conf
echo "spark.master" spark://`hostname`:7077 >> $SPARK_CONF_DIR/spark-defaults.conf
echo "spark.executor.cores" 2 >> $SPARK_CONF_DIR/spark-defaults.conf
echo "spark.executor.memory" 1024M >> $SPARK_CONF_DIR/spark-defaults.conf
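If all workers read the same spark-env.sh with a fixed value for SPARK_WORKER_CORES, as in the setup with the global file system described above, the total number of cores can presumably also be obtained locally without ssh. The following sketch rests on exactly that assumption and would give wrong results if the value is computed per node; the function name total_worker_cores is hypothetical.

```shell
# total_worker_cores CONF_DIR   (hypothetical helper, bash)
# Assumption: every worker uses the same fixed SPARK_WORKER_CORES value
# from CONF_DIR/spark-env.sh. CONF_DIR should be an absolute path.
total_worker_cores() {
    local conf_dir=$1 num_workers cores
    # count the non-empty, non-comment lines of the slaves file
    num_workers=$(grep -c -v -e '^[[:space:]]*$' -e '^[[:space:]]*#' "$conf_dir/slaves") || true
    # evaluate spark-env.sh in a subshell so the calling shell is not changed
    cores=$( . "$conf_dir/spark-env.sh" > /dev/null 2>&1; echo "$SPARK_WORKER_CORES" )
    echo $(( num_workers * ${cores:-0} ))
}

# Possible usage:
# total_cores=$(total_worker_cores "$SPARK_CONF_DIR")
```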
A toy application, showing the distribution of partitions onto executors, is generated by the following Scala program:
/* SparkApp.scala */
import org.apache.spark._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.{ Logger, Level}
import sys.process._

object SparkApp {
  def work(i: Int): (String,String,String) = {
    val str1 = "date +%T".!!
    val time = str1.take(str1.length-1)
    val str2 = "hostname".!!
    val host = str2.take(str2.length-1)
    val id = SparkEnv.get.executorId
    "sleep 10".!
    return (host,id,time)
  }

  def main(args: Array[String] = Array("")) {
    Logger.getLogger("org").setLevel(Level.WARN)
    val sp = new SparkContext()
    println(sp.getConf.toDebugString)
    val st = System.nanoTime
    val rdd = sp.parallelize(1 to sp.defaultParallelism).cache
    println("\nInformation from partitions:")
    println("host,executor_id,start_time")
    rdd.map(x=>work(x)).collect.foreach(x=>println(x+" "))
    val dt = (System.nanoTime - st)/1e9d
    println("\n total time for application: "+dt)
    sp.stop
  }
}
This program defines a function work
, which simulates some work by calling sleep 10
and which returns three strings: the hostname on which the function was executed, the id of the executor executing the function and the time of starting the function execution.
The main program, after printing the configuration of the SparkContext, generates an RDD with a number of elements equal to the defaultParallelism
value of the SparkContext, which is split into the same number of partitions, such that every partition of the RDD contains just one element. Then the result of calling the map method to execute the work
function for every element of the RDD is printed. From the system time registered before and after the execution of the RDD operations, the total execution time is calculated.
As described in the section Building the Spark Application Executable with SBT
an executable jar file ~/SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar
can be generated, which in turn can be executed with the spark-submit command.
For a Spark standalone cluster running three workers with two cores each, and with the number of cores for the executors set to two, the following output results:
gwdu102 > spark-submit ~/SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar
2019-07-29 17:43:42 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
spark.app.id=app-20190729174343-0003
spark.app.name=SparkApp
spark.default.parallelism=6
spark.driver.host=gwdu102.global.gwdg.cluster
spark.driver.port=46197
spark.executor.cores=2
spark.executor.id=driver
spark.executor.memory=1024M
spark.jars=file:/usr/users/ohaan/SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar
spark.master=spark://gwdu102:7077
spark.submit.deployMode=client

Information from partitions:
host,executor_id,start_time
(gwdu102,2,17:43:46)
(gwdu102,2,17:43:46)
(gwdu101,1,17:43:47)
(gwdu101,1,17:43:47)
(gwdu103,0,17:43:48)
(gwdu103,0,17:43:48)

 total time for application: 14.401789086
The SparkContext configuration has the expected properties for the defaultParallelism (equal to 6, the total number of cores in the standalone cluster) and for the executor cores (equal to 2). The RDD was created with 6 elements, spread over 6 partitions, so every partition contains just 1 element. Every partition of the RDD is sent by the driver to an executor, which performs the action on the partition's elements as defined in the function given as argument to the map method. The output shows that three executors have been working (executor ids 0, 1, 2), each executor on a different host, and every executor acting on 2 partitions. Since the total number of cores for running the application is 6, the computation for every partition is scheduled to a different core and all 6 computations can start nearly simultaneously, delayed only by the time needed for the scheduling itself. Comparing the total time for the application of 14.4 sec to the time of 10 sec used in a single work function shows that this overhead is approx. 4 sec.
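The counting done by eye in the preceding paragraph can be reproduced from the application output. The following sketch assumes output lines of the form (host,executor_id,start_time) as printed by the toy application; the function names tally_partitions and overhead are hypothetical.

```shell
# tally_partitions   (hypothetical helper)
# Reads application output on stdin and prints one line
# "host executor_id number_of_partitions" per executor.
# Only lines starting with "(" are counted, so the header line is ignored.
tally_partitions() {
    awk -F'[(),]' '/^\(/ { n[$2 " " $3]++ }
                   END   { for (k in n) print k, n[k] }' | sort
}

# overhead TOTAL_SECONDS WORK_SECONDS   (hypothetical helper)
# Prints the difference between total run time and pure work time.
overhead() {
    awk -v t="$1" -v w="$2" 'BEGIN { printf "%.1f\n", t - w }'
}

# Possible usage:
# spark-submit ~/SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar | tally_partitions
# overhead 14.401789086 10
```

For the output shown above, this would list two partitions for each of the three executors.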
Running Spark Applications on GWDG's Scientific Compute Cluster
Jobs on GWDG's compute cluster are managed by the SLURM workload manager as described in the section Running Jobs with SLURM of GWDG's HPC documentation.
A Spark standalone cluster can be started on nodes of the compute cluster by submitting the SLURM sbatch
command with a script specifying the required resources, followed by the commands for setting up the standalone cluster as described in the previous two sections. The resources to be allocated by the SLURM system should include as many SLURM tasks
as Spark executors
needed for the Spark application, and each task should provide the number of cores and the amount of memory that an executor needs for the Spark application. Therefore the sbatch script must include the options --ntasks
, --cpus-per-task
and --mem-per-cpu
, whereas an option --nodes
is not needed, because the SLURM manager can decide on the number of nodes for providing the resources for the required number of tasks.
An example is the following start-standalone.script
:
#!/bin/bash
#SBATCH --partition=medium
#SBATCH --ntasks=4
#SBATCH --cpus-per-task=2
#SBATCH --mem-per-cpu=2048
#SBATCH --time=60:00
#SBATCH --output=outfile-%J

. setenv.sh
$SPARK_HOME/sbin/start-all.sh
. wait-worker.sh
sleep infinity
Submitting this script from the login host with the command sbatch start-standalone.script
will start a Spark standalone cluster, which will stay alive for the time requested with the option --time=60:00
, or until the command scancel <slurm_job_id>
is launched, where <slurm_job_id>
is the job-id returned from the sbatch command.
The commands in the batch script start with sourcing the script setenv.sh
to prepare the environment for starting the standalone cluster:
##setenv.sh##
module load JAVA/jdk1.8.0_31 spark
export SPARK_CONF_DIR=~/SparkConf
mkdir -p $SPARK_CONF_DIR

env=$SPARK_CONF_DIR/spark-env.sh
echo "export SPARK_LOG_DIR=~/SparkLog" > $env
echo "export SPARK_WORKER_DIR=~/SparkWorker" >> $env
echo "export SLURM_MEM_PER_CPU=$SLURM_MEM_PER_CPU" >> $env
echo 'export SPARK_WORKER_CORES=`nproc`' >> $env
echo 'export SPARK_WORKER_MEMORY=$(( $SPARK_WORKER_CORES*$SLURM_MEM_PER_CPU ))M' >> $env

echo "export SPARK_HOME=$SPARK_HOME" > ~/.bashrc
echo "export JAVA_HOME=$JAVA_HOME" >> ~/.bashrc
echo "export SPARK_CONF_DIR=$SPARK_CONF_DIR" >> ~/.bashrc

scontrol show hostname $SLURM_JOB_NODELIST > $SPARK_CONF_DIR/slaves

conf=$SPARK_CONF_DIR/spark-defaults.conf
echo "spark.default.parallelism" $(( $SLURM_CPUS_PER_TASK * $SLURM_NTASKS ))> $conf
echo "spark.submit.deployMode" client >> $conf
echo "spark.master" spark://`hostname`:7077 >> $conf
echo "spark.executor.cores" $SLURM_CPUS_PER_TASK >> $conf
echo "spark.executor.memory" $(( $SLURM_CPUS_PER_TASK*$SLURM_MEM_PER_CPU ))M >> $conf
The role of the commands in this file has been explained in detail in the two previous sections. The number of cores for a Spark standalone worker is determined dynamically by the command `nproc`
, which evaluates on each node to the number of cores allocated by the SLURM manager on the node. This number is equal to the number of tasks allocated to the node times the value specified in the SBATCH option --cpus-per-task
. The amount of memory for a worker is determined by the product of this number of cores times the memory setting in the SBATCH option --mem-per-cpu
, which is available as the environment variable SLURM_MEM_PER_CPU
.
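The worker sizing described above can be checked with a small calculation. The following sketch only reproduces the arithmetic; the function name worker_resources is hypothetical, and the numbers of tasks per node are example assumptions, since SLURM decides how tasks are placed on nodes.

```shell
# worker_resources TASKS_ON_NODE CPUS_PER_TASK MEM_PER_CPU_MB   (hypothetical helper)
# Prints the number of cores and the amount of memory a worker on a node
# would offer: cores  = tasks on the node * cpus per task
#              memory = cores * memory per cpu
worker_resources() {
    local cores=$(( $1 * $2 ))
    local memory=$(( cores * $3 ))
    echo "cores=$cores memory=${memory}M"
}

worker_resources 2 2 2048   # node that received two tasks
worker_resources 1 2 2048   # node that received one task
```

With --cpus-per-task=2 and --mem-per-cpu=2048 this gives 4 cores with 8192M for a node holding two tasks and 2 cores with 4096M for a node holding one task.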
The nodes on which workers are to be started are listed in a special format in the environment variable SLURM_JOB_NODELIST
, which is expanded to the node names with the scontrol
command, the result of which is written to the slaves
file.
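To illustrate what the expansion of the compact node list means, the following sketch mimics it in plain bash for the simplest case only. It is a simplified stand-in and no replacement for scontrol show hostname: it handles a single name or one bracket group with comma-separated numbers and ranges, and the function name expand_nodelist is hypothetical.

```shell
# expand_nodelist LIST   (hypothetical helper, bash)
# Expands e.g. "gwdd[034,046-047]" into one node name per line.
# Lists with several bracket groups are NOT handled by this sketch.
expand_nodelist() {
    local list=$1 prefix body part first last width n
    case $list in
        *\[*\])
            prefix=${list%%\[*}          # text before the bracket
            body=${list#*\[}             # text after the opening bracket
            body=${body%\]}              # without the closing bracket
            for part in ${body//,/ }; do
                first=${part%-*}         # equals $part if there is no range
                last=${part#*-}
                width=${#first}          # keep leading zeros in the output
                for (( n = 10#$first; n <= 10#$last; n++ )); do
                    printf '%s%0*d\n' "$prefix" "$width" "$n"
                done
            done
            ;;
        *)
            printf '%s\n' "$list"
            ;;
    esac
}

# Possible usage:
# expand_nodelist 'gwdd[034,046-047]'
```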
In the last group of commands the properties for the SparkContext are set, which are needed for running the Spark application on the started standalone cluster. In particular, the spark.default.parallelism
is set to the product of the number of allocated SLURM tasks and the number of allocated CPUs per task, i.e. to the total number of allocated cores. The property spark.executor.cores
sets the number of cores for every executor to the number of CPUs allocated to every task in $SLURM_CPUS_PER_TASK
, and spark.executor.memory
sets the amount of memory for every executor to the amount of memory allocated to a SLURM task.
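The resulting property values can be worked out for given SLURM settings without submitting a job. The following sketch prints the lines that would end up in spark-defaults.conf; the function name spark_defaults is hypothetical and the master host name is passed as an argument instead of being taken from the hostname command.

```shell
# spark_defaults NTASKS CPUS_PER_TASK MEM_PER_CPU_MB MASTER_HOST   (hypothetical helper)
# Prints the SparkContext properties derived from the SLURM allocation.
spark_defaults() {
    local ntasks=$1 cpus=$2 mem=$3 master=$4
    echo "spark.default.parallelism $(( ntasks * cpus ))"
    echo "spark.submit.deployMode client"
    echo "spark.master spark://$master:7077"
    echo "spark.executor.cores $cpus"
    echo "spark.executor.memory $(( cpus * mem ))M"
}

# Values of the example batch script: 4 tasks, 2 cpus per task, 2048 MB per cpu
spark_defaults 4 2 2048 gwdd034
```

For these values the default parallelism is 8 and every executor gets 2 cores and 4096M of memory.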
After preparing the environment for the standalone cluster, the next command $SPARK_HOME/sbin/start-all.sh
will start the cluster as described in the section Setting up a Spark Standalone Cluster
.
The start-all.sh
command returns after starting the master and worker processes. The standalone cluster will not be ready for executing the Spark application until a connection between master and workers has been established and the worker processes have been registered with the master process. Sourcing the script wait-worker.sh
will wait until the registration of all workers has succeeded. This script repeatedly inspects the logfiles of the started workers and returns once the line with the text Successfully registered with master
has been included in the logfiles for all started workers:
##wait-worker.sh##
. $SPARK_CONF_DIR/spark-env.sh
num_workers=`cat $SPARK_CONF_DIR/slaves|wc -l`
echo number of workers to be registered: $num_workers
master_logfile=`ls -tr ${SPARK_LOG_DIR}/*master* |tail -1`
worker_logfiles=`ls -tr ${SPARK_LOG_DIR}/*worker* |tail -$num_workers`
steptime=3
for i in {1..100}
do
  sleep $steptime
  num_reg=` grep 'registered' $worker_logfiles|wc -l`
  if [ $num_reg -eq $num_workers ]
  then
     break
  fi
done
echo registered workers after $((i * steptime)) seconds :
for file in $worker_logfiles
do
  grep 'registered' $file
done
grep 'Starting Spark master' $master_logfile
grep 'Registering worker' $master_logfile|tail -$num_workers
The output of the wait-worker.sh
script will be written to the output file specified by the --output
option in the sbatch script start-standalone.script
. Only after finding lines like
number of workers to be registered: 3
registered workers after 114 seconds :
19/08/11 12:26:25 INFO Worker: Successfully registered with master spark://gwdd034.global.gwdg.cluster:7077
19/08/11 12:27:07 INFO Worker: Successfully registered with master spark://gwdd034.global.gwdg.cluster:7077
19/08/11 12:26:29 INFO Worker: Successfully registered with master spark://gwdd034.global.gwdg.cluster:7077
19/08/11 12:26:24 INFO Master: Starting Spark master at spark://gwdd034.global.gwdg.cluster:7077
19/08/11 12:26:25 INFO Master: Registering worker 10.108.102.34:43775 with 4 cores, 8.0 GB RAM
19/08/11 12:26:29 INFO Master: Registering worker 10.108.102.46:35564 with 2 cores, 4.0 GB RAM
19/08/11 12:27:07 INFO Master: Registering worker 10.108.102.47:37907 with 2 cores, 4.0 GB RAM
in the batch output file, the Spark cluster is ready for executing Spark applications.
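The waiting loop shown above ends after its last iteration even if not all workers have registered, so a batch job that goes on to run an application might want to detect this case. The following variant is a sketch of one possible way to do so and not the script used above; the function name wait_for_workers and its arguments are hypothetical.

```shell
# wait_for_workers STEPS STEPTIME LOGFILE...   (hypothetical helper, bash)
# Returns 0 as soon as every given worker logfile contains the registration
# message, and 1 if this has not happened after STEPS checks.
wait_for_workers() {
    local steps=$1 steptime=$2 i file missing
    shift 2
    for (( i = 0; i < steps; i++ )); do
        missing=0
        for file in "$@"; do
            grep -q 'Successfully registered with master' "$file" 2>/dev/null \
                || missing=$(( missing + 1 ))
        done
        [ "$missing" -eq 0 ] && return 0
        sleep "$steptime"
    done
    return 1
}

# Possible usage inside the batch script, with the variable from wait-worker.sh:
# wait_for_workers 100 3 $worker_logfiles || echo "not all workers registered"
```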
Now Spark applications can be submitted from the same login node from which the sbatch script was submitted. All necessary settings of the SparkContext properties have been placed into the spark-defaults.conf
file by sourcing the setenv.sh
file in the sbatch script. On the login node only the modules JAVA and spark have to be loaded and the variable SPARK_CONF_DIR
has to be set to the directory in which the spark-defaults.conf
file has been stored. The following screen shot shows the execution of a Spark application started from the login node.
gwdu102 > module load JAVA spark
gwdu102 > SPARK_CONF_DIR=~/SparkConf
gwdu102 > spark-submit ~/SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar
19/08/11 12:41:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
spark.app.id=app-20190811124153-0000
spark.app.name=SparkApp
spark.default.parallelism=8
spark.driver.host=gwdu102.global.gwdg.cluster
spark.driver.port=43083
spark.executor.cores=2
spark.executor.id=driver
spark.executor.memory=4096M
spark.jars=file:/usr/users/ohaan/SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar
spark.master=spark://gwdd034:7077
spark.submit.deployMode=client

Information from partitions:
host,executor_id,start_time
(gwdd034,0,12:41:57)
(gwdd034,0,12:41:57)
(gwdd047,2,12:41:57)
(gwdd047,2,12:41:57)
(gwdd034,1,12:41:57)
(gwdd034,1,12:41:57)
(gwdd046,3,12:41:57)
(gwdd046,3,12:41:57)

 total time for application: 13.789137319
For executing the Spark application as a batch job, the batch script start-standalone.script
has to be modified
by replacing the command sleep infinity
with
spark-submit ~/SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar
$SPARK_HOME/sbin/stop-all.sh