Data Analytics
Week 1: Large-scale computing
This course introduces students to deep learning and convolutional neural networks, and presents an overview of
systems for large-scale computing and big data.
Course schedule
Office hours
There are no set office hours during the week; however, you can ask questions on the Moodle forum for each week.
We will also be running live sessions and drop-in question-and-answer sessions throughout the course. Details will be provided on Moodle.
Please take advantage of these opportunities to discuss with us any issues you might have about any aspect of the
course.
Aims
• explain the differences between SQL and NoSQL databases and assess their suitability in different real-life settings; and
• explain the basic concepts underpinning big data systems such as Spark or Hadoop and discuss their suitability
and use in different scenarios.
Assessment
Books
TensorFlow
There are many books available for learning TensorFlow and the university library has several that are available as ebooks. If you’re considering purchasing a book on TensorFlow, be aware that TensorFlow is evolving fast and many of the code snippets in such books quickly become out of date.
Pro Deep Learning with TensorFlow: A Mathematical Approach to Advanced Artificial Intelligence in
Python
https://www.apress.com/gb/book/9781484230954
This is a good introduction to some of the mathematical concepts that underpin TensorFlow and it is available from
the library as an ebook.
Large-scale computing
This course focuses on large-scale computing and big data. The first eight weeks will focus on large-scale computing and TensorFlow, a library for deep learning. The remaining two weeks will focus on big data. This week will give a short overview of large-scale computing, explain some of the underpinning concepts, and introduce MapReduce, a programming model for working with big data sets.
Background
With the advent of computer-aided decision making and machine learning, as well as the increasingly fast pace of digitisation, the demand for computing power has grown over the last decade and keeps increasing.
Historically, systems with high hardware specifications and parallel computing capabilities, known as supercomputers, have been used to meet the growing demand for computing and storage resources. Supercomputers offer high performance computing (HPC) services, but they incur high acquisition and maintenance costs.
In the meantime, computer hardware has become more powerful and affordable. As a result, organisations that cannot afford supercomputers can set up less costly computer clusters to meet their computing requirements. Even if supercomputers are available, routine computation can be scheduled on computer clusters, and only jobs with high performance requirements tend to be sent to supercomputers.
However, even though a compute cluster is much less costly than a supercomputer, setting one up still requires a significant investment. Nowadays, users who do not have local access to supercomputers and compute clusters can make use of different types of large-scale computing technologies. Grid computing and cloud computing are common examples of such technologies for carrying out computations at large scale.
Grid computing is the result of academic research in the 1990s. A grid brings together resources from different domains
under a shared computing environment. An example of a project that made use of such distributed computing is
SETI@Home, which started in the late 1990s and allowed users to make their computer available to search radio
signals for signs of extraterrestrial intelligence.
Cloud computing
A decade later cloud computing started to become increasingly popular. A large number of commercial providers began
offering paid-for on-demand access to distributed resources for data storage and computing. Put another way, cloud
computing is nothing other than renting compute power and storage from someone else’s data centre.
Due to the providers’ economies of scale cloud computing can provide a cheaper way of accessing data storage and
computing power.
There are several cloud computing platforms, such as
• Amazon Web Services (AWS)
• Google Cloud
• IBM Cloud
• iland by VMWare
• Microsoft Azure
• Qubole, founded by former big data employees of Facebook
• Rackspace
• Service Cloud by Salesforce
Cloud computing is based on the notion of virtualisation. Users of cloud computing run virtual machines. A virtual
machine (VM) is an emulation of a physical computer.
Features and advantages of cloud computing. One of the main characteristics of cloud computing platforms is that they provide the flexibility to scale resources. This is possible because the “computer” a user gets access to is virtual. Scaling refers to adding or reducing computing power, memory, data storage or network bandwidth to regulate performance. Scaling helps to achieve higher performance in two possible ways: by scaling up or by scaling out.
• Scaling up, known also as vertical scaling, means to increase allocation of resources (computing power, memory,
data storage or network bandwidth) on an existing VM. For instance, more memory can be added to a VM to
make a database server run faster.
• Scaling out, also known as horizontal scaling, means to increase allocation of resources by adding more VMs to
an existing application. For instance, several new VMs with identical configuration can be created to distribute
workload across them.
It is possible to reserve resources temporarily for a demanding task and then release them, scaling in or scaling down
the application. The flexibility to scale cloud resources according to ongoing needs lends the term elastic cloud to cloud
computing platforms.
Cloud computing offers a number of advantages and utilities:
• It is a managed system, since maintenance and support is made available by the cloud provider.
• It offers security due to automatic encryption and other security measures put in place by the provider.
• It enables scalability with virtually unlimited computing power and data storage.
• It is accessible remotely over HTTP or HTTPS, thus allowing more flexible access virtually from everywhere.
• It is a durable infrastructure, which allows data redundancy (same data stored in two separate locations) and data
replication (data copied multiple times across different sites, servers or databases).
Hardware considerations
The electronic circuitry that performs the basic operations a computer program undertakes is called a processor. The processor is the engine that carries out mathematical operations, such as arithmetic; it controls logic flow and executes input and output operations.
There are two main types of processing unit available for large-scale computing. The hardware configuration can have a large impact on the computational power of both on-site computing clusters and cloud computing.
Central processing unit (CPU). The CPU is the heart of the computer. It performs the majority of operations on a
standard PC and is optimised to perform high-precision, fast calculations. A CPU consists of a few cores (up to 32)
optimised for sequential serial processing. Many high-performance compute clusters operate by using multiple CPUs
in parallel.
Graphical processing unit (GPU). As the name suggests, GPUs were originally developed for rendering graphics. They were optimised to perform thousands of lower-precision operations in parallel and so consist of thousands of smaller and more efficient microprocessor cores. While GPUs were originally developed for graphical operations, their architecture makes them ideally suited to many parallel programming tasks and they have been used for general purpose computing for many years. GPUs typically have a far higher number of cores than CPUs. Even consumer-grade GPUs can have thousands of cores. Though each core might be, on its own, slower than a CPU core, it is their number that makes the difference. Modern GPUs can be used as general-purpose processing units that are 50–100 times faster than CPUs for tasks that require multiple parallel processes. GPUs can be programmed directly using specialised libraries such as CUDA, while modern libraries such as TensorFlow take advantage of GPU hardware without requiring the programmer to worry about parallel GPU threads or data transfer.
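As a small illustration of the latter point (a sketch, assuming TensorFlow 2.x is installed; this code is not part of the original notes), one can check which GPUs TensorFlow sees and let it place an operation automatically:

import tensorflow as tf

# List the GPUs TensorFlow can see (an empty list means none are available)
print(tf.config.list_physical_devices('GPU'))

# TensorFlow places this matrix multiplication on a GPU automatically if one is available
a = tf.random.normal((1000, 1000))
b = tf.random.normal((1000, 1000))
print(tf.matmul(a, b).device)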
Computational cost
Complexity is seen as a function of the input sizes. Often there is just one input size (say, the number of data points), which we will refer to as 𝑛. The key questions we are interested in are:
• How much slower will the algorithm get if we increase 𝑛?
• How much more memory will the algorithm need if we increase 𝑛?
The complexity is typically expressed in terms of asymptotic complexity bounds, which we will look at in more detail later on. The time complexity is typically measured in floating point operations (FLOPs). Adding two numbers, multiplying them or comparing them are examples of floating point operations.
Example 1 (Matrix multiplication). Suppose A is an 𝑚 × 𝑝 matrix and B is a 𝑝 × 𝑛 matrix, and we want to compute the 𝑚 × 𝑛 matrix C = AB with entries
$$C_{ij} = \sum_{k=1}^{p} A_{ik} B_{kj} = A_{i1}B_{1j} + \ldots + A_{ip}B_{pj}.$$
To calculate each entry of C we need to perform 𝑝 multiplications and 𝑝 additions (strictly speaking 𝑝 − 1
additions, but coding it up as 𝑝 additions is easier, see below). So each entry requires 2𝑝 floating point
operations.
$$\begin{pmatrix} \vdots & & \vdots & & \vdots \\ C_{i1} & \cdots & C_{ij} & \cdots & C_{in} \\ \vdots & & \vdots & & \vdots \\ C_{m1} & \cdots & C_{mj} & \cdots & C_{mn} \end{pmatrix} = \begin{pmatrix} \vdots & & \vdots & & \vdots \\ A_{i1} & \cdots & A_{ik} & \cdots & A_{ip} \\ \vdots & & \vdots & & \vdots \\ A_{m1} & \cdots & A_{mk} & \cdots & A_{mp} \end{pmatrix} \cdot \begin{pmatrix} B_{11} & \cdots & B_{1j} & \cdots & B_{1n} \\ \vdots & & \vdots & & \vdots \\ B_{k1} & \cdots & B_{kj} & \cdots & B_{kn} \\ \vdots & & \vdots & & \vdots \\ B_{p1} & \cdots & B_{pj} & \cdots & B_{pn} \end{pmatrix}$$
import numpy as np
m = 10                                  # Set up dimensions
n = 8
p = 9
A = np.random.normal(0, 1, (m, p))      # Simulate matrices
B = np.random.normal(0, 1, (p, n))
C = np.zeros((m, n))                    # Set up matrix to store result
for i in range(m):                      # Loop over the entries of C
    for j in range(n):
        for k in range(p):              # p multiplications and p additions per entry
            C[i, j] = C[i, j] + A[i, k] * B[k, j]
If both matrices A and B are square 𝑛 × 𝑛 matrices (i.e. 𝑚 = 𝑛 = 𝑝 ), then the computational cost is 2𝑛3
floating point operations and the memory cost is 𝑛2 .
We can “verify” the computational cost by performing a small simulation in Python.
We first define a function that performs the timing for a given value of 𝑛 . . .
import numpy as np
import timeit
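A minimal sketch of such a timing function (the name time_matmul and the exact timing approach are assumptions rather than the original code) could look like this, using the numpy and timeit imports above:

def time_matmul(n, repeats=10):
    """Time the multiplication of two random n x n matrices (average over repeats)."""
    A = np.random.normal(0, 1, (n, n))
    B = np.random.normal(0, 1, (n, n))
    return timeit.timeit(lambda: A @ B, number=repeats) / repeats

# Example: time the multiplication for increasing n
for n in [100, 200, 400, 800]:
    print(n, time_matmul(n))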
When we look at the computational complexity of an algorithm we are not interested in the exact number of opera-
tions, but we want to know how the computational cost increases as we increase (one of) the input dimension(s).
Consider the case of multiplying two 𝑛 × 𝑛 matrices from example 1. We have seen that we require 2𝑛3 floating point
operations. What happens if we multiply a matrix that is twice as big, i.e. a (2𝑛) × (2𝑛) matrix? We would require
2 · (2𝑛) 3 = 16𝑛3 = 8 · (2𝑛3 ) operations, i.e. 8 times as many as we required for multiplying two 𝑛 × 𝑛 matrices. Because
8 = 23 we would then say that the computational complexity of multiplying two square matrices scales in a cubic
manner and say that multiplying two square matrices is an 𝑂 (𝑛3 ) operation.
The basic idea behind the 𝑂 (·) notation (pronounced “big-oh of . . . ”) is that we place a function inside the 𝑂 (·) that grows at least as fast as the computational cost.
What might be a little surprising is that inverting an 𝑛 × 𝑛 matrix is also 𝑂 (𝑛3 ) , i.e. multiplying two matrices has the
same time complexity as inverting one of them, even though the latter appears to be much harder for us humans.
Formal definition of the 𝑂 (·) notation
Consider an algorithm that requires 𝑓 (𝑛) operations. We then say that it is of complexity 𝑂 (𝑔(𝑛)) if we can find a positive constant 𝑐 and a minimum input size 𝑛0 such that for all greater input sizes 𝑛 ≥ 𝑛0 we have that 𝑓 (𝑛) ≤ 𝑐𝑔(𝑛), i.e. 𝑓 (𝑛) is dominated by 𝑐𝑔(𝑛).
In other words, for an algorithm to be of 𝑂 (𝑔(𝑛)) the bound 𝑔(𝑛) must grow at least as fast in 𝑛 as the number of operations 𝑓 (𝑛).
In the example of multiplying two 𝑛 × 𝑛 matrices (example 1) the number of operations is 𝑓 (𝑛) = 2𝑛3 . This is of 𝑂 (𝑛3 ), as we can choose 𝑛0 = 1 and 𝑐 = 2, and obtain 𝑓 (𝑛) = 2𝑛3 ≤ 2𝑔(𝑛) = 𝑐𝑔(𝑛).
(Figure: for complexity 𝑂 (𝑔(𝑛)) the cost 𝑓 (𝑛) lies below the scaled bound 𝑐 · 𝑔(𝑛) for all input sizes 𝑛 ≥ 𝑛0 .)
Supplementary material:
Ω(·) and Θ(·) notation
Technically speaking the 𝑂 (·) notation only provides an upper bound, so multiplying two 𝑛 × 𝑛 matrices is also an 𝑂 (𝑛4 ) operation. Though the 𝑂 (·) notation is typically not used this way, we have technically not made a wrong statement.
The Θ(·) notation avoids this problem. We say that an algorithm is of complexity Θ(𝑔(𝑛)) if we can find two positive
constants 𝑐 1 and 𝑐 2 and a minimum input size 𝑛0 such that for all greater input sizes 𝑛 ≥ 𝑛0 we have that 𝑐 1 𝑔(𝑛) ≤
𝑓 (𝑛) ≤ 𝑐 2 𝑔(𝑛) , i.e. 𝑓 (𝑛) dominates 𝑐 1 𝑔(𝑛) and is dominated by 𝑐 2 𝑔(𝑛) . In other words, for an algorithm to be of
Θ(𝑔(𝑛)) the bound 𝑔(𝑛) must grow as fast in 𝑛 as the number of operations 𝑓 (𝑛) .
The reason why the 𝑂 (·) notation is more prominent than the Θ(·) notation is that showing that an algorithm is of complexity 𝑂 (·) is a lot easier than showing that it is of complexity Θ(·).
Finally, the Ω(·) notation provides a lower bound on the complexity. We say that an algorithm is of complexity Ω(𝑔(𝑛)) if we can find a positive constant 𝑐 and a minimum input size 𝑛0 such that for all greater input sizes 𝑛 ≥ 𝑛0 we have that 𝑐𝑔(𝑛) < 𝑓 (𝑛), i.e. 𝑓 (𝑛) dominates 𝑐𝑔(𝑛). In other words, for an algorithm to be of Ω(𝑔(𝑛)) the bound 𝑔(𝑛) must grow no faster in 𝑛 than the number of operations 𝑓 (𝑛).
(Figures: for complexity Θ(𝑔(𝑛)) the cost 𝑓 (𝑛) lies between 𝑐1 𝑔(𝑛) and 𝑐2 𝑔(𝑛) for all 𝑛 ≥ 𝑛0 ; for complexity Ω(𝑔(𝑛)) the cost 𝑓 (𝑛) lies above 𝑐𝑔(𝑛) for all 𝑛 ≥ 𝑛0 .)
• An 𝑂 (1) bound means that an algorithm executes in constant time or space irrespective of the size 𝑛 of the input
data.
• An 𝑂 (𝑛) bound describes an algorithm whose (time or space) performance grows linearly with (in direct propor-
tion to) the size 𝑛 of the input data.
• An 𝑂 (𝑛2 ) bound refers to an algorithm whose performance grows quadratically with (in direct proportion to the
square of) the size 𝑛 of the input data.
• An 𝑂 (𝑛3 ) bound refers to an algorithm whose performance grows in a cubic manner with (in direct proportion
to the cube of) the size 𝑛 of the input data.
Finally, a problem is said to be NP-hard if it is at least as hard as every problem in the class NP (non-deterministic polynomial time). For such problems no algorithm with a guaranteed polynomial run time is known, or, to be more precise, no known algorithm is of 𝑂 (𝑔(𝑛)) for any polynomial 𝑔(𝑛). The travelling salesman problem is an example of an NP-hard problem.
The number of operations required (and thus the run time of an algorithm) is not always deterministic: it often depends
on the data. In this case we often look at the average case complexity and the worst-case complexity.
The complexity of a task often also depends on how the data is organised, as the following simple example shows.
Example 2.
Consider a vector or list of length 𝑛 containing un-ordered numbers. In Python we could create such a
list using
l = [1, 10, 7, 9, 3, 12, 5, 2]
n = len(l)
Suppose we want to find which entry is 5. All we can do is go through the list one-by-one. What would
the time complexity of this be?
• If we are really lucky, the first entry is the one we are looking for and we are already done. So the
best case time complexity is 𝑂 (1) .
• If we are really unlucky, the last entry is the one we are looking for and we need to look at all the
entries. So the worst case time complexity is 𝑂 (𝑛) .
• Let’s finally look at the average time complexity. With probability 1/𝑛 the entry we are looking for is in the 𝑖 -th position, so on average we need to look at
$$\sum_{i=1}^{n} i\, p(i) = \sum_{i=1}^{n} \frac{i}{n} = \frac{n+1}{2}$$
values, so the average case complexity is still 𝑂 (𝑛).
We can however do better if we know that the entries of the list are sorted.
l = [1, 2, 3, 5, 7, 9, 10, 12]
n = len(l)
Rather than starting from the beginning, we could pick the entry in the middle (5 or 7, so let’s use 7) and
compare it to 5. 5 is less than 7, so 5 must come before 7, so we only need to keep what is before 7. Next
we compare 5 to 3 (the new entry in the middle), 3 is less than 5, so 5 must come after 3. There is only
one entry left, so we found the entry 5 in three steps.
What is the complexity of this algorithm? Every time we make a comparison we can discard half of the remaining vector. If we are unlucky, we need to continue this until we are left with a vector of length 1. So the number of steps required must be such that 2^(number of steps) exceeds 𝑛, i.e. we would need at most 𝑂 (log2 (𝑛)) steps, which is less than the 𝑂 (𝑛) from above. So if the data is sorted we can retrieve a value in (average case and worst case) time complexity 𝑂 (log2 (𝑛)).
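A minimal sketch of this bisection idea in Python (standard binary search, written for illustration rather than taken from the notes):

def binary_search(sorted_list, target):
    """Return the index of target in sorted_list, or None if it is absent."""
    lo, hi = 0, len(sorted_list) - 1
    while lo <= hi:
        mid = (lo + hi) // 2             # Pick the middle entry
        if sorted_list[mid] == target:
            return mid
        elif sorted_list[mid] < target:  # Target must be in the upper half
            lo = mid + 1
        else:                            # Target must be in the lower half
            hi = mid - 1
    return None

l = [1, 2, 3, 5, 7, 9, 10, 12]
print(binary_search(l, 5))   # 3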
In practice, data is rarely sorted. However the same trick can be exploited by what is called indexing (and
hashing). If a data structure is indexed, then we additionally maintain a sorted tree of the data which
stores where the data is located, so that we can look up values more quickly. In Python, dictionaries and
indices of pandas data frames are indexed (hashed). In many relational database systems, columns can be
indexed for better performance.
Though indexing increases the speed of retrieving values, it requires additional storage and there is an
additional cost when values are added, changed or removed from the list.
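As a small illustration (the data and numbers are made up), compare looking up a value by scanning a list with looking it up via a dictionary, which is backed by a hash table:

# Build a list and an index (dictionary) mapping each value to its position
data = list(range(1_000_000))
index = {value: position for position, value in enumerate(data)}

position = data.index(999_999)   # O(n): scans the list until the value is found
position = index[999_999]        # O(1) on average: a single hash-table lookup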
Task 1.
Let A be an 𝑛 × 𝑛 matrix and w a vector of length 𝑛. Explain why the time complexity of multiplying Aw
is 𝑂 (𝑛2 ) and the memory complexity is 𝑂 (𝑛) .
Task 2.
Let A and B be matrices of size 𝑛 × 𝑛 and w a vector of length 𝑛. Compare the evaluations (AB)w and
A(Bw) in terms of time and space complexity.
Task 3.
What is the time and space complexity of the following code? Assume 𝑛 and 𝑚 are given input sizes and that random.random() runs in constant time.
import random
a = b = 0
for i in range(n):           # n iterations
    a = a + random.random()
for j in range(m):           # m iterations
    b = b + random.random()
Task 4.
Consider a linear regression model of the form
$$Y_i = \beta_0 + \beta_1 x_{i1} + \ldots + \beta_p x_{ip} + \varepsilon_i, \qquad i = 1, \ldots, n.$$
What is the computational complexity of calculating the least-squares estimate
$$\hat{\boldsymbol{\beta}} = (\mathbf{X}^\top \mathbf{X})^{-1} \mathbf{X}^\top \mathbf{y},$$
both in terms of the number of observations 𝑛 and the number of covariates 𝑝? Can you verify this empirically in R or Python?
Parallelisation
Normally, the instructions in a program are executed one after another (in a serial way). An algorithm can run faster if (some of) its steps are executed in parallel. However, this is not always possible. We can only run two or more steps in parallel if they do not depend on each other.
Consider again the matrix multiplication from example 1: each entry 𝐶𝑖𝑗 depends only on A and B, not on any other entry of C. This means that all the entries can be set at the same time in parallel. Modern linear algebra implementations exploit this and perform matrix multiplication across more than one core of the CPU.
In contrast, consider calculating the cumulative sum of a vector x, whose 𝑖 -th entry is
$$y_i = \sum_{j=1}^{i} x_j .$$
In Python we could compute it as follows.
import numpy as np

n = 1000
x = np.random.normal(0, 1, n)       # Simulate data
y = np.zeros(n)                     # Create vector to hold result
y[0] = x[0]                         # Set first entry
for i in range(1, len(x)):          # Loop through remaining vector
    y[i] = y[i-1] + x[i]            # to calculate cumulative sum
We cannot parallelise this easily, because we need to know y[i-1] when setting y[i]. If we were to split the loop into two, we could only start the second half once we have finished the first half, as we need to know what value y[i-1] takes, and we will only obtain that value once we have dealt with the first half.
Task 5.
How can the calculation of the mean be spread across several parallel processes? Is the same possible for
the median?
It is however worth keeping in mind that parallelism incurs an overhead. How large it is depends on the underlying architecture. When parallelising across different cores in a single computer, the main overheads stem from coordinating the different threads. However, when parallelising across different nodes in a distributed computing environment, sending information from one node to another is typically slow.
In both cases, in order to avoid overheads it is best to parallelise big chunks of code rather than small chunks, and to require as little exchange of information between the parallel threads as possible.
Supplementary material:
Parallelising Bayesian computations
If you have already taken Uncertainty Assessment and Bayesian Computation then you have already learned about Markov chain Monte Carlo (MCMC) and Sequential Monte Carlo (SMC). MCMC cannot be parallelised as each step depends on the previous step (just like the example of the cumulative sum). All we can do in MCMC is exploit parallelism when calculating the acceptance probability. SMC on the other hand lends itself very well to parallelisation. The particles can be propagated independently of each other in parallel and the nodes only need to communicate when renormalising weights or resampling particles.
Data parallelism
So far, we have only looked at using parallelism to perform a computation more quickly. In data science, a more common
occurrence of parallelism is what is called data parallelism. In data parallelism, the data is spread across several nodes
(computers or servers), typically because the amount of data available is too large for a single node.
The challenge now consists of carrying out the required calculations in a way that
• performs as many operations in parallel as possible, and that
• minimises the amount of data that needs to be transferred between nodes.
Some algorithms (such as linear regression) lend themselves better to such a scenario than others (such as kernelised
support vector machines).
Suppose, for example, that we want to fit a linear regression model of the form
$$Y_i = \beta_0 + \beta_1 x_{i1} + \ldots + \beta_p x_{ip} + \varepsilon_i, \qquad i = 1, \ldots, n,$$
but the first 𝑚 observations are stored on the first node and the remaining 𝑛 − 𝑚 observations are stored on another node. Let’s also assume that the number of covariates is small compared to 𝑚 or 𝑛.
In other words, the first node holds
$$\mathbf{X}_1 = \begin{pmatrix} 1 & x_{11} & \ldots & x_{1p} \\ \vdots & \vdots & & \vdots \\ 1 & x_{m1} & \ldots & x_{mp} \end{pmatrix}, \qquad \mathbf{y}_1 = \begin{pmatrix} y_1 \\ \vdots \\ y_m \end{pmatrix},$$
and the second node holds
$$\mathbf{X}_2 = \begin{pmatrix} 1 & x_{m+1,1} & \ldots & x_{m+1,p} \\ \vdots & \vdots & & \vdots \\ 1 & x_{n1} & \ldots & x_{np} \end{pmatrix}, \qquad \mathbf{y}_2 = \begin{pmatrix} y_{m+1} \\ \vdots \\ y_n \end{pmatrix}.$$
We have that
$$\mathbf{X} = \begin{pmatrix} \mathbf{X}_1 \\ \mathbf{X}_2 \end{pmatrix}, \qquad \mathbf{y} = \begin{pmatrix} \mathbf{y}_1 \\ \mathbf{y}_2 \end{pmatrix}$$
and thus
$$\hat{\boldsymbol{\beta}} = (\mathbf{X}^\top \mathbf{X})^{-1} \mathbf{X}^\top \mathbf{y} = (\mathbf{X}_1^\top \mathbf{X}_1 + \mathbf{X}_2^\top \mathbf{X}_2)^{-1} (\mathbf{X}_1^\top \mathbf{y}_1 + \mathbf{X}_2^\top \mathbf{y}_2).$$
We can now exploit the fact that we can calculate X₁⊤X₁ and X₁⊤y₁ on the first node and, in parallel, X₂⊤X₂ and X₂⊤y₂ on the second node. If we assume that the first node is the head node, the second node then needs to transfer X₂⊤X₂ and X₂⊤y₂ to the first node, and the first node can then calculate 𝜷̂ using the formula above.
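A small numerical sketch of this idea on a single machine (the data, dimensions and variable names are made up for illustration):

import numpy as np

rng = np.random.default_rng(0)
n, m, p = 1000, 600, 3
X = np.column_stack([np.ones(n), rng.normal(size=(n, p))])  # Design matrix with intercept
beta = np.array([1.0, 2.0, -1.0, 0.5])
y = X @ beta + rng.normal(size=n)

# Split the data as if it were stored on two nodes
X1, y1 = X[:m], y[:m]
X2, y2 = X[m:], y[m:]

# Each node computes its own summaries (this could happen in parallel)
XtX1, Xty1 = X1.T @ X1, X1.T @ y1
XtX2, Xty2 = X2.T @ X2, X2.T @ y2

# The head node combines the summaries and solves for beta-hat
beta_hat = np.linalg.solve(XtX1 + XtX2, Xty1 + Xty2)
print(beta_hat)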
We will now look at data parallelism in practice and a specific programming model for processing data in parallel known
as MapReduce.
MapReduce
Introduction to MapReduce
https://youtu.be/hySPcxV1yT8
Duration: 11m55s
If we want to perform computations with the data, we need to perform these computations across a large number of
nodes as well. We have already looked at this idea of data parallelism earlier.
MapReduce is a popular programming model for performing calculations in such a scenario and is used in most big
data processing frameworks.
Motivating examples
Before we look at the details of MapReduce, let’s look at two simple examples.
Example 6 (Calculating the mean). Suppose a numerical variable is split into blocks that are distributed across several workers and we want to calculate its overall mean.
(Figure: the data is split across three workers; for instance, one worker holds the values 60 and 50 (n: 2, sum: 110), another holds 20, 39, 24 and 30 (n: 4, sum: 113), and the third holds 32; each worker reports only the number of observations and the sum of its block.)
We can calculate the mean very efficiently from the number of observations and the sum of the variable
of interest on each worker, as those two numbers contain all the information. Such a data summary that
contains all the information required is called a sufficient statistic. We will see later on that methods that
have simple sufficient statistics lend themselves very well to efficient data parallelism.
As an aside, we would need to make sure that despite each block being replicated several times (not shown
in the above figure) each block will only be processed once. Hadoop will take care of this automatically.
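A minimal sketch of this computation in plain Python (the block contents are reconstructed from the figure above and are only illustrative):

blocks = [[60, 50], [20, 39, 24, 30], [32]]           # Data blocks held by the three workers

# Each worker reports its sufficient statistic: (number of observations, sum)
summaries = [(len(block), sum(block)) for block in blocks]

# Combining the summaries gives the overall mean without moving the raw data
total_n = sum(n for n, _ in summaries)
total_sum = sum(s for _, s in summaries)
print(total_sum / total_n)                            # 255 / 7 ≈ 36.43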
Example 7 (Calculating group-wise means).
Suppose we don’t want to just calculate the overall mean, but want to calculate the group means based
on an additional categorical variable. For example, imagine that we want to calculate the average age for
males and females separately.
In this case, we have to return for each level of the categorical variable, the number of observations having
that value and the sum of the corresponding values of the numerical variable.
(Figure: the data is again split across three workers, this time as (gender, age) pairs.
• One worker holds (‘f’, 32), (‘f’, 28), (‘m’, 41) and reports nf : 2, sumf : 60 and nm : 1, summ : 41.
• Another worker holds (‘m’, 60), (‘m’, 50) and reports nm : 2, summ : 110.
• The third worker holds (‘f’, 39), (‘f’, 24), (‘m’, 20), (‘m’, 30) and reports nf : 2, sumf : 63 and nm : 2, summ : 50.
From these summaries we obtain x̄f = (60 + 63)/(2 + 2) = 30.75 and x̄m = (41 + 110 + 50)/(1 + 2 + 2) = 40.2.)
What we have just seen is a simple example of a MapReduce job.
(1) For an input block of data each worker produced two blocks of information:
• the number of females and the sum of their ages, and
• the number of males and the sum of their ages.
We can view this as key-value pairs of the form [gender, [count, sum_age]].
So in other words, for the three blocks in the example we would “emit” following intermediate results:
• Worker 1: [['f', [2, 60]], ['m', [1, 41]]]
• Worker 2: [['m', [2, 110]]]
• Worker 3: [['f', [2, 63]], ['m', [2, 50]]]
(2) We can now group the intermediate results by the gender, giving
• Female intermediate data: [['f', [2, 60]], ['f', [2, 63]]]
• Male intermediate data: [['m', [1, 41]], ['m', [2, 110]], ['m', [2, 50]]]
(3) We can now take each of these, total up the ages and divide by the total counts, giving an average age of (60 + 63)/(2 + 2) = 30.75 for females and (41 + 110 + 50)/(1 + 2 + 2) = 40.2 for males.
In the MapReduce paradigm we would refer to (1) as the map step, to (2) as the shuffle step, and to (3) as
the reduce step.
(Figure: each map worker reads one block of the data and emits key-value pairs of the form [gender, [count, sum_age]]; in the shuffle step these pairs are grouped by gender; reduce worker 1 receives [‘f’, [2, 60]] and [‘f’, [2, 63]] and outputs [‘f’, 30.75], while reduce worker 2 receives [‘m’, [1, 41]], [‘m’, [2, 110]] and [‘m’, [2, 50]] and outputs [‘m’, 40.2]; the outputs are then collated into the final result.)
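A minimal sketch of these three steps in plain Python (this mimics the MapReduce logic on a single machine and is not Hadoop code; the data matches example 7):

from collections import defaultdict

blocks = [
    [('f', 32), ('f', 28), ('m', 41)],               # block read by map worker 1
    [('m', 60), ('m', 50)],                          # block read by map worker 2
    [('f', 39), ('f', 24), ('m', 20), ('m', 30)],    # block read by map worker 3
]

def map_step(block):
    """Emit one [gender, (count, sum_age)] pair per gender in the block."""
    summary = defaultdict(lambda: (0, 0))
    for gender, age in block:
        count, total = summary[gender]
        summary[gender] = (count + 1, total + age)
    return list(summary.items())

def reduce_step(values):
    """Combine (count, sum_age) pairs into a mean."""
    total_count = sum(count for count, _ in values)
    total_sum = sum(total for _, total in values)
    return total_sum / total_count

# Shuffle: group the emitted pairs by their key (the gender)
grouped = defaultdict(list)
for block in blocks:
    for gender, counts in map_step(block):
        grouped[gender].append(counts)

print({gender: reduce_step(values) for gender, values in grouped.items()})
# {'f': 30.75, 'm': 40.2}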
The MapReduce paradigm
The MapReduce paradigm consists of applying the steps we have just looked at in example 7.
• The data to be processed is distributed across the cluster to a number of so-called map workers. This might
involve moving data between nodes.
• Each map worker reads in the data assigned to it and issues a number of key-value pairs.
• In the shuffle step, the key-value pairs are ordered by the key and all the key-value pairs for one key will be
moved to the same node.
• Each of these nodes, called reduce workers, will read in all the key-value pairs for one key, aggregate them and
produce an output object.
• These output objects are then collated.
When programming a MapReduce-based algorithm we only need to implement the map and the reduce steps, as these are problem-specific. Hadoop will take care of the remaining steps. Hadoop is written in Java, so native MapReduce jobs in Hadoop need to be coded in Java (or in Pig Latin when using Apache Pig).
Though the MapReduce paradigm is very general and many computations can be rewritten as a MapReduce operation, the result might be very inefficient. The smaller the number of key-value pairs emitted in the map step, the less data has to be moved in the shuffle step and the quicker the reduce step will be. So if there are low-dimensional summary statistics that contain all the relevant information contained in a block of the data, it is typically best if the map step emits these.
However, for a given problem there are often many different ways in which that problem can be cast into the MapReduce framework. In practical applications one would need to trade off the effort required to implement different map steps against the computational efficiency of each approach.
Example 8 (A naive alternative). Instead of summarising its block, each map worker could simply emit one key-value pair of the form [gender, age] (e.g. [‘m’, 20]) per observation, leaving all the aggregation to the reducers.
In example 7 the map step was computationally more complex. In this implementation the map step is
much simpler as it only produces labelled observations of the age tagged with the gender. However, this
implementation is not efficient at all. We emit many more key-value pairs which will then have to be
moved across the network between nodes in the shuffle step, making this implementation very slow.
Though example 8 did not yield a very efficient implementation, it showcased a strategy by which many algorithms can be cast into the MapReduce framework. We will employ a similar trick to tackle matrix multiplication.
Example 9 (Matrix multiplication with MapReduce). Suppose A is an 𝑚 × 𝑝 matrix and B is a 𝑝 × 𝑛 matrix, both potentially too large to be held in one place, and we want to compute C = AB. Suppose the reduce worker responsible for the (𝑖, 𝑗) -th entry of C computes
$$C_{ij} = \sum_{k=1}^{p} A_{ik} B_{kj} = A_{i1}B_{1j} + \ldots + A_{ip}B_{pj}.$$
So this reduce worker needs to obtain the i-th row of A, i.e. 𝐴𝑖1 to 𝐴𝑖 𝑝 and the 𝑗 -th column of B, i.e. 𝐵1 𝑗
to 𝐵 𝑝 𝑗 . These need to be emitted by the map workers. The key-value pairs need to use (𝑖, 𝑗) as the key
to make sure the data will be received by the reduce worker computing the (𝑖, 𝑗) -th entry of C.
Each key-value pair emitted needs to contain the corresponding entry from the matrix A or B. We need to include an additional label in the value, so that we know how to use it in the calculation. That label will not be used in the shuffle step, hence we don’t include it in the key.
For example, we have to make sure that we use 𝐴𝑖1 in the first term, i.e. multiply it with 𝐵1 𝑗 and not
another 𝐵 𝑘 𝑗 , so when we emit 𝐴𝑖1 for example, we also need to include the information that it comes
from A (strictly speaking not required) and that it is meant to enter the calculation of 𝐶𝑖, 𝑗 in the first term
of the sum.
To summarise, for the calculation of 𝐶𝑖𝑗 we need to emit each 𝐴𝑖𝑘 ( 𝑘 = 1, . . . , 𝑝 ) as
[[i,j], ['A', k, A_ik]]
where [i,j] is the key and ['A',k,A_ik] is the value.
𝐴𝑖𝑘 will not only be used to calculate 𝐶𝑖𝑗 but also all the other entries in the 𝑖 -th row of C, so we need to send 𝐴𝑖𝑘 to all the 𝑛 reducers working on the 𝑖 -th row of C, calculating 𝐶𝑖1 to 𝐶𝑖𝑛 .
This means that the map workers need to emit each 𝐴𝑖𝑘 𝑛 times as
[[i,j], ['A', k, A_ik]]
where 𝑗 ranges from 1 to 𝑛.
Similarly, the map workers need to emit each 𝐵𝑘𝑗 𝑚 times as
[[i,j], ['B', k, B_kj]]
where 𝑖 ranges from 1 to 𝑚 .
Each reducer will receive all the values required to calculate 𝐶𝑖𝑗 , align them according to the value of 𝑘 and calculate
$$C_{ij} = \sum_{k=1}^{p} A_{ik} B_{kj} = A_{i1}B_{1j} + \ldots + A_{ip}B_{pj}.$$
The MapReduce paradigm does not let us label key-value pairs for re-use, so we will emit a lot of redundant data.
In fact, we will emit 2𝑚𝑛𝑝 key-value pairs, so this MapReduce approach to calculating C = AB will require plenty of temporary storage space and might not be fast, but it will allow us to multiply two large matrices, each of which might be (much) bigger than what can be stored on a single computer.
We can address the issue of temporary storage space by splitting C into blocks and using a separate MapReduce job for each block, which we can run one after the other.
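A minimal single-machine sketch of this scheme (plain Python standing in for the map, shuffle and reduce steps; the dimensions are made up and this is not Hadoop code):

import numpy as np
from collections import defaultdict

m, p, n = 4, 3, 5
A = np.random.normal(0, 1, (m, p))
B = np.random.normal(0, 1, (p, n))

# Map: emit each A_ik n times and each B_kj m times, keyed by (i, j)
emitted = defaultdict(list)
for i in range(m):
    for k in range(p):
        for j in range(n):
            emitted[(i, j)].append(('A', k, A[i, k]))
for k in range(p):
    for j in range(n):
        for i in range(m):
            emitted[(i, j)].append(('B', k, B[k, j]))

# Reduce: for each key (i, j), align the values by k and sum the products
C = np.zeros((m, n))
for (i, j), values in emitted.items():
    a = {k: v for label, k, v in values if label == 'A'}
    b = {k: v for label, k, v in values if label == 'B'}
    C[i, j] = sum(a[k] * b[k] for k in range(p))

print(np.allclose(C, A @ B))   # True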
Example 10 (Linear regression with MapReduce). Recall that calculating the least-squares estimate requires X⊤X and X⊤y. We can write
$$\mathbf{X}^\top \mathbf{X} = \sum_{i=1}^{n} \mathbf{x}_i \mathbf{x}_i^\top .$$
If the data is processed by different mappers, each mapper gets a block of the data to process. We can rewrite
$$(\mathbf{X}^\top \mathbf{X})_{j_1 j_2} = \sum_{\text{blocks } l}\ \sum_{i \text{ in block } l} x_{i j_1} x_{i j_2} .$$
This suggests that the 𝑙 -th mapper can calculate $\sum_{i \text{ in block } l} x_{i j_1} x_{i j_2}$ for 𝑗1 , 𝑗2 ∈ {1, . . . , 𝑝} and emit the entries using ( 𝑗1 , 𝑗2 ) as key.
Similarly, we can calculate X⊤y, which has 𝑗 -th entry
$$(\mathbf{X}^\top \mathbf{y})_j = \sum_{i=1}^{n} x_{ij} y_i = \sum_{\text{blocks } l}\ \sum_{i \text{ in block } l} x_{ij} y_i .$$
This suggests that the 𝑙 -th mapper for this part of the calculation can calculate $\sum_{i \text{ in block } l} x_{ij} y_i$ for 𝑗 ∈ {1, . . . , 𝑝} and emit the entries using 𝑗 as key.
For both calculations the reducers would simply need to add up all the terms they are assigned, as their role is just to implement the outermost sum of the above formulae for (X⊤X)𝑗1 𝑗2 and (X⊤y)𝑗 .
If 𝑝 is moderate, the remainder of the calculation, notably
$$\hat{\boldsymbol{\beta}} = (\mathbf{X}^\top \mathbf{X})^{-1} \mathbf{X}^\top \mathbf{y},$$
can be carried out on a single node. If 𝑝 is large, however, this would not be possible, in which case the calculation of 𝜷̂ would need to be spread across the cluster. Rather than using direct linear algebra to calculate 𝜷̂, it would in the large-𝑝 case be more efficient to use an indirect method such as (stochastic) gradient descent.
Looked at from a statistical point of view, when implementing an inferential algorithm it is always best if the map step
produces a low-dimensional sufficient statistic that contains all the relevant information from that block of observa-
tions. This is what we have done in example 10 for linear regression.
Task 6.
Consider a large text document. Construct a MapReduce algorithm that counts how often each word
occurs in the document.
Answers to tasks
Answer to Task 1. Aw is a vector of length 𝑛, so we need to calculate 𝑛 elements. The 𝑖 -th element of Aw is given by the sum
$$\sum_{j=1}^{n} A_{ij} w_j = A_{i1} w_1 + \ldots + A_{in} w_n .$$
$$\begin{pmatrix} \vdots \\ v_i \\ \vdots \\ v_n \end{pmatrix} = \begin{pmatrix} \vdots & & \vdots & & \vdots \\ A_{i1} & \cdots & A_{ij} & \cdots & A_{in} \\ \vdots & & \vdots & & \vdots \\ A_{n1} & \cdots & A_{nj} & \cdots & A_{nn} \end{pmatrix} \cdot \begin{pmatrix} \vdots \\ w_j \\ \vdots \\ w_n \end{pmatrix}$$
Each entry requires 2𝑛 floating point calculations, so calculating the entire vector requires 2𝑛2 floating point operations.
Thus the time complexity of the matrix-vector product Aw is 𝑂 (𝑛2 ) .
The matrix-vector product Aw yields a vector of length 𝑛, therefore it has space complexity 𝑂 (𝑛) .
Answer to Task 2. (AB)w requires us to compute the matrix product U = AB, which takes 𝑂 (𝑛3 ) time, and subsequently the matrix-vector product Uw, which takes 𝑂 (𝑛2 ) time. So the time cost of (AB)w is 𝑂 (𝑛3 + 𝑛2 ) or, by dropping the lower-order term (which grows slower than 𝑛3 ), 𝑂 (𝑛3 ) .
In terms of memory, we need to allocate the temporary 𝑛 × 𝑛 matrix U and the result vector of length 𝑛. Thus the memory complexity is 𝑂 (𝑛2 + 𝑛) or, by dropping the lower-order term, 𝑂 (𝑛2 ) .
On the other hand, A(Bw) requires us to compute the matrix-vector product v = Bw, which runs in 𝑂 (𝑛2 ) time, and the matrix-vector product Av, which also runs in 𝑂 (𝑛2 ) time. So the time cost of A(Bw) is 𝑂 (2𝑛2 ) or, by ignoring scaling factors, 𝑂 (𝑛2 ) . Thus, it is less time consuming to compute A(Bw) than (AB)w.
In terms of memory, we need to allocate the temporary vector v of length 𝑛 and the result vector of length 𝑛. Thus the memory complexity is 𝑂 (2𝑛) or, by ignoring scaling factors, 𝑂 (𝑛) .
So, both in terms of time and memory complexity the second approach is to be preferred over the first one.
$$(\mathbf{A}\mathbf{B})\mathbf{w} = \mathbf{U}\mathbf{w} = \begin{pmatrix} U_{11} & \cdots & U_{1n} \\ \vdots & & \vdots \\ U_{n1} & \cdots & U_{nn} \end{pmatrix} \cdot \begin{pmatrix} w_1 \\ \vdots \\ w_n \end{pmatrix}, \qquad \mathbf{A}(\mathbf{B}\mathbf{w}) = \mathbf{A}\mathbf{v} = \begin{pmatrix} A_{11} & \cdots & A_{1n} \\ \vdots & & \vdots \\ A_{n1} & \cdots & A_{nn} \end{pmatrix} \cdot \begin{pmatrix} v_1 \\ \vdots \\ v_n \end{pmatrix}.$$
To demonstrate the difference in time complexity between A(Bw) and (AB)w, we can compare the run times in
Python for different values of 𝑛.
import numpy as np
import timeit
import matplotlib.pyplot as plt
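# A sketch of the timing code (the variable names n, runtime1 and runtime2 are
# assumptions chosen to match the plotting code below).
n = [100, 200, 400, 800, 1600]
runtime1 = []                     # run times for (AB)w
runtime2 = []                     # run times for A(Bw)
for one_n in n:
    A = np.random.normal(0, 1, (one_n, one_n))
    B = np.random.normal(0, 1, (one_n, one_n))
    w = np.random.normal(0, 1, one_n)
    runtime1.append(timeit.timeit(lambda: (A @ B) @ w, number=3) / 3)
    runtime2.append(timeit.timeit(lambda: A @ (B @ w), number=3) / 3)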
plt.figure()
plt.plot(n, runtime1, 'b-o', label='First way')
plt.plot(n, runtime2, 'r-o', label='Second way')
plt.xlabel("Matrix dimension n")
plt.ylabel("Run time in seconds")
plt.legend()
plt.show()
Answer to Task 3. The snippet takes 𝑂 (𝑛 + 𝑚) time, as dictated by the two for loops. The space complexity of the
snippet is constant (𝑂 (1) ), since a and b are allocated once at the beginning and they do not get reallocated inside the
for loops.
Answer to Task 4. Computing X⊤X requires 𝑂 (𝑛𝑝2 ) operations and X⊤y requires 𝑂 (𝑛𝑝) operations. Inverting the 𝑝 × 𝑝 matrix X⊤X requires 𝑂 ( 𝑝3 ) operations and the final matrix-vector product 𝑂 ( 𝑝2 ) operations. Instead of the last two steps it is a little faster to solve the system of equations X⊤X𝜷 = X⊤y, though this is also 𝑂 ( 𝑝3 ) (but with smaller constants).
Thus overall computational complexity is 𝑂 (𝑛𝑝 2 + 𝑝 3 ) . Because we need 𝑛 ≥ 𝑝+1 in order to be able to calculate unique
estimates, 𝑛𝑝 2 dominates 𝑝 3 , thus the overall computational complexity becomes 𝑂 (𝑛𝑝 2 ) , as 𝑛 needs to increase with
𝑝.
• If we only look at the effect of the number of observations 𝑛 (keeping the number of covariates 𝑝 fixed), then
the computational complexity is 𝑂 (𝑛) , i.e. linear regression scales linearly in the number of observations.
• If we only look at the effect of the number of covariates 𝑝 (keeping the number of observations 𝑛 fixed), then
the computational complexity is 𝑂 ( 𝑝 3 ) , i.e. linear regression scales in the number of covariates in a cubic way.
We can verify this empirically. We start with the imports and define three functions.
import numpy as np
import matplotlib.pyplot as plt
import timeit
def fit_regression(X, y):               # Fit by solving the normal equations
    XtX = X.transpose() @ X
    Xty = X.transpose() @ y
    return np.linalg.solve(XtX, Xty)
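A minimal sketch of the other two functions (the names fit_regression above, simulate_data and time_regression, and their exact signatures, are assumptions chosen to match the code below):

def simulate_data(n, p):
    """Simulate a design matrix X and a response y from a linear model."""
    X = np.random.normal(0, 1, (n, p))
    beta = np.random.normal(0, 1, p)
    y = X @ beta + np.random.normal(0, 1, n)
    return X, y

def time_regression(n, p, repeats=3):
    """Average run time of fitting the regression for given n and p."""
    X, y = simulate_data(n, p)
    return timeit.timeit(lambda: fit_regression(X, y), number=repeats) / repeats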
We can see the CPU time increases linearly as we increase the number of observations 𝑛.
We look at the effect of the number of covariates 𝑝 next.
n = 10000
p = [1, 50, 100, 250, 500, 750, 1000, 1500, 2000]
times = [ time_regression(n, one_p) for one_p in p ]
plt.figure()
plt.plot(p, times, '-o')
plt.xlabel("Number of covariates p")
plt.ylabel("Computational time (seconds)")
plt.show()
We can see the CPU time increases in a non-linear (cubic) way as we increase the number of covariates 𝑝 .
Answer to Task 5. In order to calculate the mean we need to sum up all the values in the vector. This sum can be split into parts and each part carried out independently, so we can easily parallelise the calculation of the mean (though unless we really have a lot of data, possibly distributed evenly across several nodes, there would be little point in doing so, as calculating the mean is quick).
The calculation of the median can however not be easily parallelised. Unless the data is distributed, there is little point in trying to calculate the median in parallel.
Answer to Task 6. For each block of text, the map worker counts how often each word in the block occurs and emits key-value pairs of the form [word, n_occurrences]. The reducer will then simply total up the number of occurrences from the different blocks.
Alternatively, we could simply emit each word in the document as a key-value pair of the form [word, 1]. This would require less effort in the implementation, but would yield a less efficient algorithm.
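A minimal single-machine sketch of the first approach (plain Python; the text blocks are made up for illustration):

from collections import Counter, defaultdict

blocks = [
    "the quick brown fox jumps over the lazy dog",
    "the dog barks and the fox runs",
]

# Map: each worker counts the words in its block and emits [word, count] pairs
def map_step(text):
    return list(Counter(text.split()).items())

# Shuffle: group the emitted pairs by word
grouped = defaultdict(list)
for block in blocks:
    for word, count in map_step(block):
        grouped[word].append(count)

# Reduce: total up the counts for each word
word_counts = {word: sum(counts) for word, counts in grouped.items()}
print(word_counts)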