====== Parallel Processing with Spark on GWDG's Scientific Compute Cluster ======

===== Apache Spark =====
Apache Spark is a cluster computing framework for processing large data sets, built around the concept of resilient distributed datasets (RDDs): collections of elements that can be partitioned across the nodes of a cluster and operated on in parallel (see the [[https://spark.apache.org|Apache Spark homepage]]).

The following sections will describe the Spark environment on GWDG's compute cluster and the use of Spark's RDDs for parallel processing. The Scala programming language will be explained only as far as needed for the examples. Programming guides covering all of Scala are available online, e.g. the books and tutorials collected at [[https://docs.scala-lang.org|docs.scala-lang.org]].

===== The Interactive Spark Environment =====

GWDG provides version 2.3.1 of the Apache Spark framework. On all frontend nodes for the cluster (gwdu101, gwdu102, gwdu103) the Spark environment is loaded by the module command:
<code>
module load JAVA spark
</code>
(Because Spark depends on Java 8, the module ''JAVA'' has to be loaded as well.)

Spark provides an interactive Spark Shell for executing single Spark commands or scripts with collections of Spark commands. This shell can be started locally on a frontend node by executing the command
<code>
spark-shell
</code>
which produces after a number of informational lines the prompt
<code>
scala>
</code>
Spark expressions entered at this prompt will be evaluated using as many cores of the node in parallel as needed, and the result will be returned on the screen.
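
As a first test, the following expression counts the multiples of 3 among the first million integers. This is a minimal sketch; the methods ''parallelize'', ''filter'' and ''count'' used here are explained in later sections:
<code>
scala> sc.parallelize(1 to 1000000).filter(_ % 3 == 0).count
res0: Long = 333333
</code>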


===== Introducing the Spark Shell =====
The interactive Spark Shell is an extension of the Scala REPL (read-evaluate-print loop), a shell for the interactive execution of Scala expressions.
The available commands in this shell are listed by executing '':help'':
<code>
scala> :help
All commands can be abbreviated, e.g., :he instead of :help.
:edit <id>|<line>        edit history
:help [command]          print this summary or command-specific help
:history [num]           show the history (optional num is commands to show)
:h? <string>             search the history
:imports [name name ...] show import history, identifying sources of names
:implicits [-v]          show the implicits in scope
:javap <path|class>      disassemble a file or class name
:line <id>|<line>        place line(s) at the end of history
:load <path>             interpret lines in a file
:paste [-raw] [path]     enter paste mode or paste a file
:power                   enable power user mode
:quit                    exit the interpreter
:replay [options]        reset the repl and replay all previous commands
:require <path>          add a jar to the classpath
:reset [options]         reset the repl to its initial state, forgetting all session entries
:save <path>             save replayable session to a file
:sh <command line>       run a shell command (result is implicitly => List[String])
:settings <options>      update compiler options, if possible; see reset
:silent                  disable/enable automatic printing of results
:type [-v] <expr>        display the type of an expression without evaluating it
:kind [-v] <expr>        display the kind of expression's type
:warnings                show the suppressed warnings from the most recent line which had any
</code>

To close the Spark Shell, you press ''Ctrl+D'' or execute the command '':quit''.

Apache Spark is based on the object oriented functional programming language **Scala**. Basic elements of the language are ''classes'' with fields and methods, following the general template
<code>
class <identifier> (<parameter list>) { <fields and methods> }
</code>
The following simple example shows how to define a class in the Spark Shell. The second line is the message from the Spark Shell indicating the successful definition of the class:
<code>
scala> class User (n: String = "none") { val name = n; def greet = s"Hello from $name" }
defined class User
</code>
The class ''User'' has a single parameter ''n'' of type ''String'' with the default value ''"none"'', a field ''name'' initialized with this parameter, and a method ''greet'' returning a greeting string.

Instances of a class can be generated by expressions using the keyword ''new''. Fields and methods of an instance are accessed with the usual dot notation:
<code>
scala> val userdefault = new User
userdefault: User = User@7a9273a8
scala> userdefault.name
res0: String = none
scala> userdefault.greet
res1: String = Hello from none

scala> val userhaan = new User("haan")
userhaan: User = User@2ded31cc
scala> userhaan.name
res2: String = haan
scala> userhaan.greet
res3: String = Hello from haan
</code>

Every value defined by executing an expression in the Spark Shell is stored and available for later use, as are the results res[x] returned from executing some expression. The shell can be cleared by executing the command '':reset''.

The Spark Shell can execute commands from a script file using the command '':load <path>''. As an example, the commands of the ''User'' example are stored in a file ''User.script'':
<code>
class User( n: String = "none") {
  val name = n
  def greet = s"Hello from $name"
}
val userdefault = new User
userdefault.name
userdefault.greet
val userhaan = new User("haan")
userhaan.name
userhaan.greet
</code>
Loading this script in the Spark Shell gives the following result:
<code>
scala> :load User.script
Loading User.script...
defined class User
userdefault: User = User@5d9e0e7f
res0: String = none
res1: String = Hello from none
userhaan: User = User@41cfcbb5
res2: String = haan
res3: String = Hello from haan
</code>

===== Executing System Commands from the Spark Shell =====
The Spark Shell command '':sh <command line>'' executes a system command; the returned object can then be queried for the lines of the command's output:
<code>
scala> :sh ls -l
res0: scala.tools.nsc.interpreter.ProcessResult = `ls -l` (29 lines, exit 0)

scala> res0.lines foreach println
total 2709248
-rw------- 1 ohaan GGST       727 Jan 28 12:07 derby.log
drwx------ 2 ohaan GGST         0 Aug 31 17:06 files
...

scala>
</code>

Unfortunately, the '':sh'' command does not pass its argument through a system shell, so shell features like wildcards cannot be used directly.

An alternative possibility for executing system commands is the use of Scala's package ''sys.process''. After importing this package, a string containing a system command can be executed with the postfix methods ''.!'' and ''.!!''. The method ''.!'' prints the command's output and returns its exit code, the method ''.!!'' returns the output as a string:
<code>
scala> import sys.process._
import sys.process._

scala> val content = "ls -l".!
total 864
-rw------- 1 ohaan GGST       727 Jan 28 12:07 derby.log
drwx------ 2 ohaan GGST         0 Aug 31 17:06 files
...

content: Int = 0

scala> println(content)
0

scala> val content_string = "ls -l".!!
content_string: String =
"total 864
-rw------- 1 ohaan GGST       727 Jan 28 12:07 derby.log
drwx------ 2 ohaan GGST         0 Aug 31 17:06 files
...

scala> println(content_string)
total 864
-rw------- 1 ohaan GGST       727 Jan 28 12:07 derby.log
drwx------ 2 ohaan GGST         0 Aug 31 17:06 files
...

</code>

For system shell commands using wildcards, the input string has to be evaluated by a system shell. This can be realized by applying the ''!'' method to a ''Seq'' of strings, which invokes the shell ''/bin/sh'' with the command string as argument:
<code>
scala> Seq("/bin/sh", "-c", "ls *.script").!
map1.script
map.script
nums.script
pi.script
st.script
User.script
while.script
res68: Int = 0
</code>
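
The output of ''.!!'' ends with a newline character. Where the result is to be used as a Scala value, it can be stripped with the standard ''trim'' method. A small sketch, reusing the ''sys.process'' import from above (the printed hostname is just an example):
<code>
scala> val host = "hostname".!!.trim
host: String = gwdu102
</code>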
===== Creating Spark RDDs =====
The main entry point for Spark functionality is the class ''SparkContext''. An instance of this class represents the connection to a Spark cluster and provides the methods for creating RDDs, the resilient distributed datasets, on this cluster.

Invoking the Spark Shell automatically generates an instance ''sc'' of the class ''SparkContext'', which can be used directly for creating RDDs.


There are three ways to create RDDs:
  * copying existing Scala collections
  * reading data from external storage
  * creating new RDDs from existing RDDs

==== Copying Collections ====
An existing Scala collection is transformed into an RDD with the method ''parallelize'' of the ''SparkContext'' instance ''sc'':
<code>
scala> val nums = 1 to 10
nums: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val nums_RDD = sc.parallelize(nums,3)
nums_RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26
</code>

The first command creates the Scala collection ''nums'' containing the integers from 1 to 10. The second command copies this collection into the RDD ''nums_RDD''; the second argument of ''parallelize'' prescribes the number of partitions (here 3), into which the RDD is split.

Since the expression ''1 to 10'' can be used directly as the first argument of ''parallelize'', the two commands can be combined:
<code>
scala> val nums_RDD = sc.parallelize(1 to 10,3)
nums_RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
</code>

==== Reading Data ====
''SparkContext'' provides methods to read data in several formats from storage into RDDs. The method ''textFile'' reads a text file line by line into an RDD of strings. A file ''lines.txt'' with the content
<code>
line1 : Hello
line2 : World
line3 : to
line4 : everyone
</code>
in the directory, from where the Spark Shell was started, will be loaded as follows:
<code>
scala> val lines_RDD = sc.textFile("lines.txt")
lines_RDD: org.apache.spark.rdd.RDD[String] = lines.txt MapPartitionsRDD[18] at textFile at <console>:24
</code>

The argument of the method ''textFile'' is the path of the file to be read; an optional second argument sets the minimal number of partitions for the resulting RDD.
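
A short sketch of the optional second argument (it is a lower bound; the actual number of partitions can be larger, depending on how the file is split):
<code>
scala> val lines4_RDD = sc.textFile("lines.txt", 4)
lines4_RDD: org.apache.spark.rdd.RDD[String] = lines.txt MapPartitionsRDD[19] at textFile at <console>:24

scala> lines4_RDD.getNumPartitions
res4: Int = 4
</code>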
Another method to create an RDD by reading text files is ''wholeTextFiles'', which reads all files from a directory into an RDD of (filename, content) pairs; it is shown in the section on saving RDDs below.
==== Modifying Existing RDDs ====
The RDD class allows the application of various methods, which result in the creation of new instances of RDD. Some of them will be described in more detail later on. An example is the method ''map'', which transforms every element of an RDD with a given function and collects the results in a new RDD:
<code>
scala> val linelength = lines_RDD.map(s => s.length)
linelength: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:25
</code>

Whereas the elements of ''lines_RDD'' are the lines of the text file, the new RDD ''linelength'' contains the lengths of these lines as integers.

===== Working with RDDs =====
Once an RDD is created, the methods defined for this class can be used to do work on its content in order to produce the required results. RDDs support two types of operations: transformations, which create a new RDD from an existing one, and actions, which return a value to the driver program after running a computation on the RDD.

All transformations in Spark are //lazy//, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently.
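
The lazy evaluation can be made visible with a print statement inside the transforming function: nothing is printed when the transformation is defined, only when an action is executed. A small sketch (in local mode the output of the executors appears in the shell):
<code>
scala> val squares = sc.parallelize(1 to 4, 2).map{ x => println(s"computing $x"); x * x }
squares: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:24

scala> squares.collect
computing 1
computing 2
computing 3
computing 4
res5: Array[Int] = Array(1, 4, 9, 16)
</code>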

Transformations are called //narrow// if they don't mix data from different partitions of the input RDD, otherwise they are called //wide//.

Because the partitions of an RDD can be distributed over a cluster of computing resources, wide transformations and actions may involve data communication over the cluster's network, whereas narrow transformations can be executed locally.
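
A sketch of the distinction (''map'' is narrow, while ''groupByKey'' is wide and creates a ''ShuffledRDD''):
<code>
scala> val pairs = nums_RDD.map(x => (x % 3, x))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[4] at map at <console>:25

scala> val grouped = pairs.groupByKey()
grouped: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[5] at groupByKey at <console>:25
</code>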

A list of all methods available for RDDs is displayed in the Spark Shell by completing the identifier for the RDD followed by a full stop with the tab-key:
<code>
scala> nums_RDD. [tab-key]
++                    aggregate             cache                 canEqual
cartesian             checkpoint            coalesce              collect
collectAsync          compute               context               copy
count                 countApprox           countApproxDistinct   countAsync
countByValue          foreachPartitionAsync ...
</code>

A description of these methods can be found in the [[https://spark.apache.org/docs/2.3.1/api/scala/index.html#org.apache.spark.rdd.RDD|Spark Scala API documentation]] for the RDD class.

==== Displaying Properties of the RDD ====

Methods for displaying the size and content of the RDDs:

^ method ^ result ^
|count() |returns the number of elements in the RDD|
|first() |returns the first element of the RDD|
|take(num) |returns an array with the first num elements of the RDD|
|takeSample(withReplacement, num, [seed]) |returns an array with a random sample of num elements of the RDD|
|collect() |returns an array with all elements of the RDD|


Applying the method ''count'' to the previously generated RDDs gives the expected results:
<code>
scala> val numscount = nums_RDD.count()
numscount: Long = 10

scala> val linescount = lines_RDD.count()
linescount: Long = 4
</code>

The ''count'' method for RDDs must not be confused with the ''size'' method for Scala collections. Applying ''size'' to an RDD produces an error:
<code>
scala> val numscount = nums_RDD.size()
<console>:26: error: value size is not a member of org.apache.spark.rdd.RDD[Int]
       val numscount = nums_RDD.size()
                                ^
</code>

Selections of the content of an RDD can be accessed by the methods ''first'' and ''take'':
<code>
scala> val content = lines_RDD.first()
content: String = line1 : Hello

scala> val content = nums_RDD.take(5)
content: Array[Int] = Array(1, 2, 3, 4, 5)
</code>

The ''takeSample'' method returns a random selection of elements. The first argument determines whether an element can be selected repeatedly (''true'': sampling with replacement), the second argument gives the number of elements to select:

<code>
scala> nums_RDD.takeSample(false,4)
res30: Array[Int] = Array(5, 10, 6, 8)

scala> nums_RDD.takeSample(false,4)
res31: Array[Int] = Array(8, 3, 2, 6)

scala> nums_RDD.takeSample(true,14)
res32: Array[Int] = Array(4, 6, 2, 8, 5, 5, 3, 3, 1, 1, 8, 9, 1, 6)

scala> nums_RDD.takeSample(true,14)
res33: Array[Int] = Array(9, 8, 3, 9, 5, 1, 10, 9, 10, 7, 2, 1, 3, 5)
</code>

The ''collect'' method returns the complete content of an RDD as an array. The same result can be produced by combining ''take'' and ''count'':

<code>
scala> val content = lines_RDD.collect()
content: Array[String] = Array(line1 : Hello, line2 : World, line3 : to, line4 : everyone)

scala> val content = nums_RDD.collect()
content: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val content = nums_RDD.take(nums_RDD.count.toInt)
content: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
</code>

The ''collect'' method should be used with care, because the complete content of the RDD is transferred to the driver and therefore must fit into the memory of a single machine.
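
If only an inspection of the elements is needed, the method ''toLocalIterator'' can serve as a memory friendly alternative to ''collect''; it transfers the partitions one at a time to the driver (a minimal sketch):
<code>
scala> nums_RDD.toLocalIterator.foreach(println)
1
2
3
...
</code>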
==== Displaying Properties of RDD Partitions ====
Methods for displaying the number, sizes and contents of the partitions of the RDD are:

^ method ^ result ^
|getNumPartitions |returns the number of partitions|
|mapPartitions(f) |returns a new RDD by applying a function f: Iterator ⇒ Iterator to each partition|
|glom() |returns a new RDD created by coalescing all elements within each partition into an array|

An example for getting the number of partitions is
<code>
scala> val nums_RDD = sc.parallelize(1 to 50)
nums_RDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[28] at parallelize at <console>:24
scala> nums_RDD.collect
res5: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50)
scala> nums_RDD.getNumPartitions
res0: Int = 24
</code>
which shows that the method ''parallelize'' called without its second parameter creates as many partitions as there are cores on the node (here 24).

The next example shows the result if the second parameter is set:
<code>
scala> sc.parallelize(1 to 50,3).getNumPartitions
res22: Int = 3
</code>

The sizes of the individual partitions can be obtained using the ''mapPartitions'' method, which transforms every partition by applying a function to an iterator over the partition's elements:
<code>
scala> val part = nums_RDD.mapPartitions((x: Iterator[Int]) => Iterator(x.size))
part: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[29] at mapPartitions at <console>:25

scala> part.collect
res6: Array[Int] = Array(2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3)

scala> val partpart = part.mapPartitions(x => Iterator(x.size))
partpart: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[30] at mapPartitions at <console>:25

scala> partpart.collect
res7: Array[Int] = Array(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1)

</code>

The type annotation for the input iterator is redundant, because its type can be inferred from the type of the elements in the RDD. This has been used in the expression for the value ''partpart'', where the annotation is omitted.

The ''glom'' method collects the elements of every partition into an array and returns a new RDD, whose elements are these arrays:
<code>
scala> nums_RDD.glom()
res2: org.apache.spark.rdd.RDD[Array[Int]] = MapPartitionsRDD[52] at glom at <console>:26

scala> nums_RDD.glom().getNumPartitions
res3: Int = 24

scala> nums_RDD.glom().collect()
res4: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4), Array(5, 6), Array(7, 8), Array(9, 10), Array(11, 12), Array(13, 14), Array(15, 16), Array(17, 18), Array(19, 20), Array(21, 22), Array(23, 24, 25), ...)

scala> sc.parallelize(1 to 50,3).glom().getNumPartitions
res27: Int = 3

scala> sc.parallelize(1 to 50,3).glom().collect()
res28: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16), Array(17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33), Array(34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50))
</code>
==== Saving RDDs to Disk ====

^ method ^ result ^
|saveAsTextFile(path)| saves the RDD as a text file, using string representations of elements|

This method accepts a pathname of a directory, into which it saves the RDD's partitions in separate files, named ''part-00000'', ''part-00001'' etc. The directory must not exist before the method is called.

As an example, a directory ''files'' is used, which contains three text files with 5, 4 and 3 lines, respectively:

<code>
gwdu103:35 17:55:00 ~/spark > ls files
file1  file2  file3
</code>

Using ''textFile'' with a directory path reads all files in the directory; the resulting RDD contains the lines of all files:

<code>
scala> val textfile_RDD = sc.textFile("files")
textfile_RDD: org.apache.spark.rdd.RDD[String] = files MapPartitionsRDD[66] at textFile at <console>:24

scala> textfile_RDD.count
res67: Long = 12

scala> textfile_RDD.getNumPartitions
res68: Int = 3

scala> textfile_RDD.glom().collect
res69: Array[Array[String]] = Array(Array(b1, b2, b3, b4, b5), Array(a1, a2, a3, a4), Array(c1, c2, c3))
</code>

''saveAsTextFile'' writes the three partitions of this RDD into three part files in a new directory, here named ''newdir'':

<code>
scala> textfile_RDD.saveAsTextFile("newdir")

gwdu103:35 18:07:29 ~/spark > ls newdir
part-00000  part-00001  part-00002  _SUCCESS
gwdu103:35 18:08:57 ~/spark > more newdir/*
::::::::::::::
newdir/part-00000
::::::::::::::
b1
b2
b3
b4
b5
::::::::::::::
newdir/part-00001
::::::::::::::
a1
a2
a3
a4
::::::::::::::
newdir/part-00002
::::::::::::::
c1
c2
c3
::::::::::::::
newdir/_SUCCESS
::::::::::::::
</code>

Using instead the method ''wholeTextFiles'' produces an RDD of key-value pairs, in which every element contains the path of a file as key and the complete content of this file as value:

<code>
scala> val wholetextfiles_RDD = sc.wholeTextFiles("files")
wholetextfiles_RDD: org.apache.spark.rdd.RDD[(String, String)] = files MapPartitionsRDD[76] at wholeTextFiles at <console>:24

scala> wholetextfiles_RDD.count
res77: Long = 3

scala> wholetextfiles_RDD.getNumPartitions
res78: Int = 2

scala> wholetextfiles_RDD.glom().collect
res79: Array[Array[(String, String)]] =
Array(Array((file:/.../files/file1,"b1
b2
b3
b4
b5
"), (file:/.../files/file2,"a1
a2
a3
a4
")), Array((file:/.../files/file3,"c1
c2
c3
")))
</code>

Saving ''wholetextfiles_RDD'' with ''saveAsTextFile'' into a new directory ''newdir1'' produces two part files, which contain the string representations of the (path, content) pairs:
<code>
gwdu103:35 18:09:11 ~/spark > ls newdir1
part-00000  part-00001  _SUCCESS
gwdu103:35 18:36:56 ~/spark > more newdir1/*
::::::::::::::
newdir1/part-00000
::::::::::::::
(file:/.../files/file1,b1
b2
b3
b4
b5
)
(file:/.../files/file2,a1
a2
a3
a4
)
::::::::::::::
newdir1/part-00001
::::::::::::::
(file:/.../files/file3,c1
c2
c3
)
::::::::::::::
newdir1/_SUCCESS
::::::::::::::
</code>

==== More Transformations for RDDs ====

^ method ^ result ^
|map(f) |returns a new RDD by applying a function f to each element|
|flatMap(f) |returns a new RDD by first applying a function f to all elements, and then flattening the results|
|mapPartitions(f) |returns a new RDD by applying a function f: Iterator ⇒ Iterator to each partition|
|mapPartitionsWithIndex(f)|returns a new RDD by applying a function (Int, Iterator) ⇒ Iterator to each partition, while tracking the index of the original partition|
|filter(f)|returns a new RDD containing only the elements that satisfy a predicate f: (T) => Boolean|

=== map ===
The ''map'' method transforms every element of an RDD by applying a function, which can be defined beforehand:

<code>
scala> val ints = sc.parallelize(1 to 10,3)
ints: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[26] at parallelize at <console>:24

scala> def double(x: Int): Int = 2*x
double: (x: Int)Int

scala> val ints_2 = ints.map(double)
ints_2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[27] at map at <console>:27

scala> ints.collect
res32: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> ints_2.collect
res33: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

scala> ints.getNumPartitions
res34: Int = 3

scala> ints_2.getNumPartitions
res35: Int = 3
</code>

The output in the Spark Shell displays the expected result of applying the ''double'' function to every element of the RDD; the number of partitions is not changed by the ''map'' transformation.

The argument in the map method can be an identifier of a previously defined function as in the previous example. Also a function literal (anonymous function) can be used:
<code>
scala> val ints_2=ints.map((x: Int) => 2*x)
ints_2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:25

scala> ints_2.collect
res1: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
</code>

The placeholder syntax of Scala can be used for specifying the function literal:
<code>
scala> val ints_2=ints.map(_*2)
</code>
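
Note that every underscore in a function literal stands for a new parameter: an expression using the parameter twice, like ''x * x'', therefore needs a named parameter, since ''_ * _'' denotes a function of two parameters (as used later with ''reduce''). A small sketch:
<code>
scala> ints.map(x => x * x).collect
res2: Array[Int] = Array(1, 4, 9, 16, 25, 36, 49, 64, 81, 100)
</code>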

=== flatMap ===

If the function that transforms the elements of the RDD returns a collection of several elements, the result of the ''map'' method is an RDD whose elements are collections:
<code>
scala> ints.map(x => List(2*x-1,2*x)).collect
res45: Array[List[Int]] = Array(List(1, 2), List(3, 4), List(5, 6), List(7, 8), List(9, 10), List(11, 12), List(13, 14), List(15, 16), List(17, 18), List(19, 20))
</code>

The ''flatMap'' method instead concatenates the generated collections, producing a flat RDD of their elements:

<code>
scala> ints.flatMap(x => List(2*x-1,2*x)).collect
res46: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
</code>

Applying the ''glom'' method shows how the generated elements are distributed over the partitions:
<code>
scala> ints.flatMap(x => List(2*x-1,2*x)).glom.collect
res47: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6), Array(7, 8, 9, 10, 11, 12), Array(13, 14, 15, 16, 17, 18, 19, 20))

</code>

=== mapPartitions, mapPartitionsWithIndex ===

The ''mapPartitions'' method transforms every partition as a whole: the transforming function takes an iterator over the elements of a partition and returns an iterator over the elements of the resulting partition. In the following example, every partition is replaced by a single element containing its size:

<code>
scala> def ff(it: Iterator[Int]): Iterator[Int] = Iterator(it.size)
ff: (it: Iterator[Int])Iterator[Int]

scala> val ints = sc.parallelize(1 to 10,3)
ints: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[63] at parallelize at <console>:24

scala> ints.glom.collect
res127: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))

scala> ints.mapPartitions(ff).collect
res126: Array[Int] = Array(3, 3, 4)
</code>

The same result can be achieved by giving the function argument for the mapPartitions method as a function literal:
<code>
scala> ints.mapPartitions(it => List(it.size).iterator).collect
res129: Array[Int] = Array(3, 3, 4)
</code>

With the method ''mapPartitionsWithIndex'' the transforming function takes the index of the partition as additional argument.
In the following example, to every element of a partition the value 100*(index+1) is added:
<code>
scala> val ints = sc.parallelize(1 to 10,3)
ints: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24

scala> ints.collect
res5: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> def fun(index: Int, it: Iterator[Int]): Iterator[Int] = it.map(x => x + 100*(index+1))
fun: (index: Int, it: Iterator[Int])Iterator[Int]

scala> ints.mapPartitionsWithIndex(fun).collect
res6: Array[Int] = Array(101, 102, 103, 204, 205, 206, 307, 308, 309, 310)

scala> ints.mapPartitionsWithIndex((index, it: Iterator[Int]) => it.map(x => x + 100*(index+1))).collect
res7: Array[Int] = Array(101, 102, 103, 204, 205, 206, 307, 308, 309, 310)
</code>

In the example the function defining the transformation is first provided by its identifier and then as a function literal.

=== filter ===

With the ''filter'' method a new RDD is generated, which contains only those elements of the original RDD, for which the predicate function given as argument returns ''true'':
<code>
scala> val ints = sc.parallelize(1 to 10,3)
ints: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:24

scala> ints.glom.collect
res16: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7, 8, 9, 10))

scala> def fun(a: Int): Boolean = (a%2 == 0)
fun: (a: Int)Boolean

scala> ints.filter(fun).glom.collect
res17: Array[Array[Int]] = Array(Array(2), Array(4, 6), Array(8, 10))

scala> ints.filter(a => (a < 5)).glom.collect
res18: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4), Array())
</code>
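
Transformations can be chained; each step produces a new RDD, and nothing is computed before an action is applied. A small sketch combining ''filter'' and ''map'' on the RDD defined above:
<code>
scala> ints.filter(_ % 2 == 0).map(_ * 10).glom.collect
res19: Array[Array[Int]] = Array(Array(20), Array(40, 60), Array(80, 100))
</code>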

==== Some Actions on RDDs ====

^ method ^ result ^
|reduce(func) | Aggregate the elements of the dataset using a function func (which takes two arguments and returns one).\\ The function should be commutative and associative so that it can be computed correctly in parallel.|
|collect() | Return all the elements of the dataset as an array at the driver program. |
|count() | Return the number of elements in the dataset.|
|first() | Return the first element of the dataset (similar to take(1)).|
|take(n) | Return an array with the first n elements of the dataset.|

=== reduce ===

The ''reduce'' method aggregates the elements of an RDD by repeatedly applying a binary function:

<code>
scala> val rdd1 = sc.parallelize(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[61] at parallelize at <console>:24

scala> rdd1.glom.collect
res146: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5), Array(6, 7, 8, 9, 10))

scala> rdd1.reduce((x,y) => x+y)
res147: Int = 55

scala> rdd1.reduce((x,y) => if (x > y) x else y)
res148: Int = 10

scala> rdd1.reduce((x,y) => x max y)
res149: Int = 10
</code>

The last two commands express the same operation: The first shows that the function literal can contain an arbitrary expression of the two arguments, the second uses the ''max'' method, which Scala provides for integer values.

The next example shows the action of the ''reduce'' method on an RDD of strings:

<code>
scala> val chars = ('a' to 'j').toList.map(_.toString)
chars: List[String] = List(a, b, c, d, e, f, g, h, i, j)

scala> val rdd2 = sc.parallelize(chars,3)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[65] at parallelize at <console>:26

scala> rdd2.glom.collect
res151: Array[Array[String]] = Array(Array(a, b, c), Array(d, e, f), Array(g, h, i, j))

scala> rdd2.reduce((x,y) => x+y)
res152: String = abcdefghij
</code>
In this example the collection of one-character strings is concatenated with the ''+'' operator for strings into a single string. Note that string concatenation is associative, but not commutative; as will be seen below, the result of ''reduce'' can therefore depend on the distribution of the elements over the partitions.

The mechanism working behind the reduce method can be shown by the following recursive Scala function reducing the elements of a List-collection:

<code>
scala> def red[A](li: List[A], op: (A, A) => A): A = {
     |   if (li.size == 1) li.head
     |   else red(op(li(0), li(1)) :: li.drop(2), op)
     | }
red: [A](li: List[A], op: (A, A) => A)A
</code>

The function ''red'' takes a list and a binary operation as arguments. It replaces the first two elements of the list by the result of applying the operation to them and calls itself with the shortened list, until only a single element is left, which is returned as the result.

This function now is used to reduce the List-collection of strings ''chars'' generated before:
<code>
scala> red[String](chars, (x, y) => x+y)
res2: String = abcdefghij
</code>
and produces the expected result.

It will be interesting to follow the separate steps in the reduction process by including a print statement in the function literal:
<code>
scala> red[String](chars, (x, y) => {println((x, y)); x+y})
(a,b)
(ab,c)
(abc,d)
(abcd,e)
(abcde,f)
(abcdef,g)
(abcdefg,h)
(abcdefgh,i)
(abcdefghi,j)
res4: String = abcdefghij
</code>
As expected, in each reduction step the next character is appended to the partial string result obtained so far.

Now the reduction steps will be examined for RDD-collections, starting with an RDD consisting of a single partition:
<code>
scala> val rdd_1p = sc.parallelize(chars,1)
rdd_1p: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:26

scala> rdd_1p.reduce((x, y) => {println((x, y)); x+y})
(a,b)
(ab,c)
(abc,d)
(abcd,e)
(abcde,f)
(abcdef,g)
(abcdefg,h)
(abcdefgh,i)
(abcdefghi,j)
res8: String = abcdefghij
</code>

With 2 partitions, a different pattern for the reduction is observed:
<code>
scala> val rdd_2p = sc.parallelize(chars,2)
rdd_2p: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:26

scala> rdd_2p.glom.collect
res9: Array[Array[String]] = Array(Array(a, b, c, d, e), Array(f, g, h, i, j))

scala> rdd_2p.reduce((x, y) => {println((x, y)); x+y})
(a,b)
(f,g)
(ab,c)
(fg,h)
(fgh,i)
(fghi,j)
(abc,d)
(abcd,e)
(fghij,abcde)
res10: String = fghijabcde
</code>
The succession of reduction steps shows that the characters in each partition are concatenated separately, and only at the end the two partial strings from the two partitions are combined into the final complete string. The partial reductions in the partitions are performed simultaneously in separate tasks, as can be seen from the interleaving of the printed steps. The partition results are combined in the order in which the tasks deliver them, which here leads to the result ''fghijabcde'' instead of ''abcdefghij''.

The parallel processing of the partial reductions in the different partitions can again be observed with three partitions:
<code>
scala> val rdd_3p = sc.parallelize(chars,3)
rdd_3p: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at parallelize at <console>:26

scala> rdd_3p.glom.collect
res4: Array[Array[String]] = Array(Array(a, b, c), Array(d, e, f), Array(g, h, i, j))

scala> rdd_3p.reduce((x, y) => {println((x, y)); x+y})
(a,b)
(d,e)
(g,h)
(ab,c)
(gh,i)
(de,f)
(ghi,j)
(abc,ghij)
(abcghij,def)
res6: String = abcghijdef
</code>
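
For a commutative and associative operation like the addition of integers, the result is independent of the number of partitions and of the order in which the partial results are combined; a small sketch:
<code>
scala> sc.parallelize(1 to 10,1).reduce(_ + _)
res11: Int = 55

scala> sc.parallelize(1 to 10,3).reduce(_ + _)
res12: Int = 55
</code>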

===== Building and Submitting Spark Applications =====

The Spark Shell is appropriate for experimenting with Spark's features interactively. For repeated and larger computations, a self-contained Spark application should be written, compiled into an executable jar file and submitted to the Spark environment.
The components involved in running a Spark application are the driver, the cluster manager and the executors.

The ''driver'' process executes the ''main'' method of the application program, which creates the SparkContext for connecting to the cluster.

A ''cluster manager'' provides ''executor'' processes on the nodes of the cluster, which perform computations and store data for the application.

The execution of the statements in the Spark application program then will be shared between driver and executors. Statements involving RDDs will be executed on the executors. An operation on an RDD is split into a number of tasks according to the number of partitions in the RDD, each task being responsible for executing the operation on the data set in the respective partition. The driver process schedules these tasks onto the executors.

In addition to distributing the work to be done on the partitions of RDDs to the executors, the driver collects and combines the results the executors produce for action operations on RDD partitions and processes all statements in the program which do not involve RDDs.

The next sections will describe:
  - defining the ''SparkContext'' for the application program
  - producing an executable jar-file from this program using sbt (the simple build tool)
  - running the application with ''spark-submit''
  - running the application on a Spark standalone cluster
  - submitting a Spark application job to the scheduling system SLURM for execution on GWDG's scientific compute cluster.
==== Setting the Spark Context ====

Establishing the connection of the application to the Spark environment is the task of the ''SparkContext'' class. A minimal program creating this connection is the following:

<code>
import org.apache.spark.SparkContext

object SparkConnect {
  def main(args: Array[String] = Array("")) {
    val sp = new SparkContext()
    println(sp.getConf.toDebugString)
    sp.stop()
  }
}

</code>
In the first line of the program the class ''SparkContext'' is imported from the package ''org.apache.spark''.

Then the object ''SparkConnect'' is defined, which contains the ''main'' method of the application. The first statement in ''main'' creates an instance of ''SparkContext'' with a default configuration.

The print statement gives the details of the context's configuration, which are returned as a string by the ''toDebugString'' method of the ''SparkConf'' instance belonging to the context.

The last statement deactivates the connection to the created SparkContext instance. A Spark application can be connected to only one single SparkContext. Trying to create a second connection will result in an error and terminate the application. Therefore the stop method has to be applied to an existing SparkContext instance, before a new one can be established.

If it is not known, whether a connection to a SparkContext already exists, the new connection should be established with the ''getOrCreate'' method of the ''SparkContext'' companion object:

<code>
val sp = SparkContext.getOrCreate()
</code>
If an instance of ''SparkContext'' is already active, ''getOrCreate'' returns this instance, otherwise a new instance is created.
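
In the Spark Shell this behaviour can be verified directly: ''getOrCreate'' returns the already existing context ''sc''. A small sketch (''eq'' compares object identity):
<code>
scala> import org.apache.spark.SparkContext
import org.apache.spark.SparkContext

scala> SparkContext.getOrCreate() eq sc
res0: Boolean = true
</code>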



From the file containing this program code an executable ''jar'' file can be built, which then can be launched with the ''spark-submit'' script; this is described in the next two sections.

The program, stored in a file ''SparkConnect.scala'', can also be tested interactively in the Spark Shell:

<code>
scala> :load SparkConnect.scala
Loading SparkConnect.scala...
import org.apache.spark.SparkContext
defined object SparkConnect

scala> SparkConnect.main()
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243).
...

scala> sc.stop()

scala> SparkConnect.main()
spark.app.id=local-1550248586719
spark.app.name=Spark shell
spark.driver.host=gwdu102.global.gwdg.cluster
spark.driver.port=36803
spark.executor.id=driver
spark.jars=
spark.master=local[*]
spark.submit.deployMode=client
spark.ui.showConsoleProgress=true
</code>

As was explained in previous sections, invoking the Spark Shell automatically provides a connection to a SparkContext with the value ''sc''. This existing connection has to be stopped with ''sc.stop()'', before the ''main'' method of ''SparkConnect'' can successfully create its own SparkContext.

The properties of the SparkContext instance can be defined locally in the application program by passing an instance of the class ''SparkConf'' to the SparkContext constructor:
<code>
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.{ Logger, Level}

object SparkConnect {
  def main(args: Array[String] = Array("")) {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val co = new SparkConf(false).setMaster("local[2]").setAppName("SparkApp")
    val sp = new SparkContext(co)
    println(sp.getConf.toDebugString)
    sp.stop
  }
}
</code>
The argument ''false'' in the ''SparkConf'' constructor prevents the loading of properties from outside the program, so that only the explicitly set properties take effect. The methods ''setMaster'' and ''setAppName'' set the master-url and the name of the application.

There is a large list of properties to be set, see the corresponding documentation for [[https://spark.apache.org/docs/2.3.1/configuration.html|Spark configuration properties]].


A further addition in this example is the setting of the log level to ''Level.ERROR'' for the ''org'' logger hierarchy, which suppresses the many INFO and WARN messages, which Spark otherwise writes to the terminal.


==== Building the Spark Application Executable with SBT ====

There exist many different tools for compiling Scala code and building an executable jar-file. One of them is ''sbt'' (the simple build tool), which is provided on GWDG's cluster as a module.

In order to use the sbt tool on GWDG's cluster, the following module has to be loaded:
<code>
module load scala/sbt
</code>

The current working directory, from which ''sbt'' is invoked, is the base directory of the sbt project. sbt expects in this directory a build definition file with suffix ''.sbt''.

The following file ''build.sbt'' can be used for building Spark applications:

<code>
/* build.sbt */

name := "SparkApp"
scalaVersion := "2.11.8"

logLevel := Level.Error
libraryDependencies ++= Seq("org.apache.spark" %% "spark-core" % "2.3.1")
</code>

The build file defines the name of the project, the Scala version to be used for compiling, the log level for the sbt tasks and the dependency of the project on the Spark core library, which sbt resolves and downloads automatically.

The executable jar file for a Spark application is built with the sbt task ''package''.

sbt looks for source code files with suffix ''.scala'' in the base directory and in its subdirectory ''src/main/scala''.

An interactive sbt shell can be invoked in the base directory with the command ''sbt'':

<code>
gwdu102:54 11:16:02 ~/SparkApp > sbt
[info] Updated file /home/.../SparkApp/project/build.properties ...
[info] Loading project definition from /home/.../SparkApp/project
[info] Loading settings from build.sbt ...
[info] Set current project to SparkApp (in build file:/home/.../SparkApp/)
[info] sbt server started at local:///home/.../.sbt/1.0/server/.../local
sbt:SparkApp> projects
[info] In file:/home/.../SparkApp/
[info] 	 * sparkapp
[success] Total time: 0 s, completed Feb 20, 2019 11:17:43 AM

</code>


Invoking sbt for the first time generates a complex hierarchy of directories, among them ''project'' and ''target'' in the base directory, which sbt needs for organizing its work.
The sbt-shell prompt ''sbt:SparkApp>'' shows the name of the current project as defined in the build file.
With the task ''projects'' executed above, the projects defined in the base directory are listed.

The following list shows some sbt tasks, which can be executed in the sbt-shell.

^ tasks ^ purpose ^
|tasks | displays all tasks available in the sbt-shell|
|compile | compiles the code from source files, generating class files |
|package| produces binary jar files from the compiled sources ready for execution|
|run | runs the main class from the executable jar file|
|clean | deletes files produced by the sbt tasks, such as compiled classes and binary jar files|
|reload| reloads the project in the base directory (after changes of settings in the .sbt file)|
|exit | ends the sbt-shell|

These tasks can also be executed as batch commands directly from the system's command line, e.g. ''sbt compile'' or ''sbt package''.

The executable jar files for Spark applications are generated with the ''compile'' and ''package'' tasks:

<code>
sbt:SparkApp> compile
[success] Total time: 4 s, completed Feb 20, 2019 3:43:39 PM
sbt:SparkApp> package
[info] /home/.../SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar
[success] Total time: 0 s, completed Feb 20, 2019 3:43:43 PM
</code>

The second command ''package'' displays the path of the generated jar file, which is placed in the subdirectory ''target/scala-2.11'' of the base directory.

The executable jar file can also be started directly with the ''run'' task in the sbt-shell:

<code>
sbt:SparkApp> run
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/02/20 16:03:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
spark.app.id=local-1550675006775
spark.app.name=SparkApp
spark.driver.host=gwdu102.global.gwdg.cluster
spark.driver.port=39956
spark.executor.id=driver
spark.master=local[2]
[success] Total time: 3 s, completed Feb 20, 2019 4:03:27 PM
</code>

==== Running a Spark Application with spark-submit ====

The ''spark-submit'' script in Spark's ''bin'' directory is used to launch applications. Its general form is:

<code>
spark-submit \
  --master <master-url> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> [application-arguments]
</code>

Some properties can be set using a reserved option name, for example the option ''--master'' for the master-url, which determines the resources to be used for running the application:

^ Master %%URL%% ^ Meaning ^
|local | Run Spark locally with one worker thread (i.e. no parallelism at all)|
|local[K] | Run Spark locally with K worker threads |
|local[*] | Run Spark locally with as many worker threads as logical cores on your machine|
|spark://HOST:PORT | Connect to the master of a Spark standalone cluster |

The complete list of possible options can be inspected by calling ''spark-submit --help''.

All other configuration properties can be set with the option ''--conf <property-name>=<value>''.

A Spark application creating a SparkContext without specifying any properties, as in

<code>
import org.apache.spark.SparkContext

object SparkConnect {
  def main(args: Array[String] = Array("")) {
    val sp = new SparkContext()
    println(sp.getConf.toDebugString)
    sp.stop()
  }
}
</code>
with a corresponding application-jar file can be started with ''spark-submit'' without any options; all properties then take their default values:
<code>
gwdu102 > spark-submit target/scala-2.11/sparkconnect_2.11-0.1.0-SNAPSHOT.jar
...
spark.app.id=local-1557919036370
spark.app.name=SparkConnect
spark.driver.host=gwdu102.global.gwdg.cluster
spark.driver.port=41872
spark.executor.id=driver
spark.jars=file:/home/.../target/scala-2.11/sparkconnect_2.11-0.1.0-SNAPSHOT.jar
spark.master=local[*]
spark.submit.deployMode=client

</code>

==== Ways to Configure the SparkContext ====

The properties, which can be configured for the SparkContext, are listed in the section [[https://spark.apache.org/docs/2.3.1/configuration.html#available-properties|Available Properties]] of the Spark documentation.

Here is a short collection of properties from this list:

^ property name^default^description^
|spark.master |(none) |the cluster manager to connect to |
|spark.cores.max| (not set) |the maximum amount of CPU cores to request for the application from across the cluster |
|spark.executor.cores |all the available cores on the worker in standalone mode |the number of cores to use on each executor |
|spark.default.parallelism| for distributed shuffle operations the largest number of partitions in a parent RDD; for operations like parallelize with no parent RDDs total number of cores on all executor nodes|default number of partitions in RDDs |


There are three different ways to set a configuration property for a SparkContext, listed here in decreasing order of precedence:

  * with the method ''set'' of the ''SparkConf'' instance in the application program: \\ ''SparkConf.set("<property name>", "<value>")''
  * with the option ''--conf'' of the ''spark-submit'' command: \\ ''--conf "<property name>=<value>"''
  * with a line ''<property name> <value>'' in the Spark configuration file with the name ''spark-defaults.conf'' in the directory ''$SPARK_CONF_DIR''

Some frequently used properties can be set with a simplified syntax. Examples are the method ''setMaster(master-url)'' of ''SparkConf'' and the option ''--master <master-url>'' of ''spark-submit'', which both set the property ''spark.master''.
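
The configuration resulting from these settings can be inspected at run time from the SparkContext; a small sketch (the second argument of ''get'' supplies a default value for unset properties):
<code>
val sp = SparkContext.getOrCreate()
println(sp.getConf.get("spark.master"))
println(sp.getConf.get("spark.executor.memory", "not set"))
println(sp.defaultParallelism)
</code>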


==== Setting up a Spark Standalone Cluster ====

Running an application with a ''local[K]'' master uses only the cores of a single machine. For using resources of several machines, Spark provides the //standalone cluster// mode: a ''master'' process manages the resources of the cluster, and on every participating node a ''worker'' process starts and supervises the executors for the applications. Spark's ''sbin'' directory contains scripts for launching and stopping masters and workers:

^ script name ^ purpose ^
|start-master.sh | Starts a master instance on the machine the script is executed on|
|stop-master.sh | Stops the master instance|
|start-slave.sh | Starts a worker instance on the machine the script is executed on|
|stop-slave.sh | Stops the worker instance on the machine the script is executed on|
|start-slaves.sh | Starts worker instances on all nodes listed in the file slaves|
|stop-slaves.sh | Stops the worker instances on all nodes listed in the file slaves|
|start-all.sh | Combines starting master and workers|
|stop-all.sh | Combines stopping master and workers|

== Starting the Master ==
The following lines show the steps for starting a master process for a Spark standalone cluster:
<code>
gwdu102 > module load JAVA spark
gwdu102 > export SPARK_LOG_DIR=~/spark/log

gwdu102 > ${SPARK_HOME}/sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /home/.../spark/log/spark-...-org.apache.spark.deploy.master.Master-1-gwdu102.out

gwdu102 > grep Starting /home/.../spark/log/spark-...-Master-1-gwdu102.out
2019-07-22 15:45:52 INFO  Master:54 - Starting Spark master at spark://gwdu102.global.gwdg.cluster:7077

gwdu102 > jps
31663 Jps
29761 Master
</code>

Before starting the master by executing the script ''start-master.sh'', the modules for Java and Spark have to be loaded and the environment variable ''SPARK_LOG_DIR'' should be set to a directory in the user's file space.
The invocation of ''start-master.sh'' returns the name of the log file, into which the master process writes its messages.

Among many other messages printed in this log file is a line with the master-url
''spark://gwdu102.global.gwdg.cluster:7077''.
The workers in the Spark standalone cluster will use this master-url for the connection to the master, and it has to be passed to the ''spark-submit'' command as the value of the ''--master'' option.

With the command ''jps'' the Java processes running on the node can be listed; the started master appears as the process ''Master''.

== Starting a Worker ==
Now a worker process can be started as follows:
<code>
gwdu102 > export SPARK_WORKER_DIR=~/spark/worker

gwdu102 > ${SPARK_HOME}/sbin/start-slave.sh spark://gwdu102.global.gwdg.cluster:7077
starting org.apache.spark.deploy.worker.Worker, logging to /home/.../spark/log/spark-...-org.apache.spark.deploy.worker.Worker-1-gwdu102.out

gwdu102 > grep Starting /home/.../spark/log/spark-...-Worker-1-gwdu102.out
2019-07-22 15:47:45 INFO  Worker:54 - Starting Spark worker 10.108.96.102:... with 24 cores, ... GB RAM

gwdu102 > grep registered /home/.../spark/log/spark-...-Worker-1-gwdu102.out
2019-07-22 15:47:45 INFO  Worker:54 - Successfully registered with master spark://gwdu102.global.gwdg.cluster:7077

gwdu102 > grep Registering /home/.../spark/log/spark-...-Master-1-gwdu102.out
2019-07-22 15:47:45 INFO  Master:54 - Registering worker 10.108.96.102:... with 24 cores, ... GB RAM

gwdu102 > jps
18861 Jps
3081 Worker
29761 Master

</code>

For starting a worker, the script ''start-slave.sh'' has to be called with the master-url as argument. In addition, the environment variable ''SPARK_WORKER_DIR'' should be set to a directory in the user's file space, into which the worker places scratch space and log output of the executors.

The message from starting the worker shows, that a log file is produced in the directory indicated by the environment variable ''SPARK_LOG_DIR''. The worker's log file contains a line showing the number of cores and the amount of memory
allocated to the worker. A further line announces the registering of this worker with the master. The same event is listed in the master log file as well.

As can be seen from the output of ''jps'', master and worker are now running as Java processes on the node.

By default, all available cores and the total memory of the node minus 1 GB are allocated to the worker started with the ''start-slave.sh'' script. This can be changed by setting the environment variables ''SPARK_WORKER_CORES'' and ''SPARK_WORKER_MEMORY'' before starting the worker.

== Starting a Worker on a Remote Node ==
The standalone cluster can include workers running on different nodes. They can be started by executing the ''start-slave.sh'' script on the remote nodes after login with ''ssh'':

<code>
gwdu102 > ssh gwdu103
Last login: Wed Jun 26 11:36:28 2019 from gwdu102.global.gwdg.cluster
gwdu103 > module load JAVA spark
gwdu103 > export SPARK_LOG_DIR=~/spark/log

gwdu103 > ${SPARK_HOME}/sbin/start-slave.sh spark://gwdu102.global.gwdg.cluster:7077
starting org.apache.spark.deploy.worker.Worker, logging to /home/.../spark/log/spark-...-org.apache.spark.deploy.worker.Worker-1-gwdu103.out
gwdu103 > jps
20784 Worker
21588 Jps

gwdu103 > exit
logout
Connection to gwdu103 closed.

gwdu102 > grep Registering /home/.../spark/log/spark-...-Master-1-gwdu102.out
2019-07-22 15:47:45 INFO  Master:54 - Registering worker 10.108.96.102:... with 24 cores, ... GB RAM
2019-07-22 16:28:08 INFO  Master:54 - Registering worker 10.108.96.103:... with 24 cores, ... GB RAM

</code>
Of course, after login with ''ssh'' the Spark environment (modules and environment variables) has to be set up on the remote node, before the worker can be started there.

All activities for setting up the standalone cluster are documented in the log file of the master, which therefore allows checking the state of the cluster at any time.

== Starting Multiple Workers with ''start-slaves.sh'' ==
The script ''start-slaves.sh'' starts workers on all nodes, whose names are listed in the file ''slaves'' in the Spark configuration directory ''$SPARK_CONF_DIR''.

The environment created in the local shell, from which the script ''start-slaves.sh'' is executed, is not visible in the remote shells on the worker nodes. Therefore all environment variables needed for starting the workers must be provided in files, which are read on the remote side: the file ''~/.bashrc'' and the Spark environment file ''spark-env.sh'' in ''$SPARK_CONF_DIR''.
The following screen shots show how to set up the standalone cluster with the ''start-master.sh'' and ''start-slaves.sh'' scripts.

Setting the environment variables ''JAVA_HOME'' and ''SPARK_HOME'' by loading the modules:

<code>
gwdu102 > module load JAVA spark
gwdu102 > echo $JAVA_HOME
/usr/product/...
gwdu102 > echo $SPARK_HOME
/usr/product/...
</code>

Setting the environment variable ''SPARK_CONF_DIR'' and creating the configuration directory:

<code>
gwdu102 > export SPARK_CONF_DIR=~/spark/conf
gwdu102 > mkdir -p $SPARK_CONF_DIR
</code>

Generating the file ''spark-env.sh'' in ''$SPARK_CONF_DIR'', which sets the environment for the workers:

<code>
gwdu102 > echo "export SPARK_LOG_DIR=~/spark/log" > $SPARK_CONF_DIR/spark-env.sh
gwdu102 > echo "export SPARK_WORKER_DIR=~/spark/worker" >> $SPARK_CONF_DIR/spark-env.sh
gwdu102 > echo "export SPARK_WORKER_CORES=4" >> $SPARK_CONF_DIR/spark-env.sh
gwdu102 > echo "export SPARK_WORKER_MEMORY=4048M" >> $SPARK_CONF_DIR/spark-env.sh

gwdu102 > cat $SPARK_CONF_DIR/spark-env.sh
export SPARK_LOG_DIR=~/spark/log
export SPARK_WORKER_DIR=~/spark/worker
export SPARK_WORKER_CORES=4
export SPARK_WORKER_MEMORY=4048M
</code>

Generating the file ''slaves'' in ''$SPARK_CONF_DIR'', which lists the nodes for the workers:

<code>
gwdu102 > echo gwdu102 > $SPARK_CONF_DIR/slaves
gwdu102 > echo gwdu101 >> $SPARK_CONF_DIR/slaves
gwdu102 > echo gwdu103 >> $SPARK_CONF_DIR/slaves
gwdu102 > cat $SPARK_CONF_DIR/slaves
gwdu102
gwdu101
gwdu103
</code>

Generating the file ''~/.bashrc'', which is sourced in the remote shells on the worker nodes:

<code>
gwdu102 > echo "export SPARK_HOME=$SPARK_HOME" > ~/.bashrc
gwdu102 > echo "export JAVA_HOME=$JAVA_HOME" >> ~/.bashrc
gwdu102 > echo "export SPARK_CONF_DIR=$SPARK_CONF_DIR" >> ~/.bashrc
gwdu102 > cat ~/.bashrc
export SPARK_HOME=/usr/product/...
export JAVA_HOME=/usr/product/...
export SPARK_CONF_DIR=/home/.../spark/conf
</code>

Now the scripts for starting master and workers can be run:

<code>
gwdu102 > $SPARK_HOME/sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /home/.../spark/log/spark-...-org.apache.spark.deploy.master.Master-1-gwdu102.out

gwdu102 > $SPARK_HOME/sbin/start-slaves.sh
gwdu103: starting org.apache.spark.deploy.worker.Worker, logging to /home/.../spark/log/spark-...-org.apache.spark.deploy.worker.Worker-1-gwdu103.out
gwdu102: starting org.apache.spark.deploy.worker.Worker, logging to /home/.../spark/log/spark-...-org.apache.spark.deploy.worker.Worker-1-gwdu102.out
gwdu101: starting org.apache.spark.deploy.worker.Worker, logging to /home/.../spark/log/spark-...-org.apache.spark.deploy.worker.Worker-1-gwdu101.out
</code>

The last lines of the log file for the master report the successful starts of the three workers:

<code>
gwdu102 > more /home/.../spark/log/spark-...-Master-1-gwdu102.out
...
2019-07-23 15:44:19 INFO  Master:54 - Registering worker 10.108.96.102:... with 4 cores, 4.0 GB RAM
2019-07-23 15:44:21 INFO  Master:54 - Registering worker 10.108.96.103:... with 4 cores, 4.0 GB RAM
2019-07-23 15:44:21 INFO  Master:54 - Registering worker 10.108.96.101:... with 4 cores, 4.0 GB RAM
</code>

These lines also show, that the workers provide the number of cores and the amount of local memory specified in the environment variables ''SPARK_WORKER_CORES'' and ''SPARK_WORKER_MEMORY''.

The standalone cluster can also be started in one step by running the script ''start-all.sh'', which combines ''start-master.sh'' and ''start-slaves.sh''; the corresponding script ''stop-all.sh'' stops workers and master.

==== Submitting to the Spark Standalone Cluster ====

The Spark standalone cluster provides cores and memory on different worker nodes. The Spark application uses the resources from the standalone cluster in a way specified by configuration properties for the SparkContext. These properties must include the master-url; furthermore the number of cores and the amount of memory for the executors can be prescribed, and the default number of partitions in an RDD collection can be fixed. Different ways to set SparkContext properties have been described in the section ''Ways to Configure the SparkContext''. A convenient way is to collect them in the configuration file ''spark-defaults.conf'' in ''$SPARK_CONF_DIR''. The following script generates this file, determining the total number of worker cores in the standalone cluster from the files ''slaves'' and ''spark-env.sh'' (the chosen values for executor cores and memory are examples):

<code>
nodelist=`cat $SPARK_CONF_DIR/slaves`
total_cores=0
for host in $nodelist
do
  cores=$(ssh $host '. $SPARK_CONF_DIR/spark-env.sh; echo $SPARK_WORKER_CORES')
  total_cores=$(( $total_cores + $cores ))
done

conf=$SPARK_CONF_DIR/spark-defaults.conf
echo "spark.master spark://`hostname`:7077" > $conf
echo "spark.default.parallelism $total_cores" >> $conf
echo "spark.executor.cores 2" >> $conf
echo "spark.executor.memory 1024M" >> $conf
echo "spark.submit.deployMode client" >> $conf
</code>

A toy application, which shows how work is distributed on the provided resources, is the following program ''SparkApp.scala'':

<code>
/* SparkApp.scala */

import org.apache.spark._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.log4j.{ Logger, Level}
import sys.process._

object SparkApp {
  def work(i: Int): (String, String, String) = {
    val str1 = "date +%T".!!
    val time = str1.take(str1.length-1)
    val str2 = "hostname".!!
    val host = str2.take(str2.length-1)
    val id = SparkEnv.get.executorId
    "sleep 10".!
    return (host, id, time)
  }

  def main(args: Array[String] = Array("")) {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val sp = new SparkContext()
    println(sp.getConf.toDebugString)
    val st = System.nanoTime
    val rdd = sp.parallelize(1 to sp.defaultParallelism).cache
    println("\nInformation from partitions:")
    println("host, executor id, time")
    rdd.map(x=>work(x)).collect.foreach(println)
    val dt = (System.nanoTime - st)/1e9d
    println("\ntotal time for application: " + dt + " sec")

    sp.stop
  }
}

</code>

This program defines a function ''work'', which collects the current time, the name of the host and the id of the executor it is running on, then sleeps for 10 seconds and returns the collected information as a tuple of strings.

The main program, after printing the configuration of the SparkContext, creates an RDD with as many elements and partitions as the value of the default parallelism, applies the function ''work'' to every element with the ''map'' method and prints the collected results. Finally, the elapsed time for these operations is determined and printed.

As described in the section ''Ways to Configure the SparkContext'', the properties for running the application are taken from the file ''spark-defaults.conf'' in ''$SPARK_CONF_DIR'', such that no further options are needed for the ''spark-submit'' command.
For a Spark standalone cluster running three workers with two cores each, and setting the number of cores for the executors to two, the following output results:

<code>
gwdu102 > spark-submit ~/SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar
2019-07-29 17:43:42 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
spark.app.id=app-20190729174343-0003
spark.app.name=SparkApp
spark.default.parallelism=6
spark.driver.host=gwdu102.global.gwdg.cluster
spark.driver.port=46197
spark.executor.cores=2
spark.executor.id=driver
spark.executor.memory=1024M
spark.jars=file:/home/.../SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar
spark.master=spark://gwdu102.global.gwdg.cluster:7077
spark.submit.deployMode=client

Information from partitions:
host, executor id, time
(gwdu102,2,17:43:50)
(gwdu102,2,17:43:50)
(gwdu101,1,17:43:50)
(gwdu101,1,17:43:50)
(gwdu103,0,17:43:50)
(gwdu103,0,17:43:50)

total time for application: 13.238663881 sec

</code>

The SparkContext configuration has the expected properties for the defaultParallelism (equal to 6, the total number of cores in the standalone cluster) and for the executor cores (equal to 2). The RDD collection was created with 6 elements, spread into 6 partitions, so every partition contains just 1 element. Every partition of the RDD is sent by the driver to an executor for performing the action on the partition's element as defined in the function given as argument to the map method. The output shows, that three executors have been working (executor ids 0, 1, 2), each executor on a different host, and every executor acting on 2 partitions. Since the total number of cores for running the application is 6, the computation for every partition is scheduled to a different core and all 6 computations can start nearly simultaneously; the total execution time therefore exceeds the 10 seconds of a single ''work'' call only by the overhead for starting the application and distributing the tasks.
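
If, for example, a per-host summary of the executed tasks is wanted, the collected tuples can be aggregated with the wide transformation ''reduceByKey''. A hypothetical extension of the main method of ''SparkApp'' (''hostCounts'' is an illustrative name, not part of the original program):
<code>
val hostCounts = rdd.map(x => (work(x)._1, 1)).reduceByKey(_ + _)
hostCounts.collect.foreach{ case (host, n) => println(s"$host ran $n tasks") }
</code>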

==== Running Spark Applications on GWDG's Scientific Compute Cluster ====

Jobs on GWDG's compute cluster are managed by the SLURM workload manager, as described in the documentation on running jobs with SLURM.

A Spark standalone cluster can be started on nodes of the compute cluster by submitting a SLURM batch script with the ''sbatch'' command.

An example is the following batch script:

<code>
#!/bin/bash

#SBATCH --partition=medium
#SBATCH --ntasks=4
#SBATCH --cpus-per-task=2
#SBATCH --mem-per-cpu=2048
#SBATCH --time=60:00
#SBATCH --output=outfile-%J

. setenv.sh
$SPARK_HOME/sbin/start-all.sh
. wait-worker.sh

sleep infinity
</code>

Submitting this script with ''sbatch'' requests 4 tasks with 2 cores and 4 GB of memory each in the ''medium'' partition for 60 minutes. The Spark standalone cluster is started on the allocated nodes, and the ''sleep infinity'' command keeps it alive until the walltime limit is reached or the job is cancelled.

The commands in the batch script start with sourcing the script ''setenv.sh'', which prepares the environment and the configuration files for the Spark standalone cluster:
<code>
## setenv.sh
module load JAVA spark

export SPARK_CONF_DIR=~/spark/conf
mkdir -p $SPARK_CONF_DIR

env=$SPARK_CONF_DIR/spark-env.sh
echo "export SPARK_LOG_DIR=~/spark/log" > $env
echo "export SPARK_WORKER_DIR=~/spark/worker" >> $env
echo "export SPARK_CONF_DIR=$SPARK_CONF_DIR" >> $env
echo 'export SPARK_WORKER_CORES=`nproc`' >> $env
echo 'export SPARK_WORKER_MEMORY=$(( 2048 * `nproc` ))M' >> $env

echo "export SPARK_HOME=$SPARK_HOME" > ~/.bashrc
echo "export JAVA_HOME=$JAVA_HOME" >> ~/.bashrc
echo "export SPARK_CONF_DIR=$SPARK_CONF_DIR" >> ~/.bashrc

scontrol show hostname $SLURM_JOB_NODELIST > $SPARK_CONF_DIR/slaves

conf=$SPARK_CONF_DIR/spark-defaults.conf
echo "spark.master spark://`hostname`:7077" > $conf
echo "spark.default.parallelism $(( $SLURM_NTASKS * $SLURM_CPUS_PER_TASK ))" >> $conf
echo "spark.executor.cores $SLURM_CPUS_PER_TASK" >> $conf
echo "spark.executor.memory $(( $SLURM_MEM_PER_CPU * $SLURM_CPUS_PER_TASK ))M" >> $conf
echo "spark.submit.deployMode client" >> $conf
</code>

The role of these commands has been explained in detail in the two previous sections. The number of cores and the amount of memory for a Spark standalone worker are determined dynamically: the command ''nproc'' returns the number of cores available on a node, and the memory is set to this number times the 2048 MB requested per core. Because of the single quotes, the corresponding lines are copied verbatim into ''spark-env.sh'' and are evaluated only when this file is sourced on the worker node, so every worker uses the core count of its own node.

The nodes, on which workers are to be started, are listed in a special format in the environment variable ''SLURM_JOB_NODELIST''. The command ''scontrol show hostname'' expands this list into separate hostnames, which are written to the ''slaves'' file.

In the last group of commands the properties for the SparkContext are set, which are needed for running Spark applications on the started standalone cluster. In particular, the master-url uses the name of the node executing the batch script, on which the master will be started, and the default parallelism is set to the total number of cores requested for the job.

After preparing the environment for the standalone cluster, the next command in the batch script, ''start-all.sh'', starts the master on the local node and the workers on all nodes listed in the ''slaves'' file.

The ''wait-worker.sh'' script pauses the execution of the batch script until all workers are registered with the master, by periodically scanning the log files:

<code>
## wait-worker.sh
. $SPARK_CONF_DIR/spark-env.sh
num_workers=`cat $SPARK_CONF_DIR/slaves | wc -l`
echo number of workers to be registered: $num_workers
master_logfile=`ls -tr ${SPARK_LOG_DIR}/*Master* | tail -1`
worker_logfiles=`ls -tr ${SPARK_LOG_DIR}/*Worker* | tail -$num_workers`
steptime=3
for i in {1..100}
do
  sleep $steptime
  num_reg=` grep 'Registering worker' $master_logfile | wc -l`
  if [ $num_reg -eq $num_workers ]
  then
     break
  fi
done
echo registered workers after $((i * steptime)) seconds :
for file in $worker_logfiles
do
   grep 'Successfully registered' $file
done
grep 'Starting Spark master' $master_logfile
grep 'Registering worker' $master_logfile
</code>

As soon as the output of the ''wait-worker.sh'' script reports the successful registration of all workers, e.g.

<code>
number of workers to be registered: 3
registered workers after 114 seconds :
19/08/11 12:26:25 INFO Worker: Successfully registered with master spark://gwdd034:7077
19/08/11 12:27:07 INFO Worker: Successfully registered with master spark://gwdd034:7077
19/08/11 12:26:29 INFO Worker: Successfully registered with master spark://gwdd034:7077
19/08/11 12:26:24 INFO Master: Starting Spark master at spark://gwdd034:7077
19/08/11 12:26:25 INFO Master: Registering worker 10.108.102.34:... with 4 cores, ... GB RAM
19/08/11 12:26:29 INFO Master: Registering worker 10.108.102.46:... with 2 cores, ... GB RAM
19/08/11 12:27:07 INFO Master: Registering worker 10.108.102.47:... with 2 cores, ... GB RAM
</code>

in the batch output file, the Spark cluster is ready for executing Spark applications.

Now Spark applications can be submitted from the same login node, from which the sbatch script had been submitted. All necessary settings of the SparkContext properties have been placed into the file ''spark-defaults.conf'', so only the modules have to be loaded and ''SPARK_CONF_DIR'' has to be set before calling ''spark-submit'':

<code>
gwdu102 > module load JAVA spark
gwdu102 > export SPARK_CONF_DIR=~/spark/conf
gwdu102 > spark-submit ~/SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar
19/08/11 12:41:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
spark.app.id=app-20190811124153-0000
spark.app.name=SparkApp
spark.default.parallelism=8
spark.driver.host=gwdu102.global.gwdg.cluster
spark.driver.port=43083
spark.executor.cores=2
spark.executor.id=driver
spark.executor.memory=4096M
spark.jars=file:/home/.../SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar
spark.master=spark://gwdd034:7077
spark.submit.deployMode=client

Information from partitions:
host, executor id, time
(gwdd034,0,12:41:57)
(gwdd034,0,12:41:57)
(gwdd047,1,12:41:57)
(gwdd047,1,12:41:57)
(gwdd034,2,12:41:57)
(gwdd034,2,12:41:57)
(gwdd046,3,12:41:57)
(gwdd046,3,12:41:57)

total time for application: 12.917715484 sec
</code>

For executing the Spark application as a batch job, the batch script starting the standalone cluster has to be modified
by replacing the command ''sleep infinity'' with the commands for submitting the application and stopping the cluster:

<code>
spark-submit ~/SparkApp/target/scala-2.11/sparkapp_2.11-0.1.0-SNAPSHOT.jar

$SPARK_HOME/sbin/stop-all.sh
</code>

[[Kategorie: Scientific Computing]]