01-02. Introduction
Course
James Atlas. Office hours: Monday 1-4 pm
Groups of 2-4. Three roles:
- Manager - ensure group on-task
- Recorder - records answers/discussions
- Reporter - summarizes findings to instructor/other teams
Weighting:
- Labs: 30% (Learn quiz + 9 assignments)
- Project: 40%
- Final exam: 30%
The Five Challenges
- Variety: different formats of data from many sources (e.g. country-specific regulations, language barriers)
- Velocity: how quickly the data can be ingested and processed
- Volume: large volume of data
- Veracity: inconsistencies/uncertainty in data
- Value: extracting useful information from the data
Scalability (volume, velocity): distributing computation/storage
Reliability (veracity): ensuring results survive when machines fail
Productivity (variety, value): making writing distributed programs easy
Three Perspectives
Look at the problem from three levels of abstractions:
- Architecture
- Algorithms
- Programming
Architecture
Scalability:
- Datasets do not fit in memory
- Time consuming to process large datasets
- Parallel processing/storage required
Storage/IO: ensuring data durability/consistency (RAID, DFS)
Distributed file system:
- Chuck servers:
- Files split into contiguous chunks (16-64 MB)
- Chunks replicated 2-3 times, hopefully on different racks
- Master node:
- Called ‘Name Node’ in Hadoop
- Metadata about where files are stored
- Client library that accesses the data
- Operations should run on the machines
RDD: Resilient Distributed Dataset
Parallelism vs Scalability
Parallelism:
Scalability: given a fixed problem size, does adding more processors make it faster?
03. Scalability, Programming, and Algorithms in Spark
A single node cannot process large data sets (e.g. just saving the internet to disk would take several months under the most ideal conditions). Hence, large-scale computing uses multiple nodes; a few dozen nodes in a rack connected together by a switch, with racks connected in with each other by another switch (often with higher bandwidth).
The volumes of data are too large to store on a single node, so data is stored on a distributed file system where data is kept in contiguous chunks (16-64 MB) and replicated 2-3 times. One or more master nodes store metadata relating to the chunks.
The architecture assumes nodes will fail and can continue to function when this occurs.
Networking is slow and hence, the processing is done directly on the chunk servers.
Map-Reduce
The Map-Reduce architecture takes care of:
- Partitioning input data
- Scheduling execution across a set of machines
- Handling machine failures
- Managing inter-machine communication
Map
Input is a set of key-value pairs, where the key is a nominal value (e.g. boolean, number).
Hence unlike dictionaries, the Map-Reduce architecture allows duplicate keys.
When map is called, every key-value pair is passed to a map lambda which outputs some number of key-value pairs (e.g. flatMap returns an array). This is called a transformation.
Reduce
(Likely already mapped) key-value pairs are given as input, and some reduction operation is performed:
groupByKey: combines all key-value pairs with the same key into an arrayreduceByKeyis used to summarize the values for a particular key- Passed two values and return the reduced value
Example: Word Counting
File (or a chunk of the file) given to each node. Then:
- Each individual node performs a map operation, reading the input file and converting it to a set of key-value pairs
(word, word_count = 1) - Reduce by key is used to collect all pairs with the same key and reduce it to the sum
The reduce step is then applied again with the sets from the other nodes until we are left with a single set.
Spark
Spark is a very optimized implementation of MapReduce which attempts to reduce the amount of disk/network writes. The trade-off for this is that if a catastrophic failure occurs, more work is lost.
Spark lazily computes values: mapping, reducing etc. are methods of an RDD which return an RDD (allowing chaining), not the results themselves. Hence, intermediate results are not stored and the operations are done only when you actually call collect() or a similar method.
RDD: Resilient Distributed Dataset
RDDs are a fault-tolerant collection of elements that can be operated in parallel.
They can be made by parallelizing an existing collection (e.g. list in Python) or by referencing a dataset in a external storage system (e.g. shared file system).
Action
Actions generate a local data structure at the driver:
They are operations that run on but do not mutate the RDD: count, collect, reduce, max, foreach etc.
Transformation
Returns a reference to a new RDD:
- Single RDD as input:
map,flatMap,filteretc. - Joins; multiple RDDs as input
sc.union: simply adds all key-value pairs from list of passed RDDs (duplicate keys are allowed)rdd_1.join(rdd_2): inner join, with values becoming a two-tuple- For each (key_1, value_1) in rdd_1
- For each value value_2 with key_1
- Add (key_1, (value_1, value_2) to the resultant RDD
- For each value value_2 with key_1
- For each (key_1, value_1) in rdd_1
Coordination
The master node (different from the driver) is responsible for coordination.
The node keeps track of task status; tasks can be idle (waiting for workers), in-progress or completed.
As workers become available, they get scheduled idle tasks.
When a map task is completed, the location and sizes of its intermediate files is sent to the master, which in turn sends this to workers doing reduction operations.
Failures
Master will periodically ping workers to detect failures.
If a map worker fails, in-progress/completed tasks reset to idle and must be reprocessed. Reduce workers are notified when the task is rescheduled to another map worker.
If a reduce worker fails, only in-progress tasks are reset to idle. The task is restarted on another worker.
If the master fails, the task is aborted and the client is notified.
Some Spark methods and miscellaneous notes
first(): returns the first elementtake(n): returns the firstnitemsfirstandtakeshould be applied on a sorted RDD, otherwise the results will be non-deterministic.
takeOrdered(n, sortFunction) may be used if it is not already sorted or if the RDD is already in a particular order. takeOrdered is much faster than running sort() and then take().
countByValue returns the count of each key: see the word count example. Returns a Python dictionary, not an RDD.
lookup(key) returns the value for a given key in an RDD.
Reduction functions must be associative/commutative; otherwise, the results will be wrong and non-deterministic.
Some transformations:
map: outputs exactly one key-value pairflatMap: returns a set of key-value pairs, with the contents of that array being spread to the outer setmapPartitions: return a key-value pair given the elements in a partition (in the form of an iterator)filter: lambda returning true (remove)/false (keep)
Example: Three-Word Sequences
def three_word(line):
three_word_tuples = []
words = line.split(" ")
for i in range(len(words) - 2):
words.append(" ".join([words[i], words[i + 1], words[i + 2]]))
return three_word_tuples
words = file.flatMap(lambda line: (three_word(line), 1)])
counts = words.reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], False)
counts.collect()
Map-Reduce Join
Used in operations such as group by key and joins.
During mapping, a hash function is generated on the (mapped) key. This hash is used to determine which bucket (reduce node) the key-value pair is sent to (e.g. hash % k, where k is the number of buckets).
Hence, for a given key, all map nodes will send the key to the same reduce node, allowing the reduction step to be done.
04. Algorithms and Cloud Computing
Map-Reduce Join Examples
Two Hops
Given bi-directional adjacency list, write a map-reduce algorithm that outputs nodes that are reachable in two hops for each node.
with open("graph.txt", "w") as text_file:
text_file.write("""a b
a c
a e
b c
c d
d e
e x
x y
x z
w y
y z""")
def getTuples(line):
words = line.split(" ")
graph = sc.textFile("graph.txt")\
.map(lambda line: line.split(" "))\
.flatMap(lambda t: [(t[0], t[1]), (t[1], t[0])])
two_hop_tuples = graph.join(graph) # Join to itself
# Will return results such as ('b', ('a', 'a')): a->b->a (key is the node connecting the two)
two_hop_tuples = two_hop_tuples.filter(lambda t: t[1][0] != t[1][1]) # Remove loops
two_hop_tuples = two_hop_tuples.filter(lambda t: t[1][0] < t[1][1]) # Use string sort order to remove forwards/reverse direction duplicates
initialValue = []
reduction = (lambda aggregated, el: aggregated + [el]) # adds element to the list
reduce_aggregated = (lambda agg1, agg2: agg1 + agg2) # reducing two already aggregated values into one
reduced = two_hop_tuples.aggregateByKey(initialValue, reduction, reduce_aggregated)
dbg(reduced) # Array of tuples (middle, [start, end])
# aggregateByKey gives more power than reduceByKey (and so performs slightly worse)
Refinement: Combiners
If a map produces many pairs (k, v_1), …, (k, v_n) for the same key k which will be immediately reduced (e.g. word frequency) , network time can be saved by pre-aggregating values in the mapper: combine(k, list(v_1)) -> v2.
This only works if the reduce function is commutative and associative.
Skew
The number of values associated with each key will vary significantly, so if each reduce node is given a single key as a task (which is a bad idea as there is overhead associated with each node), there will be a significant amount variation in the time each task takes to complete (even with combiners, some keys will be more popular and so require network transfer from more map nodes). This is called skew.
To reduce this issue, each reduce task should be responsible for multiple keys - if this allocation is done randomly, the skew should be reduced from averaging.
To further decrease skew, create more reduce tasks than there are compute nodes - if the task is short, then that node can pick up a few more tasks in the time it takes another node to finish one.
05. Cloud Computing, Performance - Cost and Complexity
Matrix Multiplication
Product of matrix
\text{S} = \frac{T_{\text{sequential}}}{T_{\text{parallel}}} $$
Amdahl’s Law
Some code inherently sequential; as
if
Gustafson’s Law
Strong scalability: time vs cores when problem size fixed
Weak scalability: time vs problem size/cores fixed. Sub-linear scalability if time increases.
Karp-Flatt Metric
Experimentally determining the serial fraction,
If
The lower the value of
Communication
- Communication cost: total I/O of all processes
- Count input file size
- I/O between map and reduce tasks counted twice
- Elapsed communication cost: max I/O along any path
- Partition size (number of map processes) chosen to not reach I/O bandwidth limits
- Limit amount of input/output of any one process - main memory/local disk
- Elapsed computation cost: max running time of any process
Frequent Item-sets
Market-Basket Model
Basket: small subset of items
Want to know how items are related.
Support for item-set
If the support is greater than the support threshold
Association Rules
If
Confidence:
High confidence != interesting: some items may just be frequent (e.g. ‘the’). Hence, use interest; confidence minus the probability of the item appearing in any basket.
High-interest rules (
A-Priori
If set
- Read baskets, count number of occurrences of each item
-
memory
-
- Filter items that appear less than
times - Read baskets again, finding pairs where each item appears more than
times -
memory
-
Similarity
Finding neighbors in a high-dimensional space.
Problem: given vector of data-points (possibly a flattened matrix) and a distance function
Distance Measures
Axioms:
- Never negative
- Zero if the points are the same
- Commutative:
- Triangle inequality: direct path never longer
Cosine Distance
Hence cosine distance is:
L_2 Norm/Euclidean Distance
Root of sum of squares of differences. If the vectors have
Jaccard Distance
For sets - dimensions are Boolean values.
Jaccard similarity:
Jaccard distance:
Jaccard bag similarity:
- Duplicates not removed; if element appears multiple times, count it multiple times
- Divide number of pairs in intersection by sum of sizes of the two sets
- Max similarity of
Finding Similar Documents
Shingling
A
The shingles are sets, not bags, so there should be no repeated shingles within the set of shingles.
Pick
Hashing
Out of the domain of all possible shingles, only a small subset are likely to appear.
Hence, hashing can be used to map shingles to a bucket via a hash function with a smaller range (and hence require less memory). A bucket may contain multiple shingles.
With this, a document can be represented as a Boolean vector - one Boolean for every bucket (element in the hash range), true if the document contains a shingle in the bucket.
Hence, a set of documents can be represented as a matrix, rows being buckets and columns documents.
Since most documents are unlikely to contain most shingles, store this as an sparse matrix.
Min-Hashing
If a permutation of rows is picked, the min-hash of the document is the first element that is true (e.g. if permutation is
If this is done with
This can be done to every document to get a signature matrix.
The probability of two sets having the same min-hash value is the Jaccard similarity.
Locality-Sensitive Hashing
The signature matrix can be partitioned into
For each band, group documents into the same bucket if their signatures are the same for that band.
With min-hashes, the probability of two documents sharing a bucket grows linearly with their Jaccard similarity.
With
Where
The shape of the curve can be modified by picking the values of
Note that this outputs candidate pairs and so a proper Jaccard similarity calculation should be doe to confirm their similarity.
PageRank
More in-links = page is more important.
In-links from important page = link worth more.
Page’s importance split between its out-links:
- If a page
with importance has out-links, each link gets votes
Flow Model
Start with base-model which uses the number of in-links for page importance.
Page rank,
Where
Under this current implementation there is no unique solution. Hence, a constraint is added: the sum of page ranks sums to
Matrix Formulation
Matrix
The rank vector
The importance of all pages sum to
At time
At time
Hence, for the entire rank vector:
If enough iterations are done and the Markov chain is strongly connected (every state accessible from any state), the stationary distribution will be unique and be reached regardless of the initial probability distribution.
Random Walk Interpretation
Model it as having a ‘surfer’ on some page
Teleports
Dead ends: some pages have no out-links. Random walker gets stuck and importance of page increases.
Spider-traps: all out-links within a group; all importance eventually absorbed by the trap.
Solution to both solutions (and to graph not being strongly connected): teleport to a random page with probability
This leads us to the actual equations.
If
The first term is from the page the walker is currently on teleporting (with probability
If
PageRank can then be applied iteratively using:
Performance
The matrix, if encoded as sparse matrix, will fit on disk but not memory.
Store a list of page IDs, the out-degree of the page, and the IDs of out-links from that page.
Assume the vector does fit in memory.
- Initialize all entries of
to . - For each page
: - For each out-link
: -
- For each out-link
Lots of random writes.
Topic-Specific PageRank
Teleports only go to page relevant to a topic - this leads to a different vector
Spam
Issue 1: term spam - add relevant keywords to the page (visible only to the search engine), and/or copy text from top results.
Solution 1: statistical text analysis, duplicate page detection.
Solution 2: use text content of the in-links and surrounding text - trust what others say about you, know what you say.
Issue 1: the internet banding together for jokes.
Issue 2: spam farms - spammer link to target page from popular websites they have some access to (e.g. forums) and from sites they fully control (which the target page links to).
Solution 1: detecting and blacklisting spam farm-like structures.
Solution 2: TrustRank - topic-specific PageRank with a manually-selected set of trusted pages. Uses approximate isolation - trusted sites unlikely to link to untrustworthy sites.
Recommendation Systems
Utility matrix - rows are users, columns are things they have ‘rated’ (e.g. movies).
Need to:
- Gather known ratings
- Extrapolate unknown ratings
- Evaluate performance
Content-Based Recommendations
Recommend items similar to what the customer has previously liked.
Collaborative Filtering
Find a set of users
Issues:
- Cold start: need lots of users
- Sparsity: content catalog may be large and hence have few users that have rated the same items
- First rater: need ratings for the item before it can be recommended
- Popularity bias: tends to recommend popular items
Community Detection
Girvan-Newman
Edge betweenness:
- Number of shortest paths going through an edge
- If two nodes have
shortest paths, each is counted as
Girvan-Newman decomposes undirected, unweighted networks into a hierarchical network using the strength of weak ties - if the edge betweenness is large, it is probably at the border between communities and hence should be removed. The order in which the network is split determines the hierarchy.
Part 1
Pick starting node
Parent nodes are nodes one hop closer to
Part 2
Betweenness can be calculated by working up the tree from the leaf nodes:
- Each leaf has a node flow of
- Each node splits its node flow among its parents - the edge betweenness:
- Edge betweenness is proportional to the number of shortest paths the parent has
- Each non-leaf node has a flow of
plus the sum of edge betweenness from edges to its children
This process has a complexity of
This can be repeated with every starting node, summing betweenness values and then dividing by
A random subset of starting nodes can be used to get an approximation.
The edge with the greatest betweenness is removed, and the process is repeated until there are no edges left.
Network Communities
Girvan-Newman decomposes the network into clusters, but how many clusters should there be?
Community: sets of tightly connected nodes.
Modularity
Modularity,
Where
A positive value means the number of edges in
In nice cases, the modularity will have a maxima.
K-means
If there are no predefined or direct connections (e.g. helicopters can fly to anywhere) in some high-dimensional space, clustering can be done using some distance metric.
Instead of hierarchical clustering (like with Girvan-Newman which has a top-down/divisive approach) we can use point assignment; points belong to the nearest clusters.
Pick some number of clusters
For each element, place it in the cluster whose centroid it is nearest too, then update the centroid using the average point of all elements in the centroid.
Repeat until it converges.
Online Algorithms
Algorithm given to input one piece at a time and must make irrevocable decisions.
Bipartite Matching
Two sets of vertices (e.g. advertisements and queries), elements from one set can only be paired with a subset of the other, and elements can only be picked once.
Problem: match each element from one set to an eligible element from the other set.
Competitive ratio: worst performance over all possible inputs.
AdWords
Performance-based advertising; charged only if ad clicked.
Use expected revenue per click: bid times the click-through rate.
Advertisers have a daily budget, and the CTR is calculated from historical data - exploration vs exploitation; show a new ad to estimate CTR or show ad with good known CTR?
Assume one ad/query, all advertisers have the same budget
Greedy:
BALANCE: pick advertiser with the largest unspent budget, breaking ties via some deterministic way.
Parallel Computing
Clock speeds haven’t gone up significantly in years; better performance requires optimizations by the program.
Types of Parallelism
Data Dependence Graph: directed acyclic graph, vertices being tasks and edges dependencies.
Critical path: slowest path.
If two tasks are independent of each other, they can be done in parallel.
Data Parallelism: same operation executed on different elements simultaneously (or out-of-order).
Functional parallelism: independent tasks can apply different operations to different data elements simultaneously (or out-of-order).
Pipelining: dividing process into stages to produce items simultaneously.
Instruction-Level Parallelism
Dependencies: if two instructions data-dependent, they cannot be executed simultaneously.
Hazard and stall may or may not occur, depending on pipeline organization.
Data forwarding: store instruction in register if another instruction will use it soon.
Out-of-order execution complicated as some instructions take more clock cycles to complete (e.g. FP).
Out-of-order pipeline: pool up instructions, schedule them, execute them out-of-order, then commit in-order.
Approx. one order of magnitude improvement from these tricks.
Flynn’s Taxonomy:
Instructions and data:
- SISD: standard programming
- SIMD: vector instructions, GPUs
- MISD: 😢 - none
- MIMD: multi-core processors
Memory hierarchy: registers, L1/L2/L3 cache, RAM, secondary/permanent storage. Trade-off between size and speed will always exist as it will be limited by physical distance to the CPU.
Threads and Processes
Memory Model:
- Distributed memory model: each process has its own separate area of memory, explicit data transfer required
- Shared memory model: implicit data sharing, own private area
Multi-threaded process: shared code + data, kernel context, each thread has its own stack and context (registers etc.).
Concurrency can be simulated via time slicing. Multi-core processors can have true concurrency.
Two threads logically concurrent if their flows overlap in time - life of one thread (in terms of time) overlaps with another.
Threads and processes:
- Context switching requires register values to be swapped; ~1K cycles for both threads and processes
- Creating and destroying threads cheaper than with processes
- Sharing data (too) easy with threads
Memory consistency with threads: the only constraint is that instructions within your own thread appear to run in-order - no guarantee of sequential consistency with other threads.
Hyperthreading - replicates enough instruction-control hardware to process multiple instruction streams (but not the functional units - ALU etc.).
Vectorization: applying the same operation to multiple pieces of data
Roofline Model:
- Identifies if compute or memory/communication bound
- TODO how
- More communication = shared memory/threads
- More computation = distributed memory/processes, vectorized operations
Python MPI
Multiple processes - distributed memory model.
Problem Decomposition
Large data sets split into small tasks; assume there needs to be communication between neighboring regions.
Blocks: each task given vertical slice of height
Cyclic: each task given
Partitioning can also be done in multiple dimensions (e.g. sliced vertically and horizontally).
Can also use domain decomposition: master task responsible for decomposition/assignment may decompose areas that are more expensive or require high accuracy further.
MPI Methods
from mpi4py import MPI
import numpy
comm = MPI.COMM_WORLD
size = comm.Get_size() # number of processes connected to the world
rank = comm.Get_rank() # process identifier. Starts at 0
data_to_send = numpy.zeros(1) # data must map to a byte buffer
var_to_overwrite = numpy.zeros(4) # receive buffer can be bigger
comm.Send(data_to_send , dest=process_to_send_to)
comm.Recv(var_to_overwrite, dest=process_to_receive_to)
# Non-blocking
req = comm.Isend(data_to_send, dest=RANK)
req = comm.Irecv(var_to_overwrite, source=RANK)
req.Wait()
# Receiving from any source
# use `status.source` to get source process rank
status = MPI.Status()
comm.Recv(var_to_overwrite, source=MPI.ANY_SOURCE, status=status)
# Use probe to modify variable to write to depending on source rank
comm.Probe(source=MPI.ANY_SOURCE, status=status)
# Collective operations
# Force processes to wait until all processes reach this point
comm.Barrier()
# For each element of the array, process 0 gets max value out of any process
comm.Reduce(in_arr, out_arr, op=MPI.MAX, root=0)
# Sends message from root to every other process
comm.Bcast(arr, root=0)
# Distribute chunks of the array from root process to all processes
comm.Scatter(in_arr, out_arr, root=0)
# Get chunks of arrays from every process and sends them to root
comm.Gather(in_arr, out_arr, root=0)
Deadlocks
If a process is waiting for a message it will never receive, deadlock results.
e.g. if all processes need to send and receive data, send call will block as it is waiting for the receiver to make a receive call, but all are trying to send.
Can also be caused by memory buffer filling up, causing the send function to wait.
Solution: paired send-and-receive:
if rank % 2:
comm.Send(..., rank - 1)
comm.Recv(..., rank - 1)
else:
comm.Recv(..., rank + 1)
comm.Send(..., rank + 1)
CUDA
SM: Simultaneous Multi-processor.
The entire program is a grid made up blocks: independent subtasks that can run in any order.
Global memory can be accessed by any block and each block has a single control logic unit.
Threads within a block can access shared memory and run the same instructions on a different bit of data.
‘Local memory’ is private to a thread, and is actually global memory. If too many registers are used it swaps to local memory.
syncthreads() acts as a synchronization barrier for threads in a block.
Warps
Each block divided into 32-thread warps. At any one time, one warp (not per-block!) is running, with all threads running the instructions in lock-step (if statements etc. disable threads that do not meet the condition).
Block sizes should be chosen to result in many full warps (e.g. 128 is better than 32 is better than 1 thread) - having lots of warps to switch between allows memory access latency to be hidden.
Warps can be executing, waiting for data, or waiting to execute. The scheduler will interleave warp execution to minimize the amount of time spent waiting for data.
Occupancy - the number of warps running concurrently, is limited by hardware constraints as well as register/shared memory usage.
Switching between warps has zero overhead as each warp has its own set of registers - the occupancy will be reduced if there is not enough register space.
Locks
Data race - data accessed and manipulated by multiple processors.
Can use atomic operations to perform load-add-store operations in a single step.
Only works for specific processors and on individual instructions. Use locks to execute arbitrary regions of code. Mutual exclusion - only one process can have access to the data.
volatile char lock = 0
while (true) {
if (lock == 0) {
lock = 1;
// read-modify-write
break;
}
}
do_stuff();
lock = 0;
An atomic compare-and-exchange operation must be used to acquire the lock CMPXCHG in x86.
Numba Methods
from numba import jit
@jit
def fn():
return
from numba import cuda
def fn(in_out_arr):
pos = cuda.grid(1) # coordinate of thread when split as a 1D array
# Atomic operations
old_val = cuda.atomic.add(arr, index, increment_by)
# if arr[0] == old, set it to new and return the old value
old_val = cuda.atomic.compare_and_swap(arr, old, new)
# Synchronization point
cuda.syncthreads()
arr = cuda.to_device(np.zeros(shape = 1, dtype=np.int32))
# Or make array directly on the GPU
global_arr = cuda.device_array(shape)
shared_arr = cuda.shared.array(shape, dtype)
# Max 1024 threads per blocks on most Nvidia GPUs
fn[blocks_per_grid, threads_per_block](arr)
main_mem_arr = global_arr.copy_to_host()
Raft Consensus Algorithm
Election:
- After election timeout (random, ~10x network latency), become candidate; vote for self and send request vote
- Vote for candidate if:
- Term is greater than current
- Haven’t yet voted for anyone in the new term
- Their log is as or more up-to-date than you
- Reset election timeout
- If majority of votes received, become leader
- If election timeout reached, increment term and try again
Log replication:
- Leader takes commands from client, appends to in-progress logs
- Leader sends heartbeats, possibly with entries to append
- Once command acknowledged by majority of followers, commit
- On next heartbeat, tell followers to commit previous command
If a leader gets a command from another leader with a greater term, step down and roll back uncommitted commands.