Igor Bobriakov
31 Oct 2018
Spark’s primary abstraction is a distributed collection of items called a Resilient Distributed Dataset (RDD). An RDD is a fault-tolerant collection of elements that can be operated on in parallel. RDDs can be created from Hadoop InputFormats (such as HDFS files) or by transforming other RDDs.
Spark provides two ways to create RDDs: loading an external dataset and parallelizing a collection in your driver program.
The simplest way to create an RDD is to take an existing collection in your program and pass it to SparkContext’s parallelize() method.
For example, this is how it looks in different programming languages:
Scala
val input = sc.parallelize(List(1, 2, 3, 4))
Python
numbers = sc.parallelize([1, 2, 3, 4])
Java
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
The other way is to read from a file:
Scala
val lines = sc.textFile("README.md")
Python
lines = sc.textFile("README.md")
Java
JavaRDD<String> lines = sc.textFile("README.md");
RDDs support two types of operations: transformations and actions. Transformations create a new RDD from an existing one, while actions return a result to the driver program. All transformations in Spark are lazy: they do not compute their result right away, but only record the operations applied to the base dataset (for example, a file). Transformations are computed only when an action requires a result to be returned to the driver program or written to storage.
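To build some intuition for laziness without a cluster, here is a plain-Python analogy based on generators. It is only a sketch and not how Spark is implemented: lazy_map and take are hypothetical helpers, and the calls list exists solely to record when the function really runs.

```python
from itertools import islice

calls = []  # records every element the mapped function actually touches


def square(x):
    calls.append(x)
    return x ** 2


def lazy_map(func, data):
    """Hypothetical stand-in for a transformation: describes work, does none."""
    return (func(x) for x in data)


def take(n, pipeline):
    """Hypothetical stand-in for an action: forces evaluation of n elements."""
    return list(islice(pipeline, n))


pipeline = lazy_map(square, [4, 6, 6, 1, 3])
calls_before_action = list(calls)  # still empty: nothing has been computed

first_two = take(2, pipeline)      # the "action" triggers the computation
calls_after_action = list(calls)   # only the elements that were needed
```

Nothing is computed until take is called, and then only as much as the action asks for. Real Spark schedules work per partition, so the amount it actually computes can differ from this toy model.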
Let’s create an RDD of numbers and apply some transformations to it. We will be using PySpark for this example.
Small tip: if you want to suppress the Spark logging output, do the following:
Pyspark
sc.setLogLevel("ERROR")
Pyspark
num = sc.parallelize([4, 6, 6, 1, 3, 0, 2, 2, 2])
The map(function) transformation returns a new RDD, applying a function to each element of the original one.
Pyspark
result = num.map(lambda x: x**2)
So far, Spark has only recorded the transformation. To get the actual result we need to use an action, such as take(), which returns the specified number of elements from the RDD.
Pyspark
result.take(10)
[16, 36, 36, 1, 9, 0, 4, 4, 4]
The filter(function) transformation returns a new RDD, retaining only those elements for which the function evaluates to true.
Pyspark
result = num.filter(lambda x: x >= 3)
result.take(10)
[4, 6, 6, 3]
The distinct() transformation returns a new RDD, removing all the duplicates from the original dataset.
Pyspark
result = num.distinct()
result.take(10)
[0, 1, 2, 3, 4, 6]
If we have two RDDs, we can also apply transformations that combine them. Let's create a second RDD:
Python
num2 = sc.parallelize([5, 5, 8, 2, 2, 1, 7, 3, 3])
The union(other) transformation returns a new dataset that contains all elements from both RDDs, duplicates included.
Pyspark
result = num.union(num2)
result.take(20)
[4, 6, 6, 1, 3, 0, 2, 2, 2, 5, 5, 8, 2, 2, 1, 7, 3, 3]
The intersection(other) transformation returns a dataset that contains only the elements found in both RDDs, with duplicates removed.
Pyspark
result = num.intersection(num2)
result.take(20)
[2, 1, 3]
The subtract(other) transformation returns a dataset from which every element that also appears in the other RDD has been removed.
Pyspark
result = num.subtract(num2)
result.take(20)
[0, 4, 6, 6]
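The difference between these three transformations is easy to miss, so here is a plain-Python model of what each one computes for our two lists. It is a local sketch, not Spark code; the results are sorted because the order in which Spark returns them can vary.

```python
num = [4, 6, 6, 1, 3, 0, 2, 2, 2]
num2 = [5, 5, 8, 2, 2, 1, 7, 3, 3]

# union: plain concatenation, duplicates are kept
union = num + num2

# intersection: distinct values present in both collections
intersection = sorted(set(num) & set(num2))

# subtract: drop every element whose value occurs in the other collection;
# duplicates of the surviving values are kept
other_values = set(num2)
subtract = sorted(x for x in num if x not in other_values)
```

Note that only intersection deduplicates its result: union keeps every copy, and subtract keeps both copies of 6.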
We can also compute a Cartesian product of two datasets. The cartesian(other) transformation returns a dataset of all pairs (a, b), where a belongs to the original dataset and b to the other.
Pyspark
result = num.cartesian(num2)
result.take(20)
[(4, 5), (4, 5), (4, 8), (4, 2), (4, 2), (4, 1), (4, 7), (4, 3), (4, 3), (6, 5), (6, 5), (6, 8), (6, 2), (6, 2), (6, 1), (6, 7), (6, 3), (6, 3), (6, 5), (6, 5)]
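The output above shows only the first 20 pairs. As a rough local model (plain Python with itertools, not Spark), we can see how quickly the full product grows:

```python
from itertools import islice, product

num = [4, 6, 6, 1, 3, 0, 2, 2, 2]
num2 = [5, 5, 8, 2, 2, 1, 7, 3, 3]

# product() yields every (a, b) pair lazily, like a Cartesian product
pairs = product(num, num2)
first_three = list(islice(pairs, 3))

# The result size is the product of the two input sizes
total_pairs = len(num) * len(num2)
```

Two datasets of 9 elements already produce 81 pairs, so a Cartesian product of large RDDs can become very expensive.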
As we’ve mentioned earlier, actions return some value. For example, we can count elements in the dataset using the simple command:
Pyspark
num.count()
9
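To count the occurrences of each element in an RDD, use countByValue(); this action returns a dictionary of (value, count) pairs. The collect() action, shown here, returns all elements of the RDD to the driver program as a list.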
Pyspark
num.collect()
[4, 6, 6, 1, 3, 0, 2, 2, 2]
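Counting the occurrences of each value can be modelled locally with collections.Counter. This is a plain-Python sketch of the kind of (value, count) mapping that PySpark's countByValue() action produces, not a call into Spark itself:

```python
from collections import Counter

values = [4, 6, 6, 1, 3, 0, 2, 2, 2]

# Map each distinct value to the number of times it occurs
counts = dict(Counter(values))
```

For our dataset this gives a count of 3 for the value 2, a count of 2 for the value 6, and a count of 1 for everything else.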
The top(n) action returns the n largest elements of the RDD, in descending order.
Pyspark
num.top(3)
[6, 6, 4]
The takeOrdered(n) action returns the n smallest elements of the RDD, in ascending order.
Pyspark
num.takeOrdered(5)
[0, 1, 2, 2, 2]
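Both actions select a few elements without sorting the whole dataset first. A local analogy in plain Python (a sketch using the standard heapq module, not Spark code) looks like this:

```python
import heapq

values = [4, 6, 6, 1, 3, 0, 2, 2, 2]

# Analogue of top(3): the three largest values, largest first
largest_three = heapq.nlargest(3, values)

# Analogue of takeOrdered(5): the five smallest values, smallest first
smallest_five = heapq.nsmallest(5, values)
```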
The most common action on an RDD is reduce(function), which takes a function that operates on two elements of the RDD and returns one element of the same type.
Pyspark
num.reduce(lambda x, y: x + y)
26
Now, let's take a look at the fold() action, which is similar to reduce() and behaves in much the same way, but also takes a zero value to be used for the initial call.
Pyspark
num.fold(0, lambda x,y : x + y)
26
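The zero value deserves some care, because it is used once per partition and once more when the partial results are merged. The sketch below models that in plain Python; fold_model is a hypothetical helper, and the split into three partitions is an assumption made for illustration.

```python
from functools import reduce
from operator import add


def fold_model(partitions, zero_value, op):
    """Plain-Python model of fold(): a hypothetical helper, not a Spark API."""
    # Each partition is folded on its own, starting from zero_value.
    partials = [reduce(op, part, zero_value) for part in partitions]
    # The partial results are then combined, starting from zero_value again.
    return reduce(op, partials, zero_value)


# Assumed layout: the nine numbers spread over three partitions
partitions = [[4, 6, 6], [1, 3, 0], [2, 2, 2]]

neutral = fold_model(partitions, 0, add)   # 0 is the identity for addition
shifted = fold_model(partitions, 10, add)  # 10 is added four times
```

With 0 the result is the plain sum, 26. With 10 the model returns 66, because the zero value is applied once in each of the three partitions and once in the final merge. This is why the zero value should be the identity element of the operation.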
The aggregate() function frees us from the constraint of having the return value be the same type as the elements of the RDD we are working on. Let's take a closer look at this function and walk through a simple example step by step:
Pyspark
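num = sc.parallelize([4, 6, 6, 1, 3, 1, 2, 2, 2])
# Accumulate a (product, count) tuple, starting from (1, 0)
prod_count = num.aggregate(
    (1, 0),
    # First function: fold one value into a (product, count) tuple
    lambda tup, value: (value * tup[0], tup[1] + 1),
    # Second function: merge two (product, count) tuples
    lambda tup, value_tup: (value_tup[0] * tup[0], value_tup[1] + tup[1]))
prod_count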
(3456, 9)
(1, 0) is the starting value; here it is a tuple that accumulates the running product and the number of elements. The first lambda function takes the accumulator tuple and one value as input, while the second one takes two tuples as input and merges them.
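A typical use of this pattern is computing an average, where the accumulator is a (sum, count) tuple even though the elements are plain integers. The following plain-Python sketch models the two-function flow; aggregate_model, seq_op and comb_op are hypothetical names, and the three-partition layout is an assumption.

```python
from functools import reduce


def aggregate_model(partitions, zero_value, seq_op, comb_op):
    """Plain-Python model of aggregate(): a hypothetical helper, not Spark."""
    # seq_op folds the elements of each partition into an accumulator.
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    # comb_op merges the per-partition accumulators into one result.
    return reduce(comb_op, partials, zero_value)


def seq_op(acc, value):
    # Add one element to a (sum, count) accumulator
    return (acc[0] + value, acc[1] + 1)


def comb_op(left, right):
    # Merge two (sum, count) accumulators
    return (left[0] + right[0], left[1] + right[1])


# Assumed layout: the nine numbers spread over three partitions
partitions = [[4, 6, 6], [1, 3, 1], [2, 2, 2]]

total, count = aggregate_model(partitions, (0, 0), seq_op, comb_op)
mean = total / count
```

Here the zero value is (0, 0), the identity for both the sum and the count, so it is safe to apply it in every partition.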
Spark provides special operations on RDDs containing key-value pairs. These RDDs are called pair RDDs. Pair RDDs are a useful building block in many programs, as they expose operations that allow you to act on each key in parallel or regroup data across the network.
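As a small preview of what acting on each key means, here is a plain-Python model of a per-key reduction, similar in spirit to PySpark's reduceByKey(). The helper reduce_by_key_model and the sample pairs are made up for illustration and do not come from Spark.

```python
from operator import add


def reduce_by_key_model(pairs, op):
    """Plain-Python model of a per-key reduction: a hypothetical helper."""
    result = {}
    for key, value in pairs:
        # The first value for a key is kept as is; later ones are merged in
        result[key] = op(result[key], value) if key in result else value
    return result


# Hypothetical key-value data
pairs = [("a", 1), ("b", 2), ("a", 3), ("b", 4), ("c", 5)]

totals = reduce_by_key_model(pairs, add)
```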
Pair RDDs also support the same operations as regular RDDs. That’s the topic of our next blog post.
In this second article in our series of tutorials on working with Apache Spark, we’ve guided you through RDDs, Spark's primary abstraction. See the RDD Programming Guide to learn more about the commands and operations you can use.
In the next article, we will talk about Data Frames in Apache Spark.