Exam Notes

Five Vs

Volume, velocity, variety, veracity, value.

Three Perspectives

Look at the problem from three levels of abstraction:

Cluster Architecture

Distributed File Systems (DFS)

Scale + durability + consistency.

Only useful in cases with large files and few file changes.

Master node stores metadata.

Chunk servers store contiguous chunks of 16-64 MB that have been replicated onto a few different machines.

Computation done directly on chunk servers instead of dealing with network transfer.

Cloud Computing

IaaS, PaaS, SaaS: infrastructure, platform, software.

Cloud advantages: scaling, resilience, high-level services, no personnel required.

Disadvantages: data transfer from local to cloud, no control when things go wrong, privacy.

Service layers:

Map-Reduce

Handles data partitioning, execution scheduling, machine failures, inter-machine communications.

If a map task produces many elements with the same key and the reduce function is associative (re-grouping allowed) and commutative (left and right operands can be swapped), partial reduction can be done on the map node (a combiner), reducing the amount of data sent to the reduce node.
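
For example, a minimal word-count combiner sketch in plain Python (illustrative names; a real framework handles the plumbing):

from collections import Counter

def map_task(document):
    for word in document.split():
        yield (word, 1)

def combine(pairs):
    # partial reduction on the map node: one (word, partial_count) per word
    counts = Counter()
    for word, n in pairs:
        counts[word] += n
    return counts.items()

def reduce_task(word, partial_counts):
    # sums are associative and commutative, so combining partials is safe
    return (word, sum(partial_counts))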

Spark

Uses lazy evaluation; reduces disk/network use, but more progress is lost if there is a failure.

Actions: convert an RDD to a local data structure.

Transformations: RDD to RDD; joins: multiple RDDs to one RDD.

When a map task completes, it sends the master a list of file locations/sizes (one for each reducer); the master forwards this to the reduce nodes.

Failure

If a worker fails (no ping response), the master reschedules its tasks on another node.

If it was a map worker, the master must notify the reduce tasks that the locations of their input files have changed.

If the master node fails, the MapReduce job is aborted and the client notified.

Skew

Some keys have more values than others, so some reduce tasks will finish sooner than others - skew. Hence, each reduce task should process multiple keys.

Creating more reduce tasks than compute nodes helps further: if any node finishes sooner, it can pick up some of the remaining tasks.

Matrix-Vector Multiplication

Matrix $M$, vector $v$. The $i$th element (row) of the resultant vector, $Mv$:

$$ (Mv)_i = \sum_j m_{ij} v_j $$

Matrix stored as triplets $(i, j, m_{ij})$ for non-zero entries, vectors as pairs $(i, v_i)$.

Vector fits in memory: broadcast the vector so each map task can read it directly.

Vector does not fit in memory: split the matrix into vertical stripes and the vector into matching horizontal stripes, so each stripe of the vector fits in memory.

Matrix-matrix multiplication: matrix-vector multiplication, but repeated for each column of the second matrix.
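
A sketch of the vector-fits-in-memory case in PySpark, assuming a SparkContext `sc` and the triplet/pair encodings above (values illustrative):

triplets = sc.parallelize([(0, 0, 2.0), (0, 1, 1.0), (1, 1, 3.0)])  # (i, j, m_ij)
v = sc.broadcast({0: 1.0, 1: 2.0})                                  # v_j by index

result = (triplets
          .map(lambda t: (t[0], t[2] * v.value[t[1]]))  # (i, m_ij * v_j)
          .reduceByKey(lambda a, b: a + b)              # sum over j
          .collect())                                   # [(0, 4.0), (1, 6.0)]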

PySpark Methods

# INPUT
sc.textFile(file_path)
sc.parallelize(list)


# SPECIAL
# Use var.value to access array
sc.broadcast(arr)



# TRANSFORMATION
# el => (el, index)
rdd.zipWithIndex()

# [(key, val_1), (key, val_2)] => [(key, [val_1, val_2])]
rdd.groupByKey()

# reduceByKey must be **associative** ((a+b)+c = a+(b+c)) and **commutative** (a+b = b+a)
# fn(val_1, val_2) => val. Either input may be the output from fn, hence output type must be the same as input type
# [(key, val)] => [(key, combined_vals)]
rdd.reduceByKey(fn)

# seq_op(aggregated_val, el) => new_aggregated_val
# comb_op(aggregated_val_1, aggregated_val_2) => new_aggregated_val
# Type of aggregated value can be different from element type
rdd.aggregateByKey(zero_val, seq_op, comb_op)

rdd.flatMap(fn)
rdd.map(fn)

# for each 'partition', iterator passed to some function. Yield some value
# e.g. sc.parallelize([1, 2, 3, 4], 2) splits data into two partitions
# iter => yield sum(iter) returns [3, 7]
# fn can yield multiple values; acts like flatMap
rdd.mapPartitions(fn)

# elements where `fn(el)` is truthy remain
rdd.filter(fn)



## SETS
# [a, b], [c, d] => [a, b, c, d]
rdd_1.union(rdd_2)
sc.union([rdd_1, rdd_2])

# Inner join
# [(key, val_1)], [(key, val_2)] => (key, (val_1, val_2))
# If key is duplicated, one tuple for every combination
# e.g. [(a, 1), (a, 2)].join([(a, 3)]) = [(a, (1, 3)), (a, (2, 3))]
rdd_1.join(rdd_2)

# Kinda-Union
# [(key, val_1), (key, val_2)], [(key, val_3)] => (key, ([val_1, val_2], [val_3])
rdd_1.cogroup(rdd_2)

# returns elements present in both RDDs (cogroup-based: keeps keys where both sides are non-empty)
rdd_1.intersection(rdd_2)

# ACTION
rdd.count()

# key fn: el => comparable value
rdd.max(key=fn)

# fn(val_1, val_2) => val
# Partial reduction on executors, final merge on driver; fn should be associative and commutative
rdd.reduce(fn)

# [el_1, el_1, el_2] => { el_1: 2, el_2: 1}
rdd.countByValue()

# [(key, val_1), (key, val_2)] => [val_1, val_2]
rdd.lookup(key)

# Keep this RDD in memory after its first computation
rdd.cache()

# Return everything
rdd.collect()

# [(key, val)] => {key: val}
rdd.collectAsMap()

# Returns first n values - if not previously sorted, ordering is non-deterministic
rdd.take(n)
rdd.first()

# Returns first n values, sorted by fn - smallest first
rdd.takeOrdered(n, sort_fn)
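
A small example chaining the methods above, assuming a SparkContext `sc` (data illustrative):

words = sc.parallelize(["spark", "makes", "simple", "things", "simple"])

# word count: classic reduceByKey
counts = (words.map(lambda w: (w, 1))
               .reduceByKey(lambda a, b: a + b)
               .collectAsMap())                  # {'simple': 2, ...}

# average word length per first letter: aggregateByKey with a (sum, count) accumulator
avg_len = (words.map(lambda w: (w[0], len(w)))
                .aggregateByKey((0, 0),
                                lambda acc, l: (acc[0] + l, acc[1] + 1),
                                lambda a, b: (a[0] + b[0], a[1] + b[1]))
                .map(lambda kv: (kv[0], kv[1][0] / kv[1][1]))
                .collect())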

Scalability

How much speed-up you get from parallelizing:

$$ S = \frac{T_{\text{sequential}}}{T_{\text{parallel}}} $$

Amdahl’s Law

Some code is inherently sequential; as $p$, the number of processors, tends to infinity, the time taken for the parallelizable section tends to zero.

If $s$ is the proportion of time spent on sequential code:

$$ S = \frac{1}{s + \frac{1 - s}{p}} \le \frac{1}{s} $$

Gustafson’s Law

$$ S = p - s \cdot (p - 1) $$

Strong scalability: time vs number of cores with total problem size fixed.

Weak scalability: time vs number of cores with problem size per core fixed. Sub-linear scalability if time increases.

Karp-Flatt Metric

Experimentally determining the serial fraction, $s$, on $P$ processors.

If $\psi$ is the speed-up on $P$ processors:

$$ e = \frac{\frac{1}{\psi} - \frac{1}{P}}{1 - \frac{1}{P}} $$

The lower the value of $e$, the greater the parallelization.
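
A quick sketch of the metric (example numbers illustrative):

def karp_flatt(psi, P):
    # experimentally determined serial fraction from measured speed-up psi
    return (1 / psi - 1 / P) / (1 - 1 / P)

print(karp_flatt(6, 8))   # speed-up 6 on 8 processors => e ≈ 0.048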

Communication

Frequent Item-sets

Market-Basket Model

Basket: small subset of items

Want to know how items are related.

Support for item-set $I$: number of baskets containing all items in $I$.

If the support is greater than the support threshold $s$, it is called a frequent item-set.

Association Rules

If $I = \{ i_1, \dots, i_k \}$ and $I \rightarrow j$, baskets containing all elements of $I$ are likely to contain $j$.

Confidence:

$$ \text{confidence}(I \rightarrow j) = \frac{\text{support}(I \cup j)}{\text{support}(I)} $$

High confidence != interesting: some items may just be frequent (e.g. ‘the’). Hence, use interest; confidence minus the probability of the item appearing in any basket.

$$ \text{interest}(I \rightarrow j) = \left|\text{confidence}(I \rightarrow j) - P(j)\right| $$

High-interest rules (above roughly $0.5$) are ‘interesting’.

A-Priori

If set $I$ appears at least $s$ times, every subset $J \subseteq I$ must also appear at least $s$ times.
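
A sketch of the first two A-Priori passes, using this monotonicity to prune pair candidates (names illustrative):

from collections import Counter
from itertools import combinations

def apriori_pairs(baskets, s):
    # pass 1: count singletons, keep the frequent ones
    counts = Counter(item for basket in baskets for item in basket)
    frequent = {item for item, c in counts.items() if c >= s}
    # pass 2: a pair can only be frequent if both of its members are frequent
    pair_counts = Counter()
    for basket in baskets:
        for pair in combinations(sorted(set(basket) & frequent), 2):
            pair_counts[pair] += 1
    return {pair: c for pair, c in pair_counts.items() if c >= s}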

Similarity

Finding neighbors in a high-dimensional space.

Problem: given data points (possibly flattened matrices) and a distance function $d(x_1, x_2)$, find all pairs of points closer than some threshold $s$.

Distance Measures

Axioms: $d(x, y) \ge 0$; $d(x, y) = 0$ iff $x = y$; $d(x, y) = d(y, x)$; triangle inequality, $d(x, y) \le d(x, z) + d(z, y)$.

Cosine Distance

$$ p_1 \cdot p_2 = \lVert p_1 \rVert \lVert p_2 \rVert \cos(\theta) $$

Hence cosine distance is:

$$ d(p_1, p_2) = \cos^{-1}\left(\frac{p_1 \cdot p_2}{\lVert p_1 \rVert \lVert p_2 \rVert}\right) $$

L_2 Norm/Euclidean Distance

Root of sum of squares of differences. If the vectors have $n$ dimensions:

$$ d(x, y) = \sqrt{\sum_{i=1}^{n} (x_i - y_i)^2} $$

Jaccard Distance

For sets - dimensions are Boolean values.

Jaccard similarity:

$$ \text{sim}(C_1, C_2) = \frac{\left| C_1 \cap C_2 \right|}{\left| C_1 \cup C_2 \right|} $$

Jaccard distance:

$$ d(C_1, C_2) = 1 - \text{sim}(C_1, C_2) $$

Jaccard bag similarity: the intersection counts each element the minimum number of times it appears in the two bags; the union counts the sum of its occurrences.
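
Sketches of the three measures in plain Python:

import math

def euclidean(x, y):
    return math.sqrt(sum((xi - yi) ** 2 for xi, yi in zip(x, y)))

def cosine_distance(p1, p2):
    dot = sum(a * b for a, b in zip(p1, p2))
    norms = math.sqrt(sum(a * a for a in p1)) * math.sqrt(sum(b * b for b in p2))
    return math.acos(dot / norms)       # the angle between the vectors

def jaccard_distance(c1, c2):           # c1, c2 are sets
    return 1 - len(c1 & c2) / len(c1 | c2)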

Finding Similar Documents

An $O(n)$ approach to finding similar documents.

Shingling

A $k$-shingle is any sequence of $k$ tokens (characters, words) appearing in a document.

The shingles form a set, not a bag, so there should be no repeated shingles within the set of shingles.

Pick $k$ such that the probability of any one shingle appearing in any particular document is low - if $k$ is too small, shingles are likely to appear in most documents. Values of 5-10 are good starting points.

Hashing

Out of the domain of all possible shingles, only a small subset are likely to appear.

Hence, hashing can be used to map shingles to a bucket via a hash function with a smaller range (and hence require less memory). A bucket may contain multiple shingles.

With this, a document can be represented as a Boolean vector - one Boolean for every bucket (element in the hash range), true if the document contains a shingle in the bucket.

Hence, a set of documents can be represented as a matrix, rows being buckets and columns documents.

Since most documents are unlikely to contain most shingles, store this as a sparse matrix.

Min-Hashing

If a permutation of rows is picked, the min-hash of the document is the first row (in permuted order) that is true (e.g. if the permutation is $2, 1, 0$ and the document is $\langle 1, 1, 0 \rangle$, the output is $1$).

If this is done with $n$ permutations, each document can be mapped to an $n$-wide vector - the document signature.

This can be done to every document to get a signature matrix.

The probability of two sets having the same min-hash value is the Jaccard similarity.
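
A sketch using random hash functions in place of explicit row permutations (a common substitution; the hash family here is an assumption):

import random

def minhash_signature(shingles, n=100, seed=0):
    # shingles: set of integer bucket IDs for one document
    rng = random.Random(seed)
    p = (1 << 61) - 1                     # a large prime
    hashes = [(rng.randrange(1, p), rng.randrange(p)) for _ in range(n)]
    return [min((a * s + b) % p for s in shingles) for a, b in hashes]

sig1, sig2 = minhash_signature({1, 5, 9}), minhash_signature({1, 5, 7})
similarity = sum(x == y for x, y in zip(sig1, sig2)) / len(sig1)  # ≈ Jaccard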

Locality-Sensitive Hashing

The signature matrix can be partitioned into $b$ bands of $r$ rows each (splitting the document signature).

For each band, group documents into the same bucket if their signatures are the same for that band.

With a single min-hash per band, the probability of two documents sharing a bucket grows linearly with their Jaccard similarity.

With bb bands of rr rows, this becomes:

$$ P(\text{bucket shared}) = 1 - (1 - t^r)^b $$

Where $t$ is the Jaccard similarity.

The shape of the curve can be modified by picking the values of $r$ and $b$ for a given $s$ (threshold for document similarity), depending on how many false positives and negatives you are willing to accept.

Note that this outputs candidate pairs, so a proper Jaccard similarity calculation should be done to confirm their similarity.
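
A sketch of the banding step that produces candidate pairs (names illustrative):

from collections import defaultdict
from itertools import combinations

def candidate_pairs(signatures, b, r):
    # signatures: {doc_id: [b * r min-hash values]}
    candidates = set()
    for band in range(b):
        buckets = defaultdict(list)
        for doc, sig in signatures.items():
            key = tuple(sig[band * r:(band + 1) * r])   # this band's slice
            buckets[key].append(doc)
        for docs in buckets.values():
            candidates.update(combinations(sorted(docs), 2))
    return candidates   # still need a real Jaccard check on each pair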

PageRank

More in-links = page is more important.

In-links from important page = link worth more.

Page’s importance is split between its out-links.

Flow Model

Start with base-model which uses the number of in-links for page importance.

Page rank, $r$, given by:

$$ r_j = \sum_{i \rightarrow j} \frac{r_i}{d_i} $$

Where $d_i$ is the out-degree of $i$.

Under this formulation there is no unique solution. Hence, a constraint is added: the page ranks sum to $1$. However, Gaussian elimination does not scale to large graphs.

Matrix Formulation

Matrix $M$ is a square matrix where entry $M_{ji}$ (row $j$, column $i$) is $1/d_i$ if there is a link from $i$ to $j$:

$$ M_{ji} = \begin{cases} \frac{1}{d_i}, & i \rightarrow j \\ 0, & \text{otherwise} \end{cases} $$

$M$ is a column-stochastic adjacency matrix; columns sum to 1.

The rank vector $r$ stores the page importance, $r_i$ being the importance of page $i$.

The importance of all pages sums to $1$:

$$ \sum_i r_i = 1 $$

At time $0$, the importance of each page can be set to $1/n$, $n$ being the number of pages.

At time $t$, the importance of page $j$ for time $t + 1$ can be calculated as:

$$ r_j = \sum_{i \rightarrow j} \frac{r_i}{d_i} $$

Hence, for the entire rank vector:

$$ r = M \cdot r $$

If enough iterations are done and the Markov chain is strongly connected (every state accessible from any state), the stationary distribution will be unique and be reached regardless of the initial probability distribution.

Random Walk Interpretation

Model it as having a ‘surfer’ on some page $i$ at time $t$, who follows a random out-link on the page at time $t + 1$.

$p(t)$ is a vector where $p(t)_i$ is the probability of the surfer being on page $i$ at time $t$.

Teleports

Dead ends: some pages have no out-links. The random walker gets stuck and importance leaks out of the system.

Spider-traps: all out-links within a group; all importance eventually absorbed by the trap.

Solution to both problems (and to the graph not being strongly connected): teleport to a random page with probability $1 - \beta$, where $\beta \approx 0.8$-$0.9$.

This leads us to the actual equations.

If $N$ is the number of pages, $r_j$ is given by: $$ r_j = \frac{1 - \beta}{N} + \beta \sum_{i \rightarrow j}{\frac{r_i}{d_i}} $$

The first term is the walker teleporting (with probability $1 - \beta$) to page $j$ (probability $1/N$). The multiplication by $\beta$ in the second term reflects the fact that the walker can only reach $j$ via an out-link $\beta$ of the time.

If $\left[\frac{1}{N}\right]_{N\times N}$ is an $N$ by $N$ matrix where all entries are $1/N$: $$ A = \beta M + (1 - \beta)\left[\frac{1}{N}\right]_{N\times N} $$

PageRank can then be applied iteratively using:

$$ r = A \cdot r $$
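
A dense numpy sketch of power iteration with teleports (the example link structure is illustrative):

import numpy as np

def pagerank(M, beta=0.85, eps=1e-8):
    n = M.shape[0]
    A = beta * M + (1 - beta) / n         # teleport term added to every entry
    r = np.full(n, 1 / n)                 # uniform initial importance
    while True:
        r_next = A @ r
        if np.abs(r_next - r).sum() < eps:
            return r_next
        r = r_next

# column-stochastic M: M[j, i] = 1/d_i if i links to j
M = np.array([[0.0, 0.0, 1.0],
              [0.5, 0.0, 0.0],
              [0.5, 1.0, 0.0]])
print(pagerank(M))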

Performance

The matrix, if encoded as a sparse matrix, will fit on disk but not in memory.

Store a list of page IDs, the out-degree of the page, and the IDs of out-links from that page.

Assume the vector does fit in memory.

Lots of random writes.

Topic-Specific PageRank

Teleports only go to pages relevant to a topic - this leads to a different rank vector $r_S$ for each teleport set $S$.

Spam

Issue 1: term spam - add relevant keywords to the page (visible only to the search engine), and/or copy text from top results.

Solution 1: statistical text analysis, duplicate page detection.

Solution 2: use the text content of the in-links and surrounding text - trust what others say about you, not what you say about yourself.

Issue 2: the internet banding together for jokes.

Issue 3: spam farms - spammers link to the target page from popular websites they have some access to (e.g. forums) and from sites they fully control (which the target page links to).

Solution 1: detecting and blacklisting spam farm-like structures.

Solution 2: TrustRank - topic-specific PageRank with a manually-selected set of trusted pages. Uses approximate isolation - trusted sites unlikely to link to untrustworthy sites.

Recommendation Systems

Utility matrix - rows are users, columns are things they have ‘rated’ (e.g. movies).

Need to: gather known ratings, extrapolate unknown ratings from the known ones, and evaluate the predictions.

Content-Based Recommendations

Recommend items similar to what the customer has previously liked.

Collaborative Filtering

Find a set of users $N$ whose ratings are ‘similar’ to the target user $x$’s ratings and extrapolate $x$’s ratings based on those in $N$.

Issues: cold start (new users have no history), sparsity of the utility matrix, first rater (new items have no ratings), popularity bias (tends to recommend popular items).

Community Detection

Girvan-Newman

Edge betweenness: the number of shortest paths passing over the edge.

Girvan-Newman decomposes undirected, unweighted networks into a hierarchical network using the strength of weak ties - if the edge betweenness is large, it is probably at the border between communities and hence should be removed. The order in which the network is split determines the hierarchy.

Part 1

Pick a starting node $A$ and run BFS; compute the number of shortest paths from $A$ to every other node.

A node's parents are the nodes one hop closer to $A$ that it has an edge with; the sum of the parents' values (numbers of shortest paths) is the node's value.

Part 2

Betweenness can be calculated by working up the tree from the leaf nodes: each node receives a credit of 1 plus the credits of the edges below it; each edge receives the credit of the node below it, split between the node's parents in proportion to their shortest-path counts.

This process has a complexity of $O(mn)$ ($m$ = edges, $n$ = nodes).

This can be repeated with every starting node, summing betweenness values and then dividing by $2$ (paths are counted from both ends).

A random subset of starting nodes can be used to get an approximation.

The edge with the greatest betweenness is removed, and the process is repeated until there are no edges left.
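
A sketch of both parts, assuming the graph is an undirected adjacency dict (names illustrative); betweenness is summed over all start nodes and halved:

from collections import deque, defaultdict

def edge_betweenness(graph):
    # graph: {node: set_of_neighbours}; returns {frozenset({u, v}): credit}
    betweenness = defaultdict(float)
    for a in graph:                        # Part 1: BFS from each start node
        level = {a: 0}
        paths = {a: 1}                     # number of shortest paths from a
        order = [a]
        queue = deque([a])
        while queue:
            u = queue.popleft()
            for v in graph[u]:
                if v not in level:
                    level[v] = level[u] + 1
                    paths[v] = 0
                    order.append(v)
                    queue.append(v)
                if level[v] == level[u] + 1:
                    paths[v] += paths[u]   # sum of parents' path counts
        # Part 2: work up from the leaves, splitting credit between parents
        credit = {u: 1.0 for u in order}
        for v in reversed(order):
            for u in graph[v]:
                if level[u] == level[v] - 1:          # u is a parent of v
                    c = credit[v] * paths[u] / paths[v]
                    betweenness[frozenset((u, v))] += c
                    credit[u] += c
    return {e: c / 2 for e, c in betweenness.items()}  # counted from both ends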

Network Communities

Girvan-Newman decomposes the network into clusters, but how many clusters should there be?

Community: sets of tightly connected nodes.

Modularity

Modularity, $Q$, is a measure of how well the network is partitioned. Given a partitioning of the graph $G$ into groups $s \in S$:

$$ Q \propto \sum_{s \in S} \left( \text{number of edges in } s - \text{expected number of edges in } s \right) $$

$$ Q(G, S) = \frac{1}{2m} \sum_{s \in S} \sum_{i \in s} \sum_{j \in s} \left( A_{ij} - \frac{k_i k_j}{2m} \right) $$

Where $m$ is the number of edges, $k_i$ is the degree of node $i$, and $A_{ij} = 1$ if there is an edge $i \rightarrow j$, and $0$ otherwise.

$Q$ is normalized such that $-1 < Q < 1$.

A positive value means the number of edges in $s$ is greater than expected: around $0.3$ to $0.7$ means significant community structure.

In nice cases, the modularity will have a clear maximum.
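
A direct sketch of the formula for an undirected graph given as an edge list and a node-to-community map (names illustrative):

from collections import defaultdict

def modularity(edges, community):
    m = len(edges)
    degree = defaultdict(int)
    adjacency = defaultdict(int)
    for u, v in edges:
        degree[u] += 1
        degree[v] += 1
        adjacency[(u, v)] += 1            # count both directions so that
        adjacency[(v, u)] += 1            # A_ij is symmetric
    q = 0.0
    nodes = list(community)
    for i in nodes:
        for j in nodes:
            if community[i] == community[j]:
                q += adjacency[(i, j)] - degree[i] * degree[j] / (2 * m)
    return q / (2 * m)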

K-means

If there are no predefined or direct connections (e.g. helicopters can fly to anywhere) in some high-dimensional space, clustering can be done using some distance metric.

Instead of hierarchical clustering (like with Girvan-Newman which has a top-down/divisive approach) we can use point assignment; points belong to the nearest clusters.

Pick some number of clusters $k$ and pick a centroid for each cluster, preferably far away from each other.

For each element, place it in the cluster whose centroid it is nearest to, then update that centroid to the average of all elements in the cluster.

Repeat until it converges.
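
A numpy sketch, assuming random initial centroids and that no cluster empties out:

import numpy as np

def k_means(points, k, iters=100, seed=0):
    rng = np.random.default_rng(seed)
    centroids = points[rng.choice(len(points), k, replace=False)]
    for _ in range(iters):
        # assign each point to its nearest centroid
        dists = np.linalg.norm(points[:, None] - centroids[None, :], axis=2)
        labels = dists.argmin(axis=1)
        # move each centroid to the mean of its cluster
        new = np.array([points[labels == c].mean(axis=0) for c in range(k)])
        if np.allclose(new, centroids):
            break                          # converged
        centroids = new
    return centroids, labels

# e.g. points = np.random.rand(200, 2); centroids, labels = k_means(points, 3)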

Online Algorithms

The algorithm is given its input one piece at a time and must make irrevocable decisions.

Bipartite Matching

Two sets of vertices (e.g. advertisements and queries), elements from one set can only be paired with a subset of the other, and elements can only be picked once.

Problem: match each element from one set to an eligible element from the other set.

Competitive ratio: worst performance over all possible inputs.

$$ \text{competitive ratio} = \min_{\text{all inputs}} \frac{\left|M_\text{algorithm}\right|}{\left|M_\text{optimal}\right|} $$

AdWords

Performance-based advertising; charged only if ad clicked.

Use expected revenue per click: bid times the click-through rate.

Advertisers have a daily budget, and the CTR is calculated from historical data - exploration vs exploitation; show a new ad to estimate CTR or show ad with good known CTR?

Assume one ad per query, all advertisers have the same budget $B$, and all ads have the same CTR and bid ($1$).

Greedy: competitive ratio $0.5$.

BALANCE: pick the advertiser with the largest unspent budget, breaking ties in some deterministic way. Competitive ratio $0.75$ with two advertisers.
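
A sketch of BALANCE under these assumptions (integer advertiser IDs, unit bids; names illustrative):

def balance(queries, eligible, budget):
    # eligible: {query: [advertiser IDs]}; every advertiser starts with `budget`
    remaining = {}
    matched = 0
    for q in queries:
        bidders = [a for a in eligible[q]
                   if remaining.setdefault(a, budget) > 0]
        if not bidders:
            continue                        # query goes unmatched
        # largest unspent budget wins; ties broken by smallest ID
        chosen = sorted(bidders, key=lambda a: (-remaining[a], a))[0]
        remaining[chosen] -= 1
        matched += 1
    return matched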

Parallel Computing

Clock speeds haven’t gone up significantly in years; better performance requires optimization by the programmer.

Types of Parallelism

Data Dependence Graph: directed acyclic graph, vertices being tasks and edges dependencies.

Critical path: the longest (slowest) path through the graph.

If two tasks are independent of each other, they can be done in parallel.

Data Parallelism: same operation executed on different elements simultaneously (or out-of-order).

Functional parallelism: independent tasks can apply different operations to different data elements simultaneously (or out-of-order).

Pipelining: dividing a process into stages that work on different items simultaneously.

Instruction-Level Parallelism

Dependencies: if two instructions are data-dependent, they cannot be executed simultaneously.

Hazards and stalls may or may not occur, depending on pipeline organization.

Data forwarding: pass a result directly to a following instruction that needs it, without waiting for it to be written back to the register file.

Out-of-order execution complicated as some instructions take more clock cycles to complete (e.g. FP).

Out-of-order pipeline: pool up instructions, schedule them, execute them out-of-order, then commit in-order.

Approx. one order of magnitude improvement from these tricks.

Flynn’s Taxonomy:

Instructions and data: SISD, SIMD, MISD, MIMD - single/multiple instruction streams × single/multiple data streams.

Memory hierarchy: registers, L1/L2/L3 cache, RAM, secondary/permanent storage. Trade-off between size and speed will always exist as it will be limited by physical distance to the CPU.

Threads and Processes

Memory Model:

Multi-threaded process: shared code + data, kernel context, each thread has its own stack and context (registers etc.).

Concurrency can be simulated via time slicing. Multi-core processors can have true concurrency.

Two threads are logically concurrent if their flows overlap in time - the life of one thread (in terms of time) overlaps with another's.

Threads and processes:

Memory consistency with threads: the only constraint is that instructions within your own thread appear to run in-order - no guarantee of sequential consistency with other threads.

Hyperthreading - replicates enough instruction-control hardware to process multiple instruction streams (but not the functional units - ALU etc.).

Vectorization: applying the same operation to multiple pieces of data.

Roofline Model: plots attainable performance against operational intensity; performance is capped by min(peak compute, memory bandwidth × operational intensity).

Python MPI

Multiple processes - distributed memory model.

Problem Decomposition

Large data sets split into small tasks; assume there needs to be communication between neighboring regions.

Blocks: each task given a vertical slice of height $n$. $2n$ items need to be communicated at each boundary.

Cyclic: each task given $m$ vertical slices of size $n$, $2mn$ items communicated per task. The benefit is that only one cycle's worth of information has to be in memory at a time, allowing check-pointing. Also allows load-balancing, as some areas may require less computation - the slicing is likely to split an expensive area across multiple tasks.

Partitioning can also be done in multiple dimensions (e.g. sliced vertically and horizontally).

Can also use domain decomposition: the master task responsible for decomposition/assignment may further decompose areas that are more expensive or require high accuracy.

MPI Methods

from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
size = comm.Get_size() # number of processes connected to the world
rank = comm.Get_rank() # process identifier. Starts at 0

data_to_send = numpy.zeros(1) # data must map to a byte buffer 
var_to_overwrite = numpy.zeros(4) # receive buffer can be bigger

comm.Send(data_to_send, dest=process_to_send_to)
comm.Recv(var_to_overwrite, source=process_to_receive_from)

# Non-blocking
req = comm.Isend(data_to_send, dest=RANK)
req = comm.Irecv(var_to_overwrite, source=RANK)
req.Wait()

# Receiving from any source
# use `status.source` to get source process rank
status = MPI.Status()
comm.Recv(var_to_overwrite, source=MPI.ANY_SOURCE, status=status)

# Use probe to modify variable to write to depending on source rank
comm.Probe(source=MPI.ANY_SOURCE, status=status)


# Collective operations
# Force processes to wait until all processes reach this point
comm.Barrier()

# For each element of the array, process 0 gets max value out of any process
comm.Reduce(in_arr, out_arr, op=MPI.MAX, root=0)

# Sends message from root to every other process
comm.Bcast(arr, root=0)

# Distribute chunks of the array from root process to all processes
comm.Scatter(in_arr, out_arr, root=0)

# Get chunks of arrays from every process and send them to root
comm.Gather(in_arr, out_arr, root=0)
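
A small end-to-end sketch tying these together - scatter rows, square them locally, gather on rank 0 (run with e.g. mpirun -n 4 python demo.py):

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

send = np.arange(size * 2, dtype='d').reshape(size, 2) if rank == 0 else None
chunk = np.empty(2, dtype='d')
comm.Scatter(send, chunk, root=0)    # each rank receives one row

chunk = chunk ** 2                   # local computation

out = np.empty((size, 2), dtype='d') if rank == 0 else None
comm.Gather(chunk, out, root=0)      # rank 0 collects the squared rows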

Deadlocks

If a process is waiting for a message it will never receive, deadlock results.

e.g. if all processes need to send and receive data, each send call blocks while waiting for the receiver to make a receive call, but all processes are trying to send.

Can also be caused by memory buffer filling up, causing the send function to wait.

Solution: paired send-and-receive:

if rank % 2:
  comm.Send(..., rank - 1)
  comm.Recv(..., rank - 1)

else:
  comm.Recv(..., rank + 1)
  comm.Send(..., rank + 1)
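
mpi4py also provides a combined call that avoids manual pairing (a sketch; `peer` is an assumed partner rank):

comm.Sendrecv(data_to_send, dest=peer,
              recvbuf=var_to_overwrite, source=peer)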

CUDA

SM: Streaming Multiprocessor.

The entire program is a grid made up of blocks: independent subtasks that can run in any order.

Global memory can be accessed by any block and each block has a single control logic unit.

Threads within a block can access shared memory and run the same instructions on a different bit of data.

‘Local memory’ is private to a thread, and is actually in global memory. If too many registers are used, values spill to local memory.

syncthreads() acts as a synchronization barrier for threads in a block.

Warps

Each block divided into 32-thread warps. At any one time, one warp (not per-block!) is running, with all threads running the instructions in lock-step (if statements etc. disable threads that do not meet the condition).

Block sizes should be chosen to result in many full warps (e.g. 128 is better than 32 is better than 1 thread) - having lots of warps to switch between allows memory access latency to be hidden.

Warps can be executing, waiting for data, or waiting to execute. The scheduler will interleave warp execution to minimize the amount of time spent waiting for data.

Occupancy - the number of warps running concurrently - is limited by hardware constraints as well as register/shared memory usage.

Switching between warps has zero overhead as each warp has its own set of registers - the occupancy will be reduced if there is not enough register space.

Locks

Data race - data accessed and modified concurrently by multiple processors.

Can use atomic operations to perform load-add-store operations in a single step.

Only works for specific processors and on individual instructions. Use locks to execute arbitrary regions of code. Mutual exclusion - only one process can have access to the data.

volatile char lock = 0;

while (true) {
  if (lock == 0) {  // not atomic: another thread may also see 0 here...
    lock = 1;       // ...and both enter the critical section
    // read-modify-write
    break;
  }
}

do_stuff();
lock = 0;  // release

To acquire the lock safely, an atomic compare-and-exchange operation must be used (CMPXCHG in x86).

Numba Methods

import numpy as np
from numba import jit

@jit
def fn():
  return

from numba import cuda

@cuda.jit
def fn(in_out_arr):
  pos = cuda.grid(1) # coordinate of thread when split as a 1D array

  # Atomic operations
  old_val = cuda.atomic.add(arr, index, increment_by)

  # if arr[0] == old, set it to new and return the old value
  old_val = cuda.atomic.compare_and_swap(arr, old, new)

  # Synchronization point
  cuda.syncthreads()

arr = cuda.to_device(np.zeros(shape = 1, dtype=np.int32))

# Or make array directly on the GPU
global_arr = cuda.device_array(shape)
shared_arr = cuda.shared.array(shape, dtype)

# Max 1024 threads per block on most Nvidia GPUs
fn[blocks_per_grid, threads_per_block](arr)


main_mem_arr = global_arr.copy_to_host()

Raft Consensus Algorithm

Election: followers that stop hearing heartbeats time out, become candidates, increment the term, and request votes; a candidate receiving a majority of votes becomes leader.

Log replication: the leader appends client commands to its log, replicates them to followers, and commits an entry once a majority of nodes have stored it.

If a leader gets a message from another leader with a greater term, it steps down and rolls back uncommitted commands.