======= Parallel Processing with Spark on GWDG’s Scientific Compute Cluster ======= ===== Apache Spark ===== Apache Spark is a cluster computing framework for processing large data sets, based on [[https://www.scala-lang.org|Scala]], an object oriented functional programming language. Parallel processing in Spark uses the Resilient Distributed Database, RDD. Within an RDD, a given data set is decomposed into a number of partitions, which then are processed in parallel on the available resources in the cluster. The attribute resilient refers to the ability to recover in case of an error occurring for an individual partition and to reprocess this work on a different resource in the cluster. The following sections will describe the Spark environment on GWDG’s compute cluster and the use of Spark’s RDD’s for parallel processing. The Scala programming language will be explained only so far as needed for the examples. Programming guides covering all of Scala are e.g. the online books [[ https://www.safaribooksonline.com/library/view/learning-scala/9781449368814| Leanring Scala ]] by Jason Swartz and [[https://people.cs.ksu.edu/~schmidt/705a/Scala/Programming-in-Scala.pdf|Programming in Scala]] by Martin Odersky, Lex Spoon and Bill Venners. ===== The Interactive Spark Environment===== GWDG provides version 2.3.1 of the Apache Spark framework. On all frontend nodes for the cluster (gwdu101, gwdu102, gwdu103) the Spark environment is loaded by the module command: module load JAVA/jdk1.8.0_31 spark (Because spark depends on JAVA 8, the module ''JAVA/jdk1.8.0_31'' has to be loaded before the spark module.) Spark provides an interactive Spark Shell for executing single Spark commands or scripts with collections of Spark commands. This shell can be loaded locally on a frontend node by executing the command spark-shell which produces after a number of comment lines the prompt scala> Spark expressions entered in this shell will be evaluated by using as many cores of the node in parallel as needed and the result will be returned on the screen. ===== Introducing the Spark Shell ===== The interactive Spark Shell is an extension of the [[https://docs.scala-lang.org/overviews/repl/overview.html| **Scala REPL**]] (Read-Execute-Print-Loop) shell. Any input to the shell completed by the return key will be executed and will be followed by a printout of the expressions result, if the input was a valid Spark expression, or else by an error message. The available commands in this shell are listed by executing '':help'' scala> :help All commands can be abbreviated, e.g., :he instead of :help. :edit | edit historyreset :help [command] print this summary or command-specific help :history [num] show the history (optional num is commands to show) :h? search the history :imports [name name ...] show import history, identifying sources of names :implicits [-v] show the implicits in scope :javap disassemble a file or class name :line | place line(s) at the end of history :load interpret lines in a file :paste [-raw] [path] enter paste mode or paste a file :power enable power user mode :quit exit the interpreter :replay [options] reset the repl and replay all previous commands :require add a jar to the classpath :reset [options] reset the repl to its initial state, forgetting all session entries :save save replayable session to a file :sh run a shell command (result is implicitly => List[String]) :settings update compiler options, if possible; see reset :silent disable/enable automatic printing of results :type [-v] display the type of an expression without evaluating it :kind [-v] display the kind of expression's type :warnings show the suppressed warnings from the most recent line which had any To close the Spark shell, you press ''Ctrl+D'' or type in '':q'' (or any subset of '':quit'') Other commands will be desribed in turn when they will be used in the context of specific examples. Apache Spark is based on the object oriented functional programming language **Scala**. Basic elements of the language are ''classes'', which contain ''fields'' (values of specific types), and ''methods'' (functions applied to the values in the fields) and possibly input parameters. The syntax is class ([val|var] : [, ... ]) [{ fields and methods }] The following simple example shows how to define a class in the Spark Shell. The second line is the message from Spark Shell indicating the succesfull definition of a class: scala> class User (n: String = “none”) { val name = n; def greet = s"Hello from $name" } defined class User The class ''User'' is defined with a field ''name'' and a method ''greet'', and the field ''name'' has the default value ''"none"''. Instances of a class can be generated by expressions using the keyword ''new''. Appending the identifiers with ''.name'' gives the value of its field ''name'', with ''.greet'' results in applying its method ''greet'': scala> val userdefault = new User userdefault: User = User@22495f2f scala> userdefault.name res0: String = none scala> userdefault.greet res1: String = Hello from none scala> val userhaan = new User("haan") userhaan: User = User@2ded3code1cc scala> userhaan.name res2: String = haan scala> userhaan.greet res3: String = Hello from haan Every value defined by executing an expression in the Spark Shell is stored and available for later use, as are the results res[x] returned from executing some expression. The shell can be cleared by executing the command '': reset'', which removes all previous entries in the shell. The Spark Shell can execute commands from a script file using the command '':load ''. As an example, create a file ''User.script'' in the directory, from which the Spark Shell was opened: class User( n: String = "none") { val name = n def greet = s"Hello from $name" } val userdefault = new User userdefault.name userdefault.greet val userhaan = new User("haan") userhaan.name userhaan.greet Loading this script in the Spark Shell gives the following result: scala> :load User.script Loading User.script... defined class User userdefault: User = User@60a6564c res0: String = none res1: String = Hello from none userhaan: User = User@41cfcbb5 res2: String = haan res3: String = Hello from haan ===== Executing System Commands from the Spark Shell ===== The Spark Shell command '':sh'' allows to execute system commands and to display their results. E.g. the content of the directory from which the Spark Shell was started can be examined in the following way scala> :sh ls -l res0: scala.tools.nsc.interpreter.ProcessResult = `ls -l` (29 lines, exit 0) scala> res0.lines foreach println total 2709248 -rw------- 1 ohaan GGST 727 Jan 28 12:07 derby.log drwx------ 2 ohaan GGST 0 Aug 31 17:06 files ... scala> Unfortunately, wild cards or redirections are not handled by '':sh'', a command like '':sh ls core.*'' will result in an error. An alternative posibility for executing system commands is the use of Scala's ''!'' or ''!!'' methods (see e.g. [[https://alvinalexander.com/scala/scala-execute-exec-external-system-commands-in-scala|this link]]). These are available after importing the ''sys.process._'' package of Scala. The ''!'' methods executes the content of a string as a command, displays the result of the system command as output in the Spark Shell and returns the return value of the system command. The ''!!'' method returns the output from the system command in a string. As an example the execution of the ''ls -l'' command is shown: scala> import sys.process._ import sys.process._ scala> val content = "ls -l".! total 864 -rw------- 1 ohaan GGST 727 Jan 28 12:07 derby.log drwx------ 2 ohaan GGST 0 Aug 31 17:06 files ... content: Int = 0 scala> println(content) 0 scala> val content_string = "ls -l".!! content_string: String = "total 864 -rw------- 1 ohaan GGST 727 Jan 28 12:07 derby.log drwx------ 2 ohaan GGST 0 Aug 31 17:06 files ... scala> println(content_string) total 864 -rw------- 1 ohaan GGST 727 Jan 28 12:07 derby.log drwx------ 2 ohaan GGST 0 Aug 31 17:06 files ... For system shell commands using wildcards, the input string has to be evaluated by a system shell, which can be realized by applying the ''!'' and ''!!'' methods to the string list ''Seq("/bin/sh", "-c", "//command-string//")'' (see [[https://alvinalexander.com/scala/how-to-handle-wildcard-characters-running-external-commands|Scalacookbook, Recipe 12.17]]) : scala> Seq("/bin/sh","-c","ls *.script").! map1.script map.script nums.script pi.script st.script User.script while.script res68: Int = 0 ===== Creating Spark RDDs ===== The main entry point for Spark functionality is the class ''SparkContext''. (See [[https://spark.apache.org/docs/2.3.1/|documentation for Apache Spark 2.3.1]] following [[https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext|API Docs->Scala->org.apache.spark->SparkContext]]). This class represents the connection to a Spark cluster and it provides the methods to create RDDs, to process data within the partitions of a RDD and to communicate data between the different partitions of a RDD. Invoking the Spark Shell automatically generates the instance ''sc'' of the SparkContext class, which is used to access all of Sparks functionalities from the Spark Shell. There are three ways to create RDDs: * copying existing Scala collections * reading data from external storage * creating new RDDs from existing RDDs ==== Copying Collections ==== An existing Scala collection is transformed into an RDD with the method ''parallelize''. This method needs as the first argument the collection to be transformed into a RDD. As an optional second argument the number of partitions of the RDD to be created can be prescribed. If no second argument is provided, a system dependend choice for this number will be made. The following example in the Spark Shell shows how to generate a RDD with 3 partitioins containing the integers 1 to 10 : scala> val nums = 1 to 10 nums: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) scala> val nums_RDD = sc.parallelize(nums,3) nums_RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :26 The first command creates the Scala collection ''nums'' of type ''scala.collection.immutable.Range.Inclusive'', the second command uses the method ''parallelize'' for Spark Shell's instantiation ''sc'' of the class SparkContext to convert this collection into the RRD ''nums_RRD'' of type ''org.apache.spark.rdd.RDD[Int]''. Since the expression ''1 to 10'' in Scala is a class instance, it can be inserted directly into the expression for creating the RDD: scala> val nums_RDD = sc.parallelize(1 to 10,3) nums_RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at :24 ==== Reading Data ==== SparkSession provides methods to read data in several formats from storage into RDDs. The method ''textFile'' will read an asci text as a collection of lines into a RDD. For example the file ''lines.txt'' line1 : Hello line2 : World line3 : to line4 : everyone in the directory, from where the Spark Shell was started, will be loaded as follows: scala> val lines_RDD = sc.textFile("lines.txt") lines_RDD: org.apache.spark.rdd.RDD[String] = lines.txt MapPartitionsRDD[18] at textFile at :24 The argument in the method ''textFile'' can be the path to a file, as in the previous example, or a path to a directory. In this case, the created RDD will have as many elements as the total number of lines of all files in he directory, each element containing one line. Another method to create a RDD by reading text files is ''wholeTextFile'', with a directory path as argument. This method creates a RDD with as many elements as files in the directory, each element containing the full pathname for a file, and the string representation of the whole content of the file. ==== Modifying Existing RDDs ==== The RDD class allows the application of various methods, which result in the creation of new instances of RDD. Some of them will be described in more detail later on. An example is the method ''map(s => s.length)'', which replaces each string in the RDD by its length: scala> val linelength = lines_RDD.map(s => s.length) linelength: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at :25 Whereas ''lines_RDD'' is a RDD-collection of Strings - the lines of the input file ''lines.txt'' -, the newly created ''linelength'' is a RDD-collection of Ints - the number of characters in each line of the input file -. ===== Working with RDDs ===== Once a RDD is created, the methods defined for this class can be used to do work on its content in order to produce the required results. RDDs support two types of operations: transformations, which create one or more new RDDs from existing ones, and actions, which return values to the driver program after running a computation on the input RDD. All transformations in Spark are //lazy//, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. Transformations are called //narrow// if they don't mix data from different partitions of the input RDD, otherwise they are called //wide//. Because the partitions of a RDD can be distributed over a cluster of computing resources, wide transformations and actions may involve data communication over the clusters network, whereas narrow transformations can be executed locally. A list of all methods available for RDDs is displayed in the Spark Shell by completing the identifyer for the RDD followed by a full-stop with the tab-key: scala> nums_RDD. [tab-key] ++ countByValue groupBy name sampleStdev toDF aggregate countByValueApprox histogram partitioner sampleVariance toDS cache dependencies id partitions saveAsObjectFile toDebugString canEqual distinct intersection persist saveAsTextFile toJavaRDD cartesian filter isCheckpointed pipe setName toLocalIterator checkpoint first isEmpty popStdev sortBy toString coalesce flatMap iterator popVariance sparkContext top collect fold keyBy preferredLocations stats treeAggregate collectAsync foreach localCheckpoint productArity stdev treeReduce compute foreachAsync map productElement subtract union context foreachPartition mapPartitions productIterator sum unpersist copy foreachPartitionAsync mapPartitionsWithIndex productPrefix sumApprox variance count getCheckpointFile max randomSplit take zip countApprox getNumPartitions mean reduce takeAsync zipPartitions countApproxDistinct getStorageLevel meanApprox repartition takeOrdered zipWithIndex countAsync glom min sample takeSample zipWithUniqueId A description for the methods can be found in the [[https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD | documentation for the Spark RDD class]] ==== Displaying Properties of the RDD ==== Methods for displaying the size and content of the RDDs : ^ method ^ description ^ |count() | returns the number of elements | |first() | returns the first element | |take(num) | returns an array with the first num elements| |takeSample(withReplacement, num, [seed]) |returns an array with a random sample of size num taken from the RDD, with or without replacement, optionally pre-specifying a random number generator seed.| |collect() | returns an array with all elements | Applying the method ''count'' to the previously defined RDDs returns their numbers of elements: scala> val numscount = nums_RDD.count() numscount: Long = 10 scala> val linescount = lines_RDD.count() linescount: Long = 4 The ''count'' method for RDDs is similar to the ''size'' method for Scala collections and different from the ''count'' method for Scala collections. The method ''size'' is not available for RDDs: scala> val numscount = nums_RDD.size() :25: error: value size is not a member of org.apache.spark.rdd.RDD[Int] val numscount = nums_RDD.size() ^ Selections of the content of a RDD can be accessed by the ''first'', ''take'', or ''takeSample'' methods: scala> val content = lines_RDD.first() content: String = line1 : Hello scala> val content = nums_RDD.take(5) content: Array[Int] = Array(1, 2, 3, 4, 5) The ''takeSample'' method randomly selects a number of elements of the RDD. The sampling is done with or without replacement of the chosen elements depending on the value of the first argument in the method beeing ''true'' or ''false''. With ''true'' the size of the sample can be arbitrary, even larger than the ''count'' of the RDD, and elements of the RDD can appear repeatedly, with ''false'' the size of the sample is limited to the count of the RDD and each element of the RDD can appear at most once. scala> nums_RDD.takeSample(false,4) res30: Array[Int] = Array(5, 10, 6, 8) scala> nums_RDD.takeSample(false,4) res31: Array[Int] = Array(8, 3, 2, 6) scala> nums_RDD.takeSample(true,14) res32: Array[Int] = Array(4, 6, 2, 8, 5, 5, 3, 3, 1, 1, 8, 9, 1, 6) scala> nums_RDD.takeSample(true,14) res33: Array[Int] = Array(9, 8, 3, 9, 5, 1, 10, 9, 10, 7, 2, 1, 3, 5) The ''collect'' method returns the full content ot the RDD, it gives the same result as calling the ''take'' method with argument ''//RDD_identifier//.count.toInt''. scala> val content = lines_RDD.collect() content: Array[String] = Array(line1 : Hello, line2 : World, line3 : to, line4 : everyone) scala> val content = nums_RDD.collect() content: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) scala> val content = nums_RDD.take(nums_RDD.count.toInt) content: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) The ''collect'' method tries to transfers the content of the whole RDD, which can be distributed over the main memories of several compute nodes to the main memory of the master node. For large RDDs this could lead to an out-of-memory exeption on the master node, therefore the ''collect'' method should not be used for large distributed RDDs. ==== Displaying Properties of RDD Partitions ==== Methods for displaying the number, sizes and contents of the partitions of the RDD are: ^ method ^ description ^ |getNumPartitions |returns the number of partitions | |mapPartitions(f) |returns a new RDD by applying a function f: Iterator ⇒ Iterator to each partition| |glom()| returns a new RDD created by coalescing all elements within each partition into an array | An example for getting the number of partitions is scala> val nums_RDD = sc.parallelize(1 to 50) nums_RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at :24 scala> nums_RDD.collect res5: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50) scala> nums_RDD.getNumPartitions res0: Int = 24 which shows that the method ''parallelize'', invoked without the second parameter for prescibing the number of partitions created a RDD with 50 elements in 24 partitions. 24 is the number of cores avaible on the frontend gwdu103, which was used for these examples running Spark locally . It is the default for ''parallelize'' to split a collection into as many partitions as cores are available. The next example shows the result if the second parameter is set: scala> sc.parallelize(1 to 50,3).getNumPartitions res22: Int = 3 The sizes of the individual partitions can be obtained using the ''mapPartitions'' method, which will be described in more detail in the next section. This method allows to access the elements of every partition separately by an iterator. A function using these iterators to perform operations on the elements of the partitions and storing the result into an output iterator must be given as argument for this method. Using the sizes of the input iterators as output iterators then produces a RDD with 24 elements, each element containing the number of elements of the 24 partitions of the original RDD. Applying the same method to the new RDD produces a second RDD with 24 Elements containing the the value 1. scala> val part = nums_RDD.mapPartitions((x: Iterator[Int]) => Iterator(x.size)) part: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[29] at mapPartitions at :25 scala> part.collect res6: Array[Int] = Array(2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3) scala> val partpart = part.mapPartitions(x => Iterator(x.size)) partpart: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[30] at mapPartitions at :25 scala> partpart.collect res7: Array[Int] = Array(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1) The type assignment for the input iterator is redundant, because its type can be inferred from the type of elements in the RRD. This has been used in the expression for the value ''partpart''. The ''glom'' method allows to display the content of a RDD grouped by partitions. ''glom'' creates a RDD with as many partitions as partitions in the original RDD, each partition containing one element, the array of elements from the corresponding partition of the original RDD. This is shown applying ''glom'' to the RDD from the previuos example: scala> nums_RDD.glom() res2: org.apache.spark.rdd.RDD[Array[Int]] = MapPartitionsRDD[52] at glom at :26 scala> nums_RDD.glom().getNumPartitions res3: Int = 24 scala> nums_RDD.glom().collect() res4: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4), Array(5, 6), Array(7, 8), Array(9, 10), Array(11, 12), Array(13, 14), Array(15, 16), Array(17, 18), Array(19, 20), Array(21, 22), Array(23, 24, 25), Array(26, 27), Array(28, 29), Array(30, 31), Array(32, 33), Array(34, 35), Array(36, 37), Array(38, 39), Array(40, 41), Array(42, 43), Array(44, 45), Array(46, 47), Array(48, 49, 50)) scala> sc.parallelize(1 to 50,3).glom.getNumPartitions res27: Int = 3 scala> sc.parallelize(1 to 50,3).glom.collect res28: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16), Array(17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33), Array(34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50)) ==== Saving RDDs to Disk ==== ^ method ^ description ^ |saveAsTextFile(path)| save the RDD as a text file, using string representations of elements.| This methods accepts a pathname to a directory, into which it saves the RDDs partitions in separate files, named ''part-00000'', ''part-00001'', etc. . This directory must not yet exist, it will be created by the method. The path may be absolut or relative to the working directory, from which the method is invoked. As an example, the directory ''files'' has three elements: gwdu103:35 17:55:00 ~/Spark_Intro > ls files file1 file2 file3 Using ''textFile'' creates a RDD, whose properties can be examined: val textfile_RDD = sc.textFile("files") textfile_RDD: org.apache.spark.rdd.RDD[String] = files MapPartitionsRDD[102] at textFile at :24 scala> textfile_RDD.count res67: Long = 12 scala> textfile_RDD.getNumPartitions res68: Int = 3 scala> textfile_RDD.glom().collect res69: Array[Array[String]] = Array(Array(b1, b2, b3, b4, b5), Array(a1, a2, a3, a4), Array(c1, c2, c3)) ''textfile_RDD.saveAsTextFile("newdir")'' creates the directory ''newdir'' with three files: gwdu103:35 18:07:29 ~/Spark_Intro > ls newdir part-00000 part-00001 part-00002 _SUCCESS gwdu103:35 18:08:28 ~/Spark_Intro > more newdir* *** newdir: directory *** gwdu103:35 18:08:57 ~/Spark_Intro > more newdir/* :::::::::::::: newdir/part-00000 :::::::::::::: b1 b2 b3 b4 b5 :::::::::::::: newdir/part-00001 :::::::::::::: a1 a2 a3 a4 :::::::::::::: newdir/part-00002 :::::::::::::: c1 c2 c3 :::::::::::::: newdir/_SUCCESS :::::::::::::: Using instead the method ''wholeTextFiles'': scala> val wholetextfiles_RDD = sc.wholeTextFiles("files") wholetextfiles_RDD: org.apache.spark.rdd.RDD[(String, String)] = files MapPartitionsRDD[111] at wholeTextFiles at :24 scala> wholetextfiles_RDD.count res77: Long = 3 scala> wholetextfiles_RDD.getNumPartitions res78: Int = 2 scala> wholetextfiles_RDD.glom().collect res79: Array[Array[(String, String)]] = Array(Array((file:/home/uni05/ohaan/Spark_Intro/files/file2,"b1 b2 b3 b4 b5 "), (file:/home/uni05/ohaan/Spark_Intro/files/file1,"a1 a2 a3 a4 ")), Array((file:/home/uni05/ohaan/Spark_Intro/files/file3,"c1 c2 c3 "))) Saving ''wholetextfiles_RDD'' with ''wholetextfiles_RDD.saveAsTextFile("newdir1")'' produces the directory ''newdir1'': gwdu103:35 18:09:11 ~/Spark_Intro > ls newdir1 part-00000 part-00001 _SUCCESS gwdu103:35 18:36:56 ~/Spark_Intro > more newdir1/* :::::::::::::: newdir1/part-00000 :::::::::::::: (file:/home/uni05/ohaan/Spark_Intro/files/file2,b1 b2 b3 b4 b5 ) (file:/home/uni05/ohaan/Spark_Intro/files/file1,a1 a2 a3 a4 ) :::::::::::::: newdir1/part-00001 :::::::::::::: (file:/home/uni05/ohaan/Spark_Intro/files/file3,c1 c2 c3 ) :::::::::::::: newdir1/_SUCCESS :::::::::::::: ==== More Transformations for RDDs ==== ^ method ^ description ^ |map(f) |returns a new RDD by applying a function f to each element| |flatMap(f) |returns a new RDD by first applying a function f to all elements, and then flattening the results| |mapPartitions(f) |returns a new RDD by applying a function f: Iterator ⇒ Iterator to each partition| |mapPartitionsWithIndex(f)|returns a new RDD by applying a function (Int, Iterator) ⇒ Iterator to each partition, while tracking the index of the original partition| | filter(f)|returns a new RDD containing only the elements that satisfy a predicate f: (T) => Boolean| === map === The ''map'' method applies a function to every element of a RDD and generates a new RDD containing the results of the function evaluation. The function must have a single input and the new RDD will have the same number of elements as the old one. In the following example, a collection of integers from 1 to 10 is distributed into a RDD collection with 3 partitions. A function ''double'' is defined which doubles its input and applied to the RDD with the map method: scala> val ints = sc.parallelize(1 to 10,3) ints: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at :27 scala> def double(x: Int): Int = 2*x double: (x: Int)Int scala> val ints_2 = ints.map(double) ints_2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[27] at map at :30 scala> ints.collect res32: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) scala> ints_2.collect res33: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20) scala> ints.getNumPartitions res34: Int = 3 scala> ints_2.getNumPartitions res35: Int = 3 The output in the Spark Shell displays the expected result of applying the ''map'' method with the ''double'' function and the fact, that the number of partitions is equal in the old and new RDD. The argument in the map method can be an identifier of a previously defined function as in the previous example. Also a function literal (anonymous function) can be used: scala> val ints_2=ints.map((x: Int) => 2*x) ints_2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at :25 scala> ints_2.collect res1: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20) The placeholder syntax of scala can be used for specifying the function literal: scala> val ints_2=ints.map(_*2) === flatMap === If the function, that transforms the elements of the RDD, returns a collection of several elements, the result of ''map'' will be a RDD containing as many collections as the number of elements in the original RDD: scala> ints.map(x => List(2*x-1,2*x)).collect res45: Array[List[Int]] = Array(List(1, 2), List(3, 4), List(5, 6), List(7, 8), List(9, 10), List(11, 12), List(13, 14), List(15, 16), List(17, 18), List(19, 20)) The ''flatMap'' method will "flatten" the RDD collection of generated collections into a RDD collection containing all elements of all generated collections: scala> ints.flatMap(x => List(2*x-1,2*x)).collect res46: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20) Applying the ''glom'' method to the transformed RDD shows that the RDD generated with ''flatMap'' has the same number of partitions as the original RDD, each partition containing all elements generated from the elements of the corresponding partition in the original RDD. scala> ints.flatMap(x => List(2*x-1,2*x)).glom.collect res47: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6), Array(7, 8, 9, 10, 11, 12), Array(13, 14, 15, 16, 17, 18, 19, 20)) === mapPartitions, mapPartitionsWithIndex === The ''mapPartition'' method applies a function and returns the result separately to every partition of a RDD. The elements of the partition are fed into the function as an iterator and the result of the funtion has to be provided as an iterator, i.e. the function has to be defined as '' //fun//(x: Iterator[//Type//]): Iterator[//Type//] = . . .''. As an example here is an application of the ''mapPartition'' method, which returns a RDD containing the number of elements in the original RDD's partitions: scala> def ff(it: Iterator[Int]): Iterator[Int] = List(it.size).iterator ff: (it: Iterator[Int])Iterator[Int] scala> val ints = sc.parallelize(1 to 10,3) ints: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[63] at parallelize at :24 scala> ints.glom.collect res127: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10)) scala> ints.mapPartitions(ff).collect res126: Array[Int] = Array(3, 3, 4) The same result can be acchieved by giving the funtion argument for the mapPartitions method as a function literal scala> ints.mapPartitions(it => List(it.size).iterator).collect res129: Array[Int] = Array(3, 3, 4) With the method ''mapPartitionsWithIndex'', the transformation of the elements in a partition can be made dependend on the partition number, running from zero to number of partitions minus one. The function defining the transformation therefore has two input parameters, the partition index and the partition iterator. A function to be used as parameter in the mapPartitionsWithIndex method therefore has to be defined as '' //fun//(index: Int, x: Iterator[//Type//]): Iterator[//Type//] = . . .''. In the following example, to every element of a partition the value 100*(index+1) is added: scala> val ints = sc.parallelize(1 to 10,3) ints: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at :24 scala> ints.collect res5: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) scala> def fun(index: Int, it: Iterator[Int]): Iterator[Int] = it.toList.map(x =>x + 100*(index+1)).iterator fun: (index: Int, it: Iterator[Int])Iterator[Int] scala> ints.mapPartitionsWithIndex(fun).collect res6: Array[Int] = Array(101, 102, 103, 204, 205, 206, 307, 308, 309, 310) scala> ints.mapPartitionsWithIndex((index,it) => it.toList.map(x =>x + 100*(index+1)).iterator).collect res7: Array[Int] = Array(101, 102, 103, 204, 205, 206, 307, 308, 309, 310) In the example the function defining the transformatioon is first provided by its identifier and then as a function literal. === filter === With the ''filter'' method a new RDD is created containing all elements of the original RDD fulfilling a condition, which is specified in the function given as argument to the method. The function value must be of ''Boolean'' type, and all elements, for which the function returns ''true'', are collected in the new RDD. The new RDD has the same number of partitions as the original one, and the retained values in the new RDD are placed in the partition with the same number as the partition they had occupied in the original RDD. Here are two examples for the ''filter'' method, one selecting all even values, the other selecting all values smaller than 5: scala> val ints = sc.parallelize(1 to 10,3) ints: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at :24 scala> ints.glom.collect res16: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10)) scala> def fun(a: Int): Boolean = (a%2 == 0) fun: (a: Int)Boolean scala> ints.filter(fun).glom.collect res17: Array[Array[Int]] = Array(Array(2), Array(4, 6), Array(8, 10)) scala> ints.filter(a => (a < 5)).glom.collect res18: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4), Array()) ==== Some Actions on RDDs ==== ^ method ^ description ^ |reduce(func) | Aggregate the elements of the dataset using a function func (which takes two arguments and returns one).\\ The function should be commutative and associative so that it can be computed correctly in parallel.| |collect() | Return all the elements of the dataset as an array at the driver program. | |count() | Return the number of elements in the dataset.| |first() | Return the first element of the dataset (similar to take(1)).| |take(n) | Return an array with the first n elements of the dataset.| === reduce === The ''reduce'' method produces a single result from all elements of a RDD. The result depends on the operation, which is defined in the function passed as paramter for the reduce method. For example, the function literal ''(x,y)=> x+y'' will produce the sum of all elements in the RDD, the function literal ''(x,y)=> x max y'' the largest element in the RDD. Of course all elements in the RDD should have the same type, and the method ''+'' resp. ''max'' should be applicable to this type. Here are examples using the ''reduce'' method for a collection of ''Int'' values: scala> val rdd1 = sc.parallelize(1 to 10,2) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[61] at parallelize at :24 scala> rdd1.glom.collect res146: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5), Array(6, 7, 8, 9, 10)) scala> rdd1.reduce((x,y)=>x+y) res147: Int = 55 scala> rdd1.reduce((x,y)=>x.max(y)) res148: Int = 10 scala> rdd1.reduce((x,y)=>x max y) res149: Int = 10 The last two commands express the same operation: The first shows, that ''max'' is a method applicable to objects of type ''Int'', the second is an example of Scalas syntax rule for methods with only one input. In the same way the addition of two values ''2'' and ''3'' is realized by the method ''+'' applicable to objects od type ''Int'' and therefore the result of the additions is returned form the statement ''2.+(3)'' . The above mentioned syntax rule allows to write this expression in the familiar form ''2 + 3''. The next example shows the action of the ''reduce'' method with the ''+'' operation for a collection of string values. Of course the ''+'' operator for string valued data is not communicative as is required for a valid operation for the reduce method. But this example is useful later on for examining the reduction process for RDDs with multiple partitions. scala> val chars = ('a' to 'j').toList.map(x => x.toString) chars: List[String] = List(a, b, c, d, e, f, g, h, i, j) scala> val rdd2 = sc.parallelize(chars,2) rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[65] at parallelize at :26 scala> rdd2.glom.collect res151: Array[Array[String]] = Array(Array(a, b, c, d, e), Array(f, g, h, i, j)) scala> rdd2.reduce((x,y)=>x+y) res152: String = abcdefghij In this example the collection of ''Char'' values has been transformed into a collection of ''String'' values, because Strings, but not Chars allow the application of the ''+'' method. The mechanisme working behind the reduce method can be shown by the following recursive Scala function reducing the elements of a List-collection: scala> def red[A](li: List[A],op: (A,A)=> A): A = { | val s = op(li(0),li(1)) | val lid = li.drop(2) | if (lid.size == 0) return s | red(s::lid,op) | } red: [A](li: List[A], op: (A, A) => A)A The function ''red'' receives as first input in square brackets a type value, which is to be used for input and output vlaues. Following in round brackets are two input values: the list object to be reduced and the function defining the operation to be employed for the reduction. In the function body, the first two elements of the list are used by the function to produce an result. A new list is created in two steps: first dropping the first two elemants using the ''drop(2)'' method and then prepending the result ''s'' of the operation using the ''::'' operator. Then the new list, which has one element less than the old one is used as input for calling recursively the function ''red''. The recursion loop is broken, when the length of the list is two and the operation of the function on the remaining two element is output as the end result for the reduction. This function now is used to reduce the List-collection of strings ''chars'' from the previous example: scala> red[String](chars,(x,y)=>x+y) res2: String = abcdefghij and produces the expected result. It will be interesting to follow the separate steps in the reduction process by including a print statement in the function literal: scala> red[String](chars,(x,y)=>{println(x,y);x+y}) (a,b) (ab,c) (abc,d) (abcd,e) (abcde,f) (abcdef,g) (abcdefg,h) (abcdefgh,i) (abcdefghi,j) res4: String = abcdefghij As expected, in each reduction step the next character is appended to the partial string result obtained so far. Now the reduction steps will be examined for RDD-collections, built with different numbers of partitions. With one partition, the reduction of the RDD-collection results from the same succession of steps as have been seen in the reduction of the List-collection: scala> val rdd_1p = sc.parallelize(chars,1) rdd_1p: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at :26 scala> red[String](chars,(x,y)=>{println(x,y);x+y}) (a,b) (ab,c) (abc,d) (abcd,e) (abcde,f) (abcdef,g) (abcdefg,h) (abcdefgh,i) (abcdefghi,j) res8: String = abcdefghij With 2 partitions, a different pattern for the reduction is observed: scala> val rdd_2p = sc.parallelize(chars,2) rdd_2p: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at :26 scala> rdd_2p.glom.collect res9: Array[Array[String]] = Array(Array(a, b, c, d, e), Array(f, g, h, i, j)) scala> rdd_2p.reduce((x,y)=>{println(x,y);x+y}) (a,b) (f,g) (ab,c) (fg,h) (fgh,i) (fghi,j) (abc,d) (abcd,e) (fghij,abcde) res10: String = fghijabcde The succession of reduction steps shows, that the characters in each partitions are concatenated separately, and only at the end the two partial strings from the two partitions are combined to the final complete string. The partial reduction in the partitions is done simultaneously, the resulting ordering in the final string therefore is nondeterministic, dependimg on the which partion is the first to finish its partial reduction. In order to obtain a unique result, the reduction operation must be commutative, a condition, wich is not fulfilled by the concatination of characters. The parallel processing of the partial reductions in the different partitions can again be observed with three partitions: scala> val rdd_3p = sc.parallelize(chars,3) rdd_3p: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at :26 scala> rdd_3p.glom.collect res4: Array[Array[String]] = Array(Array(a, b, c), Array(d, e, f), Array(g, h, i, j)) scala> rdd_3p.reduce((x,y)=>{println(x,y);x+y}) (a,b) (d,e) (g,h) (ab,c) (gh,i) (de,f) (ghi,j) (abc,ghij) (abcghij,def) res6: String = abcghijdef ===== Building and Submitting Spark Applications ===== The Spark Shell is appropriate for experimenting with Spark's features and for developing and testing small programs. Applications for production purposes will consist of program files, which have to be compiled and linked into an executable object, that can be submitted for execution on a cluster. The components involved in running a spark application are displayed schematically in the following picture: {{wiki:hpc:spark_components.png}} The ''client'' process submits the application, including specifications for the execution environment and eventually parameters for the application. A ''driver'' process is started, which handles the statements contained in the program file for the spark application. Every Spark application program has to specify a ''SparkContex'', in which the details for executing the appliction - e.g. type of cluster, number of executors, cores and memory of executors - are defined. According to these requirements the driver initiates one or more ''executor'' processes with the required number of cores to be run on the cluster. Of course the cluster to be used must provide enough computing resoures for the requirements of the SparkContext. The execution of the statements in the Spark application program then will be shared between driver and executors. Statements involving RDDs will be executed on the executors. An operation for a RDD is split into a number of tasks acccording to the number of partitions in the RDD, each task responsible for executing the operation on the data set in the respective partition. The driver process includes a ''scheduler'', which sends these tasks to the executors for computing the required operations on the data of the partition on one of the executor cores. The degree of parallelisme for the Spark application therefore is determined by the total number of cores of all executors, because every core can work in parallel on a different partition of a RDD. In addition to distribute the work to be done on the partitions of RDDs to the excutors, the driver collects and combines the results the executors produce for action operations on RDD partitions and processes all statements in the program, which do not involve RDDs. The next sections will describe: - defining the ''SparkContext'' in a scala program for a Spark application, - producing an executable jar-file from this program using the sbt (simple built tool) - running the application with ''spark-submit'' - running the application on a Spark standalone cluster - submitting Spark application job to the scheduling system SLURM for execution on GWDG's scientific compute cluster. ==== Setting the Spark Context ==== Establishing the connction to the ''SparkContext'' in a Scala programm for a Spark application is demontrated by the following code: import org.apache.spark.SparkContext object SparkConnect { def main(args: Array[String] = Array("")) { val sp = new SparkContext() println(sp.getConf.toDebugString) sp.stop() } } In the first line of the program the ''org.apache.spark.SparkContext'' package is imported to provide access to the SparkContext class. Then the object ''SparkConnect'' is created, containing a single method ''main''. In the ''main'' method the instance ''sp'' of the class ''SparkContext'' is created. This instance has access to the methods provided in the ''SparkContext'' class for generating and manipulating ''RDD'' collections. In this example, the instance of the SparkContext is generated without specifying values for its properties. The configuration of the context's properties will be taken from defaults defined for the execution environment to which the application is submitted. The printstatement gives the details of the context's configuration, which in this example uses the default values from the environment. The last statement deactivates the connection to the created SparkContext instance. A Spark application can be connected to only one single SparkContext. Trying to create a second connection will result in an error and terminate the application. Therefore the stop method has to be applied to an existing SparkContext instance, before a new one can be established. If it is not known, wether a connection to a SparkContext allready exists, the new connection should be established with the ''getOrCreate'' method for the SparkContext class: val sp = SparkContext.getOrCreate() If an instance of ''SparkContext'' already exists, this will be used for ''sp'', if not, ''sp'' will be a new instance of ''SparkContext''. From the file containing this program code an executabel ''jar'' file can be generated as will be explained in the next section. Invoking the executable with the ''spark-submit'' command will then call the ''main'' method defined for the object in the program file. Arguments for the main method and the settings for the configuration of the SparkContext can be passed with parameters and options, as will described in detail in a later section. The program, stored in a file ''SparkConnect.scala'' can be tested in the Spark Shell by loading it with the '':load'' command and then calling the main method for the object defined in the program file: scala> :load SparkConnect.scala Loading SparkConnect.scala... import org.apache.spark.SparkContext defined object SparkConnect scala> SparkConnect.main() org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). ... scala> sc.stop() scala> SparkConnect.main() spark.app.id=local-1550248586719 spark.app.name=Spark shell spark.driver.host=gwdu102.global.gwdg.cluster spark.driver.port=36803 spark.executor.id=driver spark.jars= spark.master=local[*] spark.submit.deployMode=client spark.ui.showConsoleProgress=true As was explained in previous sections, invoking the Spark Shell automatically provides a connection to a SparkContext with the value ''sc''. The creation of an additional instance of ''SparkConnect'' in the main method is not allowed and leads to an execption. Only after stopping the Spark shell instance ''sc'' the main method can create the new instance ''sp'' and print out the properties of its configuration. The new instance gets its properties from the Spark Shell environment. In particular the application name is set to ''Spark shell'' and the master is set to ''local[*]''. The properties of the SparkContext instance can be defined locally in the ''main'' definition by creating an instance of the class ''SparkConf'' with prescibed values for properties and creating the SparkContext with the value of the SparkConf instance as argument : import org.apache.spark.{SparkConf, SparkContext} import org.apache.log4j.{ Logger, Level} object SparkConnect { def main(args: Array[String] = Array("")) { Logger.getLogger("org").setLevel(Level.WARN) val co = new SparkConf(false).setMaster("local[2]").setAppName("SparkApp") val sp = new SparkContext(co) println(sp.getConf.toDebugString) sp.stop } } The argument ''false'' for the SparConf class prevents the setting of default properties from the environment, setting this argument to ''true'' or setting no argument leads to taking over the environments properties. The ''setMaster'' and ''setAppName'' methods then set the chosen custom values for these properties. The setting of master and appname is obligatory for a valid SparkContext, without setting these two properties the creation of the SparkContext's instance will fail. There is a large list of properties to be set, see the corresponding documentation for [[https://spark.apache.org/docs/latest/configuration.html|Spark Configuration]]. Only for a small set of properties a separate set-method is provided. The majority of properties has to be set with the method ''set("property-name","property-value")''. A further addition in this example is the setting of the log level to ''WARN'', which prevents the printing of a lot of ''INFO'' messages. ==== Building the Spark Application Executable with SBT==== There exist many different tools for compiling Scala code and build an executable jar-file. One of them is ''sbt'', which stands for "simple build tool" or "scala build tool". The sbt tool is not simple at all, as a look into the [[https://www.scala-sbt.org/1.x/docs/index.html|sbt reference manual]] shows. But for the Spark applications treated in these sections only a very restricted set of sbt features will be needed and explained in the following. In order to use the sbt tool on GWDG's cluster, the following module has to be loaded: module load scala/sbt The current working directory, from which ''sbt'' tasks are startet, is called the ''base directory'' in the sbt documentation. In the ''base directory'' at least one file with the suffix ''.sbt'' has to be provided, containing information concerning the tasks sbt shall execute. Usually the name ''build.sbt'' is chosen for this file, but sbt uses information from all files in the ''base directory'' with the suffix ''.sbt''. The following file ''build.sbt'' placed in the base directory will be the starting point for building a Spark application: /* build.sbt */ name := "SparkApp" scalaVersion := "2.11.8" logLevel := Level.Error libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "2.3.1") The build file defines a ''name'' for the application, which will be a part of the name for the jar-file generated by sbt. Then the ''scalaVersion'' used for compiling the the program code is specified and finally the version ''2.3.1'' of the ''spark-core'' package is added to the library dependency, such that the import of Spark packages in the program code can be resolved. The Loglevel setting ''Level.Error'' prevents the printing of many ''WARN'' messages concerning version conflicts resulting from automatically included libraries. The executable jar file for a Spark application is built with the ''package'' task. sbt looks for source code files with suffix ''.scala'' in the base directory and in some default directories relative to the base directory, e.g. ''src/main/scala''. The use of sbt will be demonstrated with a source file ''SparkConnect.scala'', containing the code generating the ''SparkConnect'' from the previous section. An interactive sbt shell can be invoked in the base directory with the command ''sbt'' gwdu102:54 11:16:02 ~/SparkApp > sbt [info] Updated file /home/uni05/ohaan/SparkApp/project/build.properties: set sbt.version to 1.1.6 [info] Loading project definition from /home/uni05/ohaan/SparkApp/project [info] Loading settings from build.sbt ... [info] Set current project to SparkApp (in build file:/home/uni05/ohaan/SparkApp/) [info] sbt server started at local:///usr/users/ohaan/.sbt/1.0/server/bab34b8d3cad9a03bcdd/sock sbt:SparkApp> sbt:SparkApp> show sources [info] * /home/uni05/ohaan/SparkApp/SparkConnect.scala [success] Total time: 0 s, completed Feb 20, 2019 11:17:43 AM Invoking sbt for the first time generates a complex hierarchy of directories, which will only be described as far as relevant for the current purpose of generating a Spark application. The first ''[info]'' line states, that the newly generated directory ''projects'' has a file ''build.properties'' containing the used version of sbt, 1.1.6 in this example. The sbt-shell prompt ''sbt:SparkApp>'' takes the extension ''SparkApp'' from the ''name'' value set in ''build.sbt'' With the task ''show sources'' all files ending with ''.scala'' from the default source directories will be displayed. The follwing list shows some sbt tasks, wich can be executed in the sbt-shell. ^ tasks ^ description ^ |tasks | displays all tasks available in the sbt-shell| |compile | compiles the code from source files, generating class files | |package| produces binary jar files from the compiled sources ready for execution| |run | runs the main class from the executable jar file| |clean | deletes files produced by the sbt tasks, such as compiled classes and binary jar files| |reload| reloads the project in the base directory (after changes of settings in the .sbt file)| |exit | ends the sbt-shell| These tasks can also be executed as batch commands ''sbt '' issued from the base directory. The sbt-shell offers in addition the possibility of continuous commands by prepending the task name by ''~'', as e.g.''~compile''. In the continous command mode the shell executes the tasks repeatedly after changes in files, on which the task depends. The executable jar files for Spark applications are generated with the ''package'' task. sbt:SparkApp> package [success] Total time: 4 s, completed Feb 20, 2019 3:43:39 PM sbt:SparkApp> show package [info] /home/uni05/ohaan/SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar [success] Total time: 0 s, completed Feb 20, 2019 3:43:43 PM The second command ''show package'' returns the location ''/home/uni05/ohaan/SparkApp/target/scala-2.11/'' and name ''sparkapp_2.11-0.1.0-SNAPSHOT.jar'' of the generated jar file. This name includes the project property settings ''name'', ''version'' and ''scalaVersion'' from the build.sbt file. Since no ''version'' value for the project was set in the example, the default value ''0.1.0-SNAPSHOT'' was chosen. The executable jar file can be submitted by the ''run'' task from sbt: sbt:SparkApp> run Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 19/02/20 16:03:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable spark.app.id=local-1550675006775 spark.app.name=SparkApp spark.driver.host=gwdu102.global.gwdg.cluster spark.driver.port=39956 spark.executor.id=driver spark.master=local[2] [success] Total time: 3 s, completed Feb 20, 2019 4:03:27 PM ==== Running a Spark Application with spark-submit ==== The ''spark-submit'' command starts the client process for a Spark Application and allows to set at run time configuration properties for the SparkContext. spark-submit \ --master \ --conf = \ ... # other options \ Some properties can be set using a reserved option name, for example the ''%%--master%%'' option determines the compute environment to be used for the Spark application. Some possible values for the master-url are ^ Master %%URL%% ^ Meaning ^ |local | Run Spark locally with one worker thread (i.e. no parallelism at all)| |local[K] | Run Spark locally with K worker threads | |local[*] | Run Spark locally with as many worker threads as logical cores on your machine| |spark:%%//%%HOST:PORT | Connect to the given Spark standalone cluster master | The complete list of possible options with their default settings can be inspected by calling ''spark-submit %%--help%%'' ''spark-submit'' may be invoked without any option, only the absolute or relative path to the jar file containing the executable spark Application must be provided. Then the configuration properties will be set to their default values. A Spark Application creating a SparkContext without specifying any properties as in import org.apache.spark.SparkContext object SparkConnect { def main(args: Array[String] = Array("")) { val sp = new SparkContext() println(sp.getConf.toDebugString) sp.stop() } } with a corresponding application-jar file ''target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar'' produced by the ''sbt package'' command as described in the previous section, will produce the following output by submitting it without explicit settings for the options ~/SparkApp > $SPARK_HOME/bin/spark-submit target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar ... spark.app.id=local-1557919036370 spark.app.name=SparkConnect spark.driver.host=gwdu102.global.gwdg.cluster spark.driver.port=41872 spark.executor.id=driver spark.jars=file:/home/uni05/ohaan/SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar spark.master=local[*] spark.submit.deployMode=client ==== Ways to Configure the SparkContext ==== The properties, which can be configured for the SparkContext are listed in the section [[https://spark.apache.org/docs/latest/configuration.html|Spark Configuration]] of the Spark documantation. Here is a short collection of properties from this list ^ property name^default^description^ |spark.master |(none) |the cluster manager to connect to | |spark.cores.max| (not set) |the maximum amount of CPU cores to request for the application from across the cluster | |spark.executor.cores |all the available cores on the worker in standalone mode |the number of cores to use on each executor | |spark.default.parallelism| for distributed shuffle operations the largest number of partitions in a parent RDD; for operations like parallelize with no parent RDDs total number of cores on all executor nodes|default number of partitions in RDDs | There are three different ways to set a configuration property for a SparkContext, which are in order od precedence: With the method ''set'' for the ''SparkConf'' object in the application program SparkConf.set("","") with the option ''%%--%%conf'' in the ''spark-submit'' command --conf = with a line in a the spark configuration file with the name ''spark-defaults.conf'' in the directory specified in the environmment variable ''SPARK_CONF_DIR'' or with a name specified in the option ''%%--%%properties-file'' in the ''spark-submit'' command Some frequently used properties can be set with a simplified syntax. Examples are the method ''setMaster("")'' for the ''SparkConfig'' class or the option ''%%--%%master '' for the ''spark-submit'' command. ==== Setting up a Spark Standalone Cluster ==== Starting an application with ''spark-submit'' without specifying the ''spark.master'' property for the ''SparkContext'' or with the value ''local[*]'' for the option ''%%--%%master'' will use all the cores of the local node for executing the application. Multiple nodes for a distributed execution of a Spark application can be used on a Spark standalone cluster. The standalone mode consists of a master and one or more workers. The Spark installation provides in the directory ''$SPARK_HOME/sbin'' the following scripts for managing the standolone mode: ^ sript name ^ purpose ^ |start-master.sh | Starts a master instance on the machine the script is executed on| |stop-master.sh | Stops the master that was started via the start-master.sh script| |start-slave.sh | Starts a worker instance on the machine the script is executed on| |stop-slave.sh | Stops the worker that was started via the start-slave.sh script| |start-slaves.sh | Starts worker instances on all nodes listed in the file slaves| |stop-slave.sh | Stops the workers that were started via the start-slaves.sh script| |start-all.sh | Combines starting master and workers| |stop-all.sh | Combines stopping master and workers| == Starting the Master == The following lines show the steps for starting a master process for a Spark standalone cluster gwdu102 > module load JAVA/jdk1.8.0_31 spark gwdu102 > export SPARK_LOG_DIR=~/SparkLog gwdu102 > ${SPARK_HOME}/sbin/start-master.sh starting org.apache.spark.deploy.master.Master, logging to /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.master.Master-1-gwdu102.out gwdu102 > grep Starting /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.master.Master-1-gwdu102.out 2019-07-22 15:45:52 INFO Master:54 - Starting Spark master at spark://gwdu102.global.gwdg.cluster:7077 gwdu102 > jps 31663 Jps 29761 Master Before starting the master by executing ''$SPARK_HOME/sbin/start-master.sh'', the environment for JAVA and Spark must be initialized by loading the appropriate modules. Furthermore the environment variable ''SPARK_LOG_DIR'' has to be set to the name of a log directory, which will be created if it does not yet exist. The invocation of ''start-master.sh'' produces a message, which includes the name of the log file in this directory. Among many other messages printed in this log file is a line with the master-url ''spark:%%//%%gwdu102.global.gwdg.cluster:7077'', which includes the name of the host, on which the master has been started and the number of the port, on which the master listens. If not specified with the option ''%%--%%port '', the port number is 7077 by default. The workers in the Spark stanalone cluster will use this master-url for the connection to the master, and the ''spark-submit'' command will need the master-url to start the Spark application in the standolane mode. With the command ''jps'' the running of the master process in a java virtual machine can be verified. == Starting a Worker == Now a worker process can be started as follows: gwdu102 > export SPARK_WORKER_DIR=~/SparkWorker gwdu102 > ${SPARK_HOME}/sbin/start-slave.sh spark://gwdu102:7077 starting org.apache.spark.deploy.worker.Worker, logging to /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.worker.Worker-1-gwdu102.out gwdu102 > grep Starting /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.worker.Worker-1-gwdu102.out 2019-07-22 15:47:45 INFO Worker:54 - Starting Spark worker 10.108.96.102:46672 with 16 cores, 61.9 GB RAM gwdu102 > grep registered /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.worker.Worker-1-gwdu102.out 2019-07-22 15:47:45 INFO Worker:54 - Successfully registered with master spark://gwdu102.global.gwdg.cluster:7077 gwdu102 > grep Registering /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.master.Master-1-gwdu102.out 2019-07-22 15:47:45 INFO Master:54 - Registering worker 10.108.96.102:46672 with 16 cores, 61.9 GB RAM gwdu102 > jps 18861 Jps 3081 Worker 29761 Master For starting a worker calling ''${SPARK_HOME}/sbin/start-slave.sh'' with the master-url as an obligatory argument, the name of a worker-directory has to be supplied, into which output from running applications on the worker will be stored. This name can be provided by setting the environment variable ''SPARK_WORKER_DIR''. Alternatively this name can be set in the option ''%%--%%work-dir'' following the master-url argument in the calling sequence for starting the worker. This directory will be created if it does not yet exist. . The message from starting the worker shows, that a log file is produced in the directory indicated by the environment variable ''SPAR_LOG_DIR'', which was set before starting the master. This logfile contains a line stating the start of the worker process , the number of cores and the amount of memory allocated to the worker. A further line announces the registering of this worker with the master. The same event is listed in the master log file as well. As can be seen from the ''jps'' command, master and worker processes are now running on the same host. By default the number of available cores and all available memory of a node is allocated to the worker startet with the ''start-slave.sh'' script. Different amount of hardware ressources for the worker can be specified by setting the environment variables ''SPARK_WORKER_CORES'' to the number of required cores and ''SPARK_WORKER_MEMORY'' to the size of required memory (e.g. to 1000M or 2G). Instead of setting the environment variables, these ressources can be required by setting the options ''%%--%%core'' and ''%%--%%memory'' in the calling sequence for starting the worker. == Starting a Worker on a Remote Node == The standalone cluster can include workers running on different nodes by starting the ''start-slave.sh'' script on different nodes. gwdu102 > ssh gwdu103 Last login: Wed Jun 26 11:36:28 2019 from gwdu102.global.gwdg.cluster gwdu103 > module load JAVA/jdk1.8.0_31 spark gwdu103 > export SPARK_LOG_DIR=~/SparkLog gwdu103 > ${SPARK_HOME}/sbin/start-slave.sh spark://gwdu102:7077 --cores 4 --work-dir ~/SparkWorker starting org.apache.spark.deploy.worker.Worker, logging to /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.worker.Worker-1-gwdu103.out gwdu103 > jps 20784 Worker 21588 Jps gwdu103 > exit logout Connection to gwdu103 closed. gwdu102 > grep Registering /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.master.Master-1-gwdu102.out 2019-07-22 15:47:45 INFO Master:54 - Registering worker 10.108.96.102:46672 with 16 cores, 61.9 GB RAM 2019-07-22 16:28:08 INFO Master:54 - Registering worker 10.108.96.103:39968 with 4 cores, 61.8 GB RAM Of course after login with ''ssh'' from node ''gwdu102'' to the node ''gwdu103'' the modules for using Spark have to be loaded and the variable ''SPARK_LOG_DIR'' has to be set before the start of a worker on this node is possible. In this example worker directory and number of worker cores are specified by setting values for the options ''%%--%%work-dir'' and ''%%--%%core''. All activities for setting up the standalone cluster are documented in the logfile ''~/SparkLog/spark-ohaan-org.apache.spark.deploy.master.Master-1-gwdu102.out''. The two lines displayed with the ''grep'' command show, that now two workers on two different nodes are available for the application. == Starting Multiple Workers with ''start-slaves.sh'' == The script ''sbin/start-slaves.sh'' will start multiple workers on different nodes using remote commands via ''ssh''. The node names must be provided as separate lines in a file ''slaves''. This file has to be created in the directory specified in the environment variable ''SPARK_CONF_DIR''. The default value for this variable is ''$SPARK_HOME/conf''. Because this default is a read only directory in the Spark installation on GWDG's compute cluster, ''SPARK_CONF_DIR'' has to be explicitely set to a name of a local directory with permission for creating files. The Spark standalone scripts inspect this directory also for the existence of a file ''spark-env.sh'', from which values for environment variables can be set, e.g. ''SPARK_LOG_DIR'' and ''SPARK_WORKER_DIR'' for the directories needed for the start of master and worker, and ''SPARK_WORKER_CORES'' and ''SPARK_WORKER_MEMORY'' for the amount of hardware ressources to be allocated the workers. The environment created in the local shell from which the script ''sbin/start-slaves.sh'' is startet will not be passed to the bash shell created on the remote nodes by the ''ssh'' commands. But in the remote shell the file ''~/.bashrsc'' is sourced before the required commands are executed. Putting ''export'' statements for the necessary environment variables into the file ''~/.bashrsc'' on the remote node therefore will provide the environment needed to execute the standalone scripts on the remote nodes. Since the file system in GWDG's cluster is global, the ''~/.bashrsc'', set up in the local shell, will be available on all nodes. The following screen shots show how to set up the standalone cluster with the ''sbin/start-slaves.sh'' script: Setting environment variables ''SPARK_HOME'', ''JAVA_HOME'' by loading the modules for Spark and Java: gwdu102 > module load JAVA spark gwdu102 > echo $JAVA_HOME /usr/product/bioinfo/JAVA/jdk1.8.0_31/jre gwdu102 > echo $SPARK_HOME /usr/product/parallel/spark/2.3.1/spark-2.3.1-bin-hadoop2.7 Setting the environment variable ''SPARK_CONF_DIR'' and creating the directory to which it points: gwdu102 > export SPARK_CONF_DIR=~/SparkConf gwdu102 > mkdir -p $SPARK_CONF_DIR Generating a file ''spark-env.sh'' which sets the environment variables ''SPARK_LOG_DIR'' and ''SPARK_WORKER_DIR'' and, optionally additional varables, e.g. ''SPARK_WORKER_CORES'' and ''SPARK_WORKER_MEMORY'': gwdu102 > echo "export SPARK_LOG_DIR=~/SparkLog" > $SPARK_CONF_DIR/spark-env.sh gwdu102 > echo "export SPARK_WORKER_DIR=~/SparkWorker" >> $SPARK_CONF_DIR/spark-env.sh gwdu102 > echo "export SPARK_WORKER_CORES=4" >> $SPARK_CONF_DIR/spark-env.sh gwdu102 > echo "export SPARK_WORKER_MEMORY=4048M" >> $SPARK_CONF_DIR/spark-env.sh gwdu102 > cat $SPARK_CONF_DIR/spark-env.sh export SPARK_LOG_DIR=~/SparkLog export SPARK_WORKER_DIR=~/SparkWorker export SPARK_WORKER_CORES=4 export SPARK_WORKER_MEMORY=4048M Generating a file ''slaves'' containing the names of the nodes on which a worker shall be started: gwdu102 > echo gwdu102 > $SPARK_CONF_DIR/slaves gwdu102 > echo gwdu101 >> $SPARK_CONF_DIR/slaves gwdu102 > echo gwdu103 >> $SPARK_CONF_DIR/slaves gwdu102 > cat $SPARK_CONF_DIR/slaves gwdu102 gwdu101 gwdu103 Generating the file ''~/.bashrc'' to set the environment for the shell started by the ''ssh'' commands on the remote nodes: gwdu102 > echo "export SPARK_HOME=$SPARK_HOME" > ~/.bashrc gwdu102 > echo "export JAVA_HOME=$JAVA_HOME" >> ~/.bashrc gwdu102 > echo "export SPARK_CONF_DIR=$SPARK_CONF_DIR" >> ~/.bashrc gwdu102 > cat ~/.bashrc export SPARK_HOME=/usr/product/parallel/spark/2.3.1/spark-2.3.1-bin-hadoop2.7 export JAVA_HOME=/usr/product/bioinfo/JAVA/jdk1.8.0_31/jre export SPARK_CONF_DIR=/usr/users/ohaan/SparkConf Now the scripts for starting master and worker can be run: gwdu102 > $SPARK_HOME/sbin/start-master.sh starting org.apache.spark.deploy.master.Master, logging to /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.master.Master-1-gwdu102.out gwdu102 > $SPARK_HOME/sbin/start-slaves.sh gwdu103: starting org.apache.spark.deploy.worker.Worker, logging to /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.worker.Worker-1-gwdu103.out gwdu102: starting org.apache.spark.deploy.worker.Worker, logging to /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.worker.Worker-1-gwdu102.out gwdu101: starting org.apache.spark.deploy.worker.Worker, logging to /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.worker.Worker-1-gwdu101.out The last lines of the logfile for the master report the successfull starts of the three workers: gwdu102 > more /usr/users/ohaan/SparkLog/spark-ohaan-org.apache.spark.deploy.master.Master-1-gwdu102.out ... 2019-07-23 15:44:19 INFO Master:54 - Registering worker 10.108.96.102:38162 with 4 cores, 4.0 GB RAM 2019-07-23 15:44:21 INFO Master:54 - Registering worker 10.108.96.103:46111 with 4 cores, 4.0 GB RAM 2019-07-23 15:44:21 INFO Master:54 - Registering worker 10.108.96.101:40362 with 4 cores, 4.0 GB RAM These lines also show, that the workers provide the number of cores and amount of local memory specified in the environment variables ''SPARK_WORKER_CORES'' and ''SPARK_WORKER_MEMORY'' via the ''.bashrc'' file. The standalone cluster can be started in one step by running the command ''sbin/start-all.sh'' which combines ''sbin/start-master.sh'' and ''start-slaves.sh''. The workers are stopped by ''sbin/stop-slaves.sh'', the master ist stopped by ''sbin/stop-master'', master and workers are stopped in one step by ''sbin/stop-all.sh''. ==== Submitting to the Spark Standalone Cluster ==== The Spark standalone cluster provides cores and memory on different worker nodes. The Spark application uses the resources from the standalone cluster in a way specified by configuration properties for the SparkContext. These properties must include the master_url, furthermore number of cores and amount of memory for the executors can be prescribed and the default number of partitions in a RDD collection can be fixed. Different ways to set SparkContext properties programmatically or at execution time have been described in the section ''Ways to Configure the SparkContext''. The following script ''setSparkcConf.sh'' sets the property ''spark.default.parallelism'' to the total number of cores of all worker nodes and choses for spark.executor.cores and spark.executor.memory fixed values by generating the corresponding configuration file ''$SPARK_CONF_DIR/spark-defaults.conf''. nodelist=`cat $SPARK_CONF_DIR/slaves` total_cores=0 for host in $nodelist do cores=$(ssh $host '. $SPARK_CONF_DIR/spark-env.sh;echo $SPARK_WORKER_CORES') total_cores=$(( $total_cores + $cores )) done echo "spark.default.parallelism" $total_cores > $SPARK_CONF_DIR/spark-defaults.conf echo "spark.submit.deployMode" client >> $SPARK_CONF_DIR/spark-defaults.conf echo "spark.master" spark://`hostname`:7077 >> $SPARK_CONF_DIR/spark-defaults.conf echo "spark.executor.cores" 2 >> $SPARK_CONF_DIR/spark-defaults.conf echo "spark.executor.memory" 1024M >> $SPARK_CONF_DIR/spark-defaults.conf A toy application, showing the distribution of partitions onto executors, is generated by the following scala program: /* SparkApp.scala */ import org.apache.spark._ import org.apache.spark.{SparkConf, SparkContext} import org.apache.log4j.{ Logger, Level} import sys.process._ object SparkApp { def work(i: Int): (String,String,String) = { val str1 = "date +%T".!! val time = str1.take(str1.length-1) val str2 = "hostname".!! val host = str2.take(str2.length-1) val id = SparkEnv.get.executorId "sleep 10".! return (host,id,time) } def main(args: Array[String] = Array("")) { Logger.getLogger("org").setLevel(Level.WARN) val sp = new SparkContext() println(sp.getConf.toDebugString) val st = System.nanoTime val rdd = sp.parallelize(1 to sp.defaultParallelism).cache println("\nInformation from partitions:") println("host,executor_id,start_time") rdd.map(x=>work(x)).collect.foreach(x=>println(x+" ")) val dt = (System.nanoTime - st)/1e9d println("\n total time for application: "+dt) sp.stop } } This program defines a function ''work'', which simulates some work by calling ''sleep 10'' and which returns three strings: the hostname on which the function was executed, the id of the executor executing the function and the time of starting the function execution. The main program, after printing the configuration of the SparkContext, generates a RRD with a number of elements equal to the ''defaultParallelism'' value of the SparkContext, which is split into the same number of partitions, such that every partition of the RDD cantains just one element. Then the result from calling the map method to execute the ''work'' function for every element of the RDD is printed. From registering the system time before and after the execution of the RDD operations, the total execution time is calcluated. As described in the section ''Building the Spark Application Executable with SBT'' an executable jar file ''~/SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar'' can be generated, which in turn can be executed with the spark-submit command. For a Spark standalone cluster running three workers with two cores, and setting the number of cores for the executors to two, the following output results: gwdu102 > spark-submit ~/SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar 2019-07-29 17:43:42 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable spark.app.id=app-20190729174343-0003 spark.app.name=SparkApp spark.default.parallelism=6 spark.driver.host=gwdu102.global.gwdg.cluster spark.driver.port=46197 spark.executor.cores=2 spark.executor.id=driver spark.executor.memory=1024M spark.jars=file:/usr/users/ohaan/SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar spark.master=spark://gwdu102:7077 spark.submit.deployMode=client Information from partitions: host,executor_id,start_time (gwdu102,2,17:43:46) (gwdu102,2,17:43:46) (gwdu101,1,17:43:47) (gwdu101,1,17:43:47) (gwdu103,0,17:43:48) (gwdu103,0,17:43:48) total time for application: 14.401789086 The SparkContext configuration has the expected properties for the defaultParallelism (equal to 6, the total number of cores in the standalone cluster) and for the executor cores (equal to 2). The RDD collection was created with 6 elements, spread into 6 partition, so every partion contains just 1 element. Every partition of the RDD is send by the driver to an executor for performing the action on the partitions elements as defined in the function given as argument to the map method. The output shows, that three executors have been working (executor ids 0, 1, 2), each executor on a different host, and every executor acting on 2 partitions. Since the total core number for running the application is 6, the computation for every partition is scheduled to a different core and all 6 computations can start nearly simultaneously, delayed only by the time needed for the scheduling itself. Comparing the total time for the application of 14.4 sec to the time of 10 sec used in a single work function shows, that this overhead is aprox. 4 sec. ==== Running Spark Applications on GWDG's Scientific Compute Cluster ==== Jobs on GWDG's compute cluster are managed by the SLURM workload manager as described in the section [[https://info.gwdg.de/dokuwiki/doku.php?id=en:services:application_services:high_performance_computing:running_jobs_slurm|Running Jobs with SLURM]] of GWDG's HPC documentation. A Spark standalone cluster can be started on nodes of the compute cluster by submitting the SLURM ''sbatch'' command with a script specifying the required resources, followed by the commands for setting up the standalone cluster as described in the previous two sections. The resources to be allocated by the SLURM system should include as many ''slurm tasks'' as ''spark executors'' needed for the spark application, and each task should provide the number of cores and the amount of memory that an executor needs for the spark application. Therefore the sbatch script must include the options ''%%--%%ntasks'', ''%%--%%cpus-per-task'' and ''%%--%%mem-per-cpu'', whereas an option ''%%--%%nodes'' is not neeeded, because the SLURM manager can decide on the number of nodes for providing the resources for the required number of tasks. An example is the following ''start-standalone.script'' : #!/bin/bash #SBATCH --partition=medium #SBATCH --ntasks=4 #SBATCH --cpus-per-task=2 #SBATCH --mem-per-cpu=2048 #SBATCH --time=60:00 #SBATCH --output=outfile-%J . setenv.sh $SPARK_HOME/sbin/start-all.sh . wait-worker.sh sleep infinity Submitting this script from the login host with the command ''sbatch start-standalone.script'' will start a Spark standalone cluster, which will be alive for the time required by the option ''%%--%%time=60:00'', or until the command ''scancel '' is launched, where '''' is the job-id returned from the sbatch command. The commands in the batch script start with sourcing the script ''setenv.sh'' to prepare the environment for starting the standalone cluster: ##setenv.sh## module load JAVA/jdk1.8.0_31 spark export SPARK_CONF_DIR=~/SparkConf mkdir -p $SPARK_CONF_DIR env=$SPARK_CONF_DIR/spark-env.sh echo "export SPARK_LOG_DIR=~/SparkLog" > $env echo "export SPARK_WORKER_DIR=~/SparkWorker" >> $env echo "export SLURM_MEM_PER_CPU=$SLURM_MEM_PER_CPU" >> $env echo 'export SPARK_WORKER_CORES=`nproc`' >> $env echo 'export SPARK_WORKER_MEMORY=$(( $SPARK_WORKER_CORES*$SLURM_MEM_PER_CPU ))M' >> $env echo "export SPARK_HOME=$SPARK_HOME" > ~/.bashrc echo "export JAVA_HOME=$JAVA_HOME" >> ~/.bashrc echo "export SPARK_CONF_DIR=$SPARK_CONF_DIR" >> ~/.bashrc scontrol show hostname $SLURM_JOB_NODELIST > $SPARK_CONF_DIR/slaves conf=$SPARK_CONF_DIR/spark-defaults.conf echo "spark.default.parallelism" $(( $SLURM_CPUS_PER_TASK * $SLURM_NTASKS ))> $conf echo "spark.submit.deployMode" client >> $conf echo "spark.master" spark://`hostname`:7077 >> $conf echo "spark.executor.cores" $SLURM_CPUS_PER_TASK >> $conf echo "spark.executor.memory" $(( $SLURM_CPUS_PER_TASK*$SLURM_MEM_PER_CPU ))M >> $conf The role of the commands in this file have been explained in detail in the two previous sections. The number of cores for a Spark standalone worker is determined dynamically by the command ''`nproc`'', which evaluates on each node to the number of cores allocated by the SLURM manager on the node. This number is equal to the number of tasks allocated to the node times the value specified in the SBATCH option ''%%--%%cpus-per-task''. The amount of memory for a worker is determined by the product of this number of cores times the memory setting in the SBATCH option ''%%--%%mem-per-cpu'', which is available as the environment variable ''SLURM_MEM_PER_CPU''. The nodes on which workers are to be started are listed in a special format in the environment variable ''SLURM_JOB_NODELIST'', which is expanded to the node names with the ''scontrol'' command, the result of which is written to the ''slaves'' file. In the last group of commands the properties for the SparkContext are set, which are needed for running the Spark application on the started standalone cluster. In particular, the ''spark.default.parallelism'' is set to the product of allocated SLURM tasks times the number of allocated cpus per tasks, i.e. to the total number of allocated cores. The property ''spark.executor.cores'' sets the numer of cores for every executor to the number of cpus allocated to every task in ''$SLURM_CPUS_PER_TASK'', and ''spark.executor.memory'' sets the amount of memory for every executor to the amount of memory allocated the a SLURM task. After preparing the environment for the standalone cluster, the next command ''$SPARK_HOME/sbin/start-all.sh'' will start the cluster as described in the section ''Setting up a Spark Standalone Cluster''. The ''start-all.sh'' command returns after starting the master und worker processes. The standalone cluster will not be ready for executing the Spark application, until a connection between master and worker has been established and the workers processes have been registerd with the master process. Sourcing the script ''wait-workers.sh'' will wait until the registering of all workers has succeeded. This script looks repeatedly into the logfiles for the started workers and returns, if the line with the text ''Successfully registered with master'' has been included in the logfiles for all started workers: ##wait-worker.sh## . $SPARK_CONF_DIR/spark-env.sh num_workers=`cat $SPARK_CONF_DIR/slaves|wc -l` echo number of workers to be registered: $num_workers master_logfile=`ls -tr ${SPARK_LOG_DIR}/*master* |tail -1` worker_logfiles=`ls -tr ${SPARK_LOG_DIR}/*worker* |tail -$num_workers` steptime=3 for i in {1..100} do sleep $steptime num_reg=` grep 'registered' $worker_logfiles|wc -l` if [ $num_reg -eq $num_workers ] then break fi done echo registered workers after $((i * steptime)) seconds : for file in $worker_logfiles do grep 'registered' $file done grep 'Starting Spark master' $master_logfile grep 'Registering worker' $master_logfile|tail -$num_workers The output of the ''wait-workers.sh'' script will be written to the output file specified by the ''%%--%%output'' option in the sbatch script ''start-standalone.script''. Only after finding lines like number of workers to be registered: 3 registered workers after 114 seconds : 19/08/11 12:26:25 INFO Worker: Successfully registered with master spark://gwdd034.global.gwdg.cluster:7077 19/08/11 12:27:07 INFO Worker: Successfully registered with master spark://gwdd034.global.gwdg.cluster:7077 19/08/11 12:26:29 INFO Worker: Successfully registered with master spark://gwdd034.global.gwdg.cluster:7077 19/08/11 12:26:24 INFO Master: Starting Spark master at spark://gwdd034.global.gwdg.cluster:7077 19/08/11 12:26:25 INFO Master: Registering worker 10.108.102.34:43775 with 4 cores, 8.0 GB RAM 19/08/11 12:26:29 INFO Master: Registering worker 10.108.102.46:35564 with 2 cores, 4.0 GB RAM 19/08/11 12:27:07 INFO Master: Registering worker 10.108.102.47:37907 with 2 cores, 4.0 GB RAM in the batch output file, the Spark cluster is ready for executing Spark applications. Now Spark applications can be submitted from the same login node, from which the sbatch script had been submitted. All necessary setting of the SparkContext properties have been placed into the ''spark-defauls.conf'' file by sourcing the ''setenv.sh'' file in the sbatch script. On the login node only the modules JAVA and spark have to be loaded and the variable ''SPARK_CONF_DIR'' has to be set to the directory into which the ''spark-defauls.conf'' file has been stored. The following screen shot shows the execution of a Spark application started from the login node. gwdu102 > module load JAVA spark gwdu102 > SPARK_CONF_DIR=~/SparkConf gwdu102 > spark-submit ~/SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar 19/08/11 12:41:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties spark.app.id=app-20190811124153-0000 spark.app.name=SparkApp spark.default.parallelism=8 spark.driver.host=gwdu102.global.gwdg.cluster spark.driver.port=43083 spark.executor.cores=2 spark.executor.id=driver spark.executor.memory=4096M spark.jars=file:/usr/users/ohaan/SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar spark.master=spark://gwdd034:7077 spark.submit.deployMode=client Information from partitions: host,executor_id,start_time (gwdd034,0,12:41:57) (gwdd034,0,12:41:57) (gwdd047,2,12:41:57) (gwdd047,2,12:41:57) (gwdd034,1,12:41:57) (gwdd034,1,12:41:57) (gwdd046,3,12:41:57) (gwdd046,3,12:41:57) total time for application: 13.789137319 For executig the Spark application as a batch job, the batch sript ''start-standalone.sh'' has to be modified by replacing the command ''sleep infinity'' by spark-submit ~/SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar $SPARK_HOME/sbin/stop-all.sh [[Kategorie: Scientific Computing]]