All Files in ‘DATA301 (2021-S1)’ Merged

01-02. Introduction

Course

James Atlas. Office hours: Monday 1-4 pm

Groups of 2-4. Three roles:

Weighting:

The Five Challenges

Scalability (volume, velocity): distributing computation/storage

Reliability (veracity): ensuring results survive when machines fail

Productivity (variety, value): making writing distributed programs easy

Three Perspectives

Look at the problem from three levels of abstractions:

Architecture

Scalability:

Storage/IO: ensuring data durability/consistency (RAID, DFS)

Distributed file system:

RDD: Resilient Distributed Dataset

Parallelism vs Scalability

Parallelism: P=W/DP = W/D, where WW is the work done (e.g. number of operations) and DD is the depth of computation (depth of dependency tree).

Scalability: given a fixed problem size, does adding more processors make it faster?

03. Scalability, Programming, and Algorithms in Spark

A single node cannot process large data sets (e.g. just saving the internet to disk would take several months under the most ideal conditions). Hence, large-scale computing uses multiple nodes; a few dozen nodes in a rack connected together by a switch, with racks connected in with each other by another switch (often with higher bandwidth).

The volumes of data are too large to store on a single node, so data is stored on a distributed file system where data is kept in contiguous chunks (16-64 MB) and replicated 2-3 times. One or more master nodes store metadata relating to the chunks.

The architecture assumes nodes will fail and can continue to function when this occurs.

Networking is slow and hence, the processing is done directly on the chunk servers.

Map-Reduce

The Map-Reduce architecture takes care of:

Map

Input is a set of key-value pairs, where the key is a nominal value (e.g. boolean, number).

Hence unlike dictionaries, the Map-Reduce architecture allows duplicate keys.

When map is called, every key-value pair is passed to a map lambda which outputs some number of key-value pairs (e.g. flatMap returns an array). This is called a transformation.

Reduce

(Likely already mapped) key-value pairs are given as input, and some reduction operation is performed:

Example: Word Counting

File (or a chunk of the file) given to each node. Then:

The reduce step is then applied again with the sets from the other nodes until we are left with a single set.

Spark

Spark is a very optimized implementation of MapReduce which attempts to reduce the amount of disk/network writes. The trade-off for this is that if a catastrophic failure occurs, more work is lost.

Spark lazily computes values: mapping, reducing etc. are methods of an RDD which return an RDD (allowing chaining), not the results themselves. Hence, intermediate results are not stored and the operations are done only when you actually call collect() or a similar method.

RDD: Resilient Distributed Dataset

RDDs are a fault-tolerant collection of elements that can be operated in parallel.

They can be made by parallelizing an existing collection (e.g. list in Python) or by referencing a dataset in a external storage system (e.g. shared file system).

Action

Actions generate a local data structure at the driver:

They are operations that run on but do not mutate the RDD: count, collect, reduce, max, foreach etc.

Transformation

Returns a reference to a new RDD:

Coordination

The master node (different from the driver) is responsible for coordination.

The node keeps track of task status; tasks can be idle (waiting for workers), in-progress or completed.

As workers become available, they get scheduled idle tasks.

When a map task is completed, the location and sizes of its intermediate files is sent to the master, which in turn sends this to workers doing reduction operations.

Failures

Master will periodically ping workers to detect failures.

If a map worker fails, in-progress/completed tasks reset to idle and must be reprocessed. Reduce workers are notified when the task is rescheduled to another map worker.

If a reduce worker fails, only in-progress tasks are reset to idle. The task is restarted on another worker.

If the master fails, the task is aborted and the client is notified.

Some Spark methods and miscellaneous notes

takeOrdered(n, sortFunction) may be used if it is not already sorted or if the RDD is already in a particular order. takeOrdered is much faster than running sort() and then take().

countByValue returns the count of each key: see the word count example. Returns a Python dictionary, not an RDD.

lookup(key) returns the value for a given key in an RDD.

Reduction functions must be associative/commutative; otherwise, the results will be wrong and non-deterministic.

Some transformations:

Example: Three-Word Sequences

def three_word(line):
  three_word_tuples = []

  words = line.split(" ")
  for i in range(len(words) - 2):
    words.append(" ".join([words[i], words[i + 1], words[i + 2]]))
  
  return three_word_tuples

words = file.flatMap(lambda line: (three_word(line), 1)])

counts = words.reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], False)

counts.collect()

Map-Reduce Join

Used in operations such as group by key and joins.

During mapping, a hash function is generated on the (mapped) key. This hash is used to determine which bucket (reduce node) the key-value pair is sent to (e.g. hash % k, where k is the number of buckets).

Hence, for a given key, all map nodes will send the key to the same reduce node, allowing the reduction step to be done.

04. Algorithms and Cloud Computing

Map-Reduce Join Examples

Two Hops

Given bi-directional adjacency list, write a map-reduce algorithm that outputs nodes that are reachable in two hops for each node.

with open("graph.txt", "w") as text_file: 
  text_file.write("""a b
a c
a e
b c
c d
d e
e x
x y
x z
w y
y z""")

def getTuples(line):
  words = line.split(" ")

graph = sc.textFile("graph.txt")\
  .map(lambda line: line.split(" "))\
  .flatMap(lambda t: [(t[0], t[1]), (t[1], t[0])])

two_hop_tuples = graph.join(graph) # Join to itself
# Will return results such as ('b', ('a', 'a')): a->b->a (key is the node connecting the two)
two_hop_tuples = two_hop_tuples.filter(lambda t: t[1][0] != t[1][1]) # Remove loops
two_hop_tuples = two_hop_tuples.filter(lambda t: t[1][0] < t[1][1]) # Use string sort order to remove forwards/reverse direction duplicates


initialValue = []
reduction = (lambda aggregated, el: aggregated + [el]) # adds element to the list
reduce_aggregated = (lambda agg1, agg2: agg1 + agg2) # reducing two already aggregated values into one
reduced = two_hop_tuples.aggregateByKey(initialValue, reduction, reduce_aggregated)
dbg(reduced) # Array of tuples (middle, [start, end])

# aggregateByKey gives more power than reduceByKey (and so performs slightly worse)

Refinement: Combiners

If a map produces many pairs (k, v_1), …, (k, v_n) for the same key k which will be immediately reduced (e.g. word frequency) , network time can be saved by pre-aggregating values in the mapper: combine(k, list(v_1)) -> v2.

This only works if the reduce function is commutative and associative.

Skew

The number of values associated with each key will vary significantly, so if each reduce node is given a single key as a task (which is a bad idea as there is overhead associated with each node), there will be a significant amount variation in the time each task takes to complete (even with combiners, some keys will be more popular and so require network transfer from more map nodes). This is called skew.

To reduce this issue, each reduce task should be responsible for multiple keys - if this allocation is done randomly, the skew should be reduced from averaging.

To further decrease skew, create more reduce tasks than there are compute nodes - if the task is short, then that node can pick up a few more tasks in the time it takes another node to finish one.

05. Cloud Computing, Performance - Cost and Complexity

Matrix Multiplication

Product of matrix MM with vector vv:

undefined(Mv)
Matrix stored as triplets $(i, j, m_ij)$ for non-zero entries, vectors as pairs $(i, v_i)$. Vector fits in memory: - Each map task given row of matrix and full vector - Calculate product for each column, returning $(i, m_{ij}v_j)$ - Reduce task sums products, returning $(i, x_i)$ Vector does not fit in memory: - Divide matrix into vertical stripes, vector into horizontal - Reduce tasks act as before Matrix-matrix multiplication: matrix-vector multiplication, but repeated for each column of the second matrix. #### PySpark Methods ```python # INPUT sc.textFile(file_path) sc.parallelize(list) # SPECIAL # Use var.value to access array sc.broadcast(arr) # TRANSFORMATION # el => (el, index) rdd.zipWithIndex() # [(key, val_1), (key, val_2)] => [(key, [val_1, val_2])] rdd.groupByKey() # reduceByKey must be **associative** (a*b*c = b*a*c) and **commutative** (a+b = b+a) # fn(val_1, val_2) => val. Either input may be the output from fn and hence, output type must be the same as input type # [(key, val)] => [(key, combined_vals)] rdd.reduceByKey(fn) # seq_op(aggregated_val, el) => new_aggregated_val # comb_op(aggregated_val_1, aggregated_val_2) => new_aggregated_val # Type of aggregated value can be different from element type rdd.aggregateByKey(zero_val, seq_op, comb_op) rdd.flatMap(fn) rdd.map(fn) # for each 'partition', iterator passed to some function. Yield some value # e.g. sc.parallelize([1, 2, 3, 4], 2) splits data into two partitions # iter => yield sum(iter) returns [3, 7] # fn can yield multiple values; acts like flatMap rdd.mapPartitions(fn) # elements where `fn(el) is True` remain rdd.filter(fn) ## SETS # [a, b], [c, d] => [a, b, c, d] rdd_1.union(rdd_2) sc.union([rdd_1, rdd_2]) # Inner join # [(key, val_1)], [(key, val_2)] => (key, (val_1, val_2)) # If key is duplicated, one tuple for every combination # e.g. [(a, 1), (a, 2)].join([(a, 3)]) = [(a, (1, 3)), (a, (2, 3))] rdd_1.join(rdd_2) # Kinda-Union # [(key, val_1), (key, val_2)], [(key, val_3)] => (key, ([val_1, val_2], [val_3]) rdd_1.cogroup(rdd_2) # cogroup, but filters out elements if any of the values are False and returns only keys rdd_1.intersection(rdd_2) # ACTION rdd.count() # el => Number rdd.max(fn) # (val_1, val_2) => val # Run locally on driver, does not need to be associative rdd.reduce(fn) # [el_1, el_1, el_2] => { el_1: 2, el_2: 1} rdd.countByValue() # [(key, val_1), (key, val_2)] => [val_1, val_2] rdd.lookup(key) rdd.cache() # Return everything rdd.collect() # [(key, val)] => {key: val} rdd.collectAsMap() # Returns first n values - if not previously sorted, ordering is non-deterministic rdd.take(n) rdd.first() # Returns first n values, sorted by fn - smallest first rdd.takeOrdered(n, sort_fn) ``` ## Scalability How much speed-up you get from parallelizing:

\text{S} = \frac{T_{\text{sequential}}}{T_{\text{parallel}}} $$

Amdahl’s Law

Some code inherently sequential; as pp, number of processors tends to infinity, the time taken for the parallelizable section will tend to zero.

if ss is proportion of time spent on sequential code:

S=1s+1sp1s S = \frac{1}{s + \frac{1 - s}{p}} \le \frac{1}{s}

Gustafson’s Law

S=ps(p1) S = p - s \cdot (p - 1)

Strong scalability: time vs cores when problem size fixed

Weak scalability: time vs problem size/cores fixed. Sub-linear scalability if time increases.

Karp-Flatt Metric

Experimentally determining the serial fraction, ss, on PP processors.

If ψ\psi is the speed-up on PP processors:

e=1ψ1P11P e = \frac {\frac{1}{\psi} - \frac{1}{P}} { 1 - \frac{1}{P}}

The lower the value of ee, the greater the parallelization.

Communication

Frequent Item-sets

Market-Basket Model

Basket: small subset of items

Want to know how items are related.

Support for item-set II: number of baskets containing all items in II.

If the support is greater than the support threshold ss, it is called a frequent item-set.

Association Rules

If I={i1,,ik}I = \{ i_1, \dots, i_k \} and I>jI -> j, baskets containing all elements of II are likely to contain jj.

Confidence:

confidence(Ij)=support(Ij)support(I) \text{confidence}(I \rightarrow j) = \frac {\text{support}(I \cup j)} {\text{support}(I)}

High confidence != interesting: some items may just be frequent (e.g. ‘the’). Hence, use interest; confidence minus the probability of the item appearing in any basket.

interest(Ij)=confidence(Ij)P(j) \text{interest}(I \rightarrow j) = \left|\text{confidence}(I \rightarrow j) - P(j)\right|

High-interest rules (> 0.5> ~0.5) are ‘interesting’.

A-Priori

If set II appears as least ss times, every subset JJ must also appear at least ss times.

Similarity

Finding neighbors in a high-dimensional space.

Problem: given vector of data-points (possibly a flattened matrix) and a distance function d(x1,x2)d(x_1, x_2), find all points closer than some threshold ss.

Distance Measures

Axioms:

Cosine Distance

p1p2=p1p2cos(θ) p_1 \cdot p_2 = |p_1\|p_2| \cdot cos(\theta)

Hence cosine distance is:

d(p1,p2)=cos1(p1p2p1p2) d(p_1, p_2) = cos^{-1}\left(\frac {p_1 \cdot p_2} {|p_1\|p_2|} \right)

L_2 Norm/Euclidean Distance

Root of sum of squares of differences. If the vectors have nn dimensions:

d(x,y)=i=1n(xi2yi2) d(x, y) = \sqrt{\sum_{i=1}^{n}{(x_i^2 - y_i^2)}}

Jaccard Distance

For sets - dimensions are Boolean values.

Jaccard similarity:

sim(C1,C2)=C1C2C1C2 \text{sim}(C_1, C_2) = \frac{\left| C_1 \cap C_2 \right|} {\left| C_1 \cup C_2 \right|}

Jaccard distance:

d(C1,C2)=1sim(C1,C2) d(C_1, C_2) = 1 - \text{sim}(C_1, C_2)

Jaccard bag similarity:

Finding Similar Documents

O(n)O(n) approach to finding similar documents.

Shingling

A kk-shingle is any sequence of length kk of tokens (character, words) appearing in a document.

The shingles are sets, not bags, so there should be no repeated shingles within the set of shingles.

Pick kk such that the probability of any one shingle appearing in any particular document is low - if it is too small, it is likely to appear in most documents. Values of 5-10 are good starting points.

Hashing

Out of the domain of all possible shingles, only a small subset are likely to appear.

Hence, hashing can be used to map shingles to a bucket via a hash function with a smaller range (and hence require less memory). A bucket may contain multiple shingles.

With this, a document can be represented as a Boolean vector - one Boolean for every bucket (element in the hash range), true if the document contains a shingle in the bucket.

Hence, a set of documents can be represented as a matrix, rows being buckets and columns documents.

Since most documents are unlikely to contain most shingles, store this as an sparse matrix.

Min-Hashing

If a permutation of rows is picked, the min-hash of the document is the first element that is true (e.g. if permutation is 2,1,02, 1, 0 and document is <1,1,0><1, 1, 0>, output is 11).

If this is done with nn permutations, each document can be mapped to a nn-wide vector - the document signature.

This can be done to every document to get a signature matrix.

The probability of two sets having the same min-hash value is the Jaccard similarity.

Locality-Sensitive Hashing

The signature matrix can be partitioned into bb bands of rr rows each (splitting the document signature).

For each band, group documents into the same bucket if their signatures are the same for that band.

With min-hashes, the probability of two documents sharing a bucket grows linearly with their Jaccard similarity.

With bb bands of rr rows, this becomes:

P(bucket shared)=1(1tr)b P(\text{bucket shared}) = 1 - (1 - t^r)^b

Where tt is the Jaccard similarity.

The shape of the curve can be modified by picking the values of rr and bb for a given ss (threshold for document similarity), dependent on how many false positives and negatives you are willing to get.

Note that this outputs candidate pairs and so a proper Jaccard similarity calculation should be doe to confirm their similarity.

PageRank

More in-links = page is more important.

In-links from important page = link worth more.

Page’s importance split between its out-links:

Flow Model

Start with base-model which uses the number of in-links for page importance.

Page rank, rr given by:

rj=ijridi r_j = \sum_{i \rightarrow j}\frac{r_i}{d_i}

Where did_i is the out-degree of ii.

Under this current implementation there is no unique solution. Hence, a constraint is added: the sum of page ranks sums to 11. However, Gaussian elimination does not work well in large graphs.

Matrix Formulation

Matrix MM is a square matrix where for each row jj, column ii (MjiM_{ji}) is 1/di1/d_i if there is a in-link from ii to jj.

Mji={1di,ij0,otherwise M_{ji} = \begin{cases} \frac{1}{d_i}, & i \rightarrow j\\ 0 , & \text{otherwise} \end{cases}

MM is an column stochastic adjacency matrix; columns sum to 1.

The rank vector rr stores the page importance, rir_i being the importance of page ii.

The importance of all pages sum to 11:

iri=1 \sum_i{r_i} = 1

At time 00, the importance of each page can be set to 1/n1/n, nn being the number of pages.

At time tt, the importance of page rjr_j for time t+1t + 1 can be calculated as:

rj=ijridi r_j = \sum_{i \rightarrow j}{\frac{r_i}{d_i}}

Hence, for the entire rank vector:

r=Mr r = M \cdot r

If enough iterations are done and the Markov chain is strongly connected (every state accessible from any state), the stationary distribution will be unique and be reached regardless of the initial probability distribution.

Random Walk Interpretation

Model it as having a ‘surfer’ on some page ii at time tt, following a random out-link on the page at random at time t+1t + 1.

p(t)p(t) is a vector where p(t)ip(t)_i is the probability of the surfer being on page ii at time tt.

Teleports

Dead ends: some pages have no out-links. Random walker gets stuck and importance of page increases.

Spider-traps: all out-links within a group; all importance eventually absorbed by the trap.

Solution to both solutions (and to graph not being strongly connected): teleport to a random page with probability 1β1 - \beta where β0.80.9\beta \approx 0.8-0.9.

This leads us to the actual equations.

If NN is the number of of pages, rjr_j is given by: $$ r_j = \frac{1 - \beta}{N} + \beta \sum_{i \rightarrow j}{\frac{r_i}{d_i}} $$

The first term is from the page the walker is currently on teleporting (with probability 1β1 - \beta) to page jj (probability 1/N1/N). The multiplication by β\beta in the second term reflects that fact the the walker can only reach jj from an out-link β\beta of the time.

If [1N]N×N\left[\frac{1}{N}\right]_{N\times N} is a NN by NN matrix where all entries are 1/N1/N : $$ A = \beta M + (1 - \beta)\left[\frac{1}{N}\right]_{N\times N} $$

PageRank can then be applied iteratively using:

r=Ar r = A \cdot r

Performance

The matrix, if encoded as sparse matrix, will fit on disk but not memory.

Store a list of page IDs, the out-degree of the page, and the IDs of out-links from that page.

Assume the vector does fit in memory.

Lots of random writes.

Topic-Specific PageRank

Teleports only go to page relevant to a topic - this leads to a different vector rSr_S for each teleport set SS.

Spam

Issue 1: term spam - add relevant keywords to the page (visible only to the search engine), and/or copy text from top results.

Solution 1: statistical text analysis, duplicate page detection.

Solution 2: use text content of the in-links and surrounding text - trust what others say about you, know what you say.

Issue 1: the internet banding together for jokes.

Issue 2: spam farms - spammer link to target page from popular websites they have some access to (e.g. forums) and from sites they fully control (which the target page links to).

Solution 1: detecting and blacklisting spam farm-like structures.

Solution 2: TrustRank - topic-specific PageRank with a manually-selected set of trusted pages. Uses approximate isolation - trusted sites unlikely to link to untrustworthy sites.

Recommendation Systems

Utility matrix - rows are users, columns are things they have ‘rated’ (e.g. movies).

Need to:

Content-Based Recommendations

Recommend items similar to what the customer has previously liked.

Collaborative Filtering

Find a set of users NN whose ratings are ‘similar’ to the target user xx and extrapolate xx’s ratings based on those in NN.

Issues:

Community Detection

Girvan-Newman

Edge betweenness:

Girvan-Newman decomposes undirected, unweighted networks into a hierarchical network using the strength of weak ties - if the edge betweenness is large, it is probably at the border between communities and hence should be removed. The order in which the network is split determines the hierarchy.

Part 1

Pick starting node AA and run BFS; compute number of shortest paths (number of hops) to every other node.

Parent nodes are nodes one hop closer to AA it has an edge with; the sum of the parents’ values (number of shortest paths) is the node’s value.

Part 2

Betweenness can be calculated by working up the tree from the leaf nodes:

This process has a complexity of O(mn)O(mn) (mm = edges, nn = nodes)

This can be repeated with every starting node, summing betweenness values and then dividing by 22 (paths are counted from both ends).

A random subset of starting nodes can be used to get an approximation.

The edge with the greatest betweenness is removed, and the process is repeated until there are no edges left.

Network Communities

Girvan-Newman decomposes the network into clusters, but how many clusters should there be?

Community: sets of tightly connected nodes.

Modularity

Modularity, QQ, is a measure of how well the network is partitioned. Given a partitioning of the graph GG into groups sSs \in S:

QsSnumber of edges in sexpected number of edges in s Q \propto \sum_{s \in S}{ \text{number of edges in } s - \text{expected number of edges in } s}
Q(G,S)=12msSisjsAijkikj2m Q(G, S) = \frac{1}{2m} \sum_{s\in S}{\sum_{i \in s}{\sum_{j\in s}{A_{ij} - \frac{k_i k_j}{2m}}}}

Where mm is the number of edges, kik_i is the degree of node ii, and Aij=1A_{ij} = 1 if there is a edge iji \rightarrow j, and 00 otherwise.

QQ is normalized such that 1<Q<1-1 \lt Q \lt 1.

A positive value means the number of edges in ss is greater than expected: around 0.30.3 to 0.70.7 means significant community structure.

In nice cases, the modularity will have a maxima.

K-means

If there are no predefined or direct connections (e.g. helicopters can fly to anywhere) in some high-dimensional space, clustering can be done using some distance metric.

Instead of hierarchical clustering (like with Girvan-Newman which has a top-down/divisive approach) we can use point assignment; points belong to the nearest clusters.

Pick some number of clusters kk and pick a centroid for each cluster, preferably far away from each other.

For each element, place it in the cluster whose centroid it is nearest too, then update the centroid using the average point of all elements in the centroid.

Repeat until it converges.

Online Algorithms

Algorithm given to input one piece at a time and must make irrevocable decisions.

Bipartite Matching

Two sets of vertices (e.g. advertisements and queries), elements from one set can only be paired with a subset of the other, and elements can only be picked once.

Problem: match each element from one set to an eligible element from the other set.

Competitive ratio: worst performance over all possible inputs.

competitive ratio=minall inputsMalgorithmMoptimal \text{competitive ratio} = min_\text{all inputs}\frac {\left|M_\text{algorithm}\right|} {\left|M_\text{optimal}\right|}

AdWords

Performance-based advertising; charged only if ad clicked.

Use expected revenue per click: bid times the click-through rate.

Advertisers have a daily budget, and the CTR is calculated from historical data - exploration vs exploitation; show a new ad to estimate CTR or show ad with good known CTR?

Assume one ad/query, all advertisers have the same budget BB and ads have the same CTR and bid (11).

Greedy: 0.50.5

BALANCE: pick advertiser with the largest unspent budget, breaking ties via some deterministic way. 0.750.75 with two advertisers.

Parallel Computing

Clock speeds haven’t gone up significantly in years; better performance requires optimizations by the program.

Types of Parallelism

Data Dependence Graph: directed acyclic graph, vertices being tasks and edges dependencies.

Critical path: slowest path.

If two tasks are independent of each other, they can be done in parallel.

Data Parallelism: same operation executed on different elements simultaneously (or out-of-order).

Functional parallelism: independent tasks can apply different operations to different data elements simultaneously (or out-of-order).

Pipelining: dividing process into stages to produce items simultaneously.

Instruction-Level Parallelism

Dependencies: if two instructions data-dependent, they cannot be executed simultaneously.

Hazard and stall may or may not occur, depending on pipeline organization.

Data forwarding: store instruction in register if another instruction will use it soon.

Out-of-order execution complicated as some instructions take more clock cycles to complete (e.g. FP).

Out-of-order pipeline: pool up instructions, schedule them, execute them out-of-order, then commit in-order.

Approx. one order of magnitude improvement from these tricks.

Flynn’s Taxonomy:

Instructions and data:

Memory hierarchy: registers, L1/L2/L3 cache, RAM, secondary/permanent storage. Trade-off between size and speed will always exist as it will be limited by physical distance to the CPU.

Threads and Processes

Memory Model:

Multi-threaded process: shared code + data, kernel context, each thread has its own stack and context (registers etc.).

Concurrency can be simulated via time slicing. Multi-core processors can have true concurrency.

Two threads logically concurrent if their flows overlap in time - life of one thread (in terms of time) overlaps with another.

Threads and processes:

Memory consistency with threads: the only constraint is that instructions within your own thread appear to run in-order - no guarantee of sequential consistency with other threads.

Hyperthreading - replicates enough instruction-control hardware to process multiple instruction streams (but not the functional units - ALU etc.).

Vectorization: applying the same operation to multiple pieces of data

Roofline Model:

Python MPI

Multiple processes - distributed memory model.

Problem Decomposition

Large data sets split into small tasks; assume there needs to be communication between neighboring regions.

Blocks: each task given vertical slice of height nn. 2n2n items need to be communicated at each boundary.

Cyclic: each task given mm vertical slices of size nn, 2mn2mn items communicated per task. Benefit is that only one cycle worth of information has to be in memory at a time, allowing check-pointing. Also allows load-balancing as some areas may require less computation - the slicing is likely to split this expensive area across multiple tasks.

Partitioning can also be done in multiple dimensions (e.g. sliced vertically and horizontally).

Can also use domain decomposition: master task responsible for decomposition/assignment may decompose areas that are more expensive or require high accuracy further.

MPI Methods

from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
size = comm.Get_size() # number of processes connected to the world
rank = comm.Get_rank() # process identifier. Starts at 0

data_to_send = numpy.zeros(1) # data must map to a byte buffer 
var_to_overwrite = numpy.zeros(4) # receive buffer can be bigger

comm.Send(data_to_send    , dest=process_to_send_to)
comm.Recv(var_to_overwrite, dest=process_to_receive_to)

# Non-blocking
req = comm.Isend(data_to_send, dest=RANK)
req = comm.Irecv(var_to_overwrite, source=RANK)
req.Wait()

# Receiving from any source
# use `status.source` to get source process rank
status = MPI.Status()
comm.Recv(var_to_overwrite, source=MPI.ANY_SOURCE, status=status)

# Use probe to modify variable to write to depending on source rank
comm.Probe(source=MPI.ANY_SOURCE, status=status)


# Collective operations
# Force processes to wait until all processes reach this point
comm.Barrier()

# For each element of the array, process 0 gets max value out of any process
comm.Reduce(in_arr, out_arr, op=MPI.MAX, root=0)

# Sends message from root to every other process
comm.Bcast(arr, root=0)

# Distribute chunks of the array from root process to all processes
comm.Scatter(in_arr, out_arr, root=0)

# Get chunks of arrays from every process and sends them to root
comm.Gather(in_arr, out_arr, root=0)

Deadlocks

If a process is waiting for a message it will never receive, deadlock results.

e.g. if all processes need to send and receive data, send call will block as it is waiting for the receiver to make a receive call, but all are trying to send.

Can also be caused by memory buffer filling up, causing the send function to wait.

Solution: paired send-and-receive:

if rank % 2:
  comm.Send(..., rank - 1)
  comm.Recv(..., rank - 1)

else:
  comm.Recv(..., rank + 1)
  comm.Send(..., rank + 1)

CUDA

SM: Simultaneous Multi-processor.

The entire program is a grid made up blocks: independent subtasks that can run in any order.

Global memory can be accessed by any block and each block has a single control logic unit.

Threads within a block can access shared memory and run the same instructions on a different bit of data.

Local memory’ is private to a thread, and is actually global memory. If too many registers are used it swaps to local memory.

syncthreads() acts as a synchronization barrier for threads in a block.

Warps

Each block divided into 32-thread warps. At any one time, one warp (not per-block!) is running, with all threads running the instructions in lock-step (if statements etc. disable threads that do not meet the condition).

Block sizes should be chosen to result in many full warps (e.g. 128 is better than 32 is better than 1 thread) - having lots of warps to switch between allows memory access latency to be hidden.

Warps can be executing, waiting for data, or waiting to execute. The scheduler will interleave warp execution to minimize the amount of time spent waiting for data.

Occupancy - the number of warps running concurrently, is limited by hardware constraints as well as register/shared memory usage.

Switching between warps has zero overhead as each warp has its own set of registers - the occupancy will be reduced if there is not enough register space.

Locks

Data race - data accessed and manipulated by multiple processors.

Can use atomic operations to perform load-add-store operations in a single step.

Only works for specific processors and on individual instructions. Use locks to execute arbitrary regions of code. Mutual exclusion - only one process can have access to the data.

volatile char lock = 0

while (true) {
  if (lock == 0) {
    lock = 1;
    // read-modify-write
    break;
  }
}

do_stuff();
lock = 0;

An atomic compare-and-exchange operation must be used to acquire the lock CMPXCHG in x86.

Numba Methods

from numba import jit

@jit
def fn():
  return

from numba import cuda
def fn(in_out_arr):
  pos = cuda.grid(1) # coordinate of thread when split as a 1D array

  # Atomic operations
  old_val = cuda.atomic.add(arr, index, increment_by)

  # if arr[0] == old, set it to new and return the old value
  old_val = cuda.atomic.compare_and_swap(arr, old, new)

  # Synchronization point
  cuda.syncthreads()

arr = cuda.to_device(np.zeros(shape = 1, dtype=np.int32))

# Or make array directly on the GPU
global_arr = cuda.device_array(shape)
shared_arr = cuda.shared.array(shape, dtype)

# Max 1024 threads per blocks on most Nvidia GPUs
fn[blocks_per_grid, threads_per_block](arr)


main_mem_arr = global_arr.copy_to_host()

Raft Consensus Algorithm

Election:

Log replication:

If a leader gets a command from another leader with a greater term, step down and roll back uncommitted commands.