Map-Reduce Join Examples
Two Hops
Given a bi-directional adjacency list, write a map-reduce algorithm that outputs, for each node, the nodes reachable in two hops.
with open("graph.txt", "w") as text_file:
    text_file.write("""a b
a c
a e
b c
c d
d e
e x
x y
x z
w y
y z""")
def getTuples(line):
    # Split a line "a b" into both edge directions - a named alternative
    # to the map/flatMap pipeline below
    words = line.split(" ")
    return [(words[0], words[1]), (words[1], words[0])]
graph = sc.textFile("graph.txt")\
    .map(lambda line: line.split(" "))\
    .flatMap(lambda t: [(t[0], t[1]), (t[1], t[0])])
# graph now holds both directions of every edge, e.g. ('a', 'b') and ('b', 'a')
two_hop_tuples = graph.join(graph) # Join to itself
# Returns results such as ('b', ('a', 'a')): a->b->a, or ('b', ('a', 'c')): a->b->c (the key is the node connecting the two)
two_hop_tuples = two_hop_tuples.filter(lambda t: t[1][0] != t[1][1]) # Remove loops
two_hop_tuples = two_hop_tuples.filter(lambda t: t[1][0] < t[1][1]) # Use string sort order to remove forwards/reverse direction duplicates
initialValue = []
reduction = (lambda aggregated, el: aggregated + [el]) # adds element to the list
reduce_aggregated = (lambda agg1, agg2: agg1 + agg2) # reducing two already aggregated values into one
reduced = two_hop_tuples.aggregateByKey(initialValue, reduction, reduce_aggregated)
dbg(reduced) # (middle, [(start, end), ...]) tuples; dbg is assumed to be a debug-print helper defined elsewhere in these notes
# aggregateByKey gives more power than reduceByKey (the aggregated type can differ from the value type), and so can perform slightly worse
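The RDD above is keyed by the connecting (middle) node. To answer the question as stated, i.e. the nodes reachable in two hops from each node, the pairs can be re-keyed by their endpoints. A minimal sketch reusing the lambdas defined above (the name per_node is illustrative):
# Re-key each (middle, (start, end)) pair by both endpoints, drop duplicate pairs
# that arise from different middle nodes, then collect a neighbour list per node
per_node = two_hop_tuples\
    .flatMap(lambda t: [(t[1][0], t[1][1]), (t[1][1], t[1][0])])\
    .distinct()\
    .aggregateByKey(initialValue, reduction, reduce_aggregated)
dbg(per_node) # e.g. ('a', ['b', 'c', 'd', 'x']) for the sample graph (order may vary)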
Refinement: Combiners
If a map task produces many pairs (k, v_1), …, (k, v_n) for the same key k that will immediately be reduced (e.g. word frequency counts), network time can be saved by pre-aggregating the values in the mapper: combine(k, [v_1, …, v_n]) -> (k, v').
This only works if the reduce function is commutative and associative.
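In Spark, reduceByKey does this automatically: the reduce function is applied within each map-side partition before the shuffle. A minimal word-count sketch, assuming the same SparkContext sc and graph.txt as above:
from operator import add

counts = sc.textFile("graph.txt")\
    .flatMap(lambda line: line.split(" "))\
    .map(lambda word: (word, 1))\
    .reduceByKey(add)  # partial counts are combined on each mapper before the shuffle
dbg(counts) # e.g. ('a', 3), ('b', 2), ...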
Skew
The number of values associated with each key can vary significantly. If each reduce task is given a single key (a bad idea anyway, as there is overhead associated with each task), there will be significant variation in how long tasks take to complete - even with combiners, popular keys require network transfer from more map nodes. This variation is called skew.
To reduce this issue, each reduce task should be responsible for multiple keys - if the allocation is done randomly, averaging across keys reduces the skew.
To decrease skew further, create more reduce tasks than there are compute nodes - a node that finishes a short task can pick up a few more in the time it takes another node to finish a long one.
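In Spark, both ideas map onto the shuffle's partition count: keys are hash-partitioned across reduce tasks (so each task handles many keys), and asking for more partitions than there are executor cores lets free cores pick up the remaining short tasks. A sketch based on the word count above; the partition count 32 is illustrative:
counts = sc.textFile("graph.txt")\
    .flatMap(lambda line: line.split(" "))\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda a, b: a + b, numPartitions=32)  # more, smaller reduce tasks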