# 1.2. Join implementations on top of MapReduce

While NoSQL databases generally support a more flexible data model than relational ones, many users of NoSQL databases still use a somewhat flat and homogeneous encoding of data (i.e. what is stored in NoSQL databases is still mostly relational tables). In this respect, the join operation is still of paramount importance. Indeed, although denormalization is possible, it increases the cost of writing to the database (since the join must be maintained) and furthermore, such writes may leave the system inconsistent for a while (since, in general, no notion of transaction exists in NoSQL databases). As a result, a large body of work has been done recently to compute joins effectively on top of MapReduce primitives.

Before exploring some of the most prevalent work in this area, we recall the definition of the join operator. Let *R* and *S* be two collections^{1}. The join operation between *R* and *S* is defined as:

$$R \bowtie_{\theta} S = \{(r, s) \mid r \in R,\ s \in S,\ \theta(r, s)\}$$

where *θ* is a Boolean predicate over *r* and *s* called the join condition. When *θ* is an equality condition between (parts of) *r* and *s*, the join is called an equijoin. Joins can be generalized to an arbitrary number of collections (*n*-way joins) and several variations of the basic join operator exist.

A straightforward way to implement joins is the so-called nested loop, which iterates over all elements *r* in *R* and, for each *r*, iterates over all elements *s* in *S* and tests whether *θ*(*r, s*) holds (for instance, see [RAM 03], Chapter 14). While this technique is often used by relational databases to evaluate joins, it cannot be used in the context of a MapReduce evaluation, since no single node can iterate over a whole collection (which is distributed over several nodes). In the context of equijoins, however, a distributed solution can be devised easily and is given in Algorithm 1.1.
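The nested loop described above can be sketched in a few lines of Python. This is a minimal single-machine illustration, not code from [RAM 03]; the function name `nested_loop_join` and the sample tuples are assumptions made for the example.

```python
from typing import Callable, Iterable, Iterator, Tuple, TypeVar

R = TypeVar("R")
S = TypeVar("S")


def nested_loop_join(
    rs: Iterable[R], ss: Iterable[S], theta: Callable[[R, S], bool]
) -> Iterator[Tuple[R, S]]:
    """Yield every pair (r, s) for which the join condition theta holds."""
    inner = list(ss)  # the inner collection is scanned once per outer element
    for r in rs:
        for s in inner:
            if theta(r, s):
                yield (r, s)


# Hypothetical data: (name, town) and (town, country) tuples.
people = [("John", "Paris"), ("Ada", "London")]
towns = [("Paris", "France"), ("London", "UK")]

# Equijoin on the town: second field of people, first field of towns.
pairs = list(nested_loop_join(people, towns, lambda r, s: r[1] == s[0]))
```

Both loops need access to the whole of each collection, which is exactly what a single MapReduce node does not have.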

To perform the join, we assume that the MAP function is applied to each element of either collection, together with a tag indicating its origin (a simple string with the name of the collection, for instance). The MAP function outputs a pair consisting of a *key* and the original element together with its origin. The key must be the result of a hashing (or partition) function *h* that is compatible with the *θ* condition of the join, that is:

$$\forall r \in R,\ \forall s \in S,\quad \theta(r, s) \Rightarrow h(r) = h(s)$$

During the shuffle phase of the MapReduce process, the elements are exchanged between nodes and all elements yielding the same hash value end up on the same node. The REDUCE function is then called on the key (the hash value) and the sequence of all elements that have this key. It then separates this input sequence with respect to the origin of the elements and can perform, on these two sequences, a nested loop to compute the join. This straightforward scheme is reminiscent of the Hash Join (e.g. see [RAM 03], Chapter 14) used in RDBMS. It suffers however from two drawbacks. The first is that it requires a hashing function that is compatible with the *θ* condition, which may prove difficult for conditions other than equality. Second, and more importantly, the cost of data exchange in the shuffle phase may be prohibitive. These two drawbacks have given rise to a lot of research in recent years. A first area of research is to reduce the data exchange by filtering bad join candidates early, during the map phase. The second area is to develop *ad-hoc* MapReduce implementations for particular joins (where the particular semantics of the *θ* condition is used).
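The map, shuffle and reduce steps of this scheme can be simulated on a single machine as follows. This is only a sketch under stated assumptions: it is not Algorithm 1.1 itself, the number of partitions and all function names (`h`, `map_fn`, `shuffle`, `reduce_fn`, `reduce_side_join`) are hypothetical, and Python's built-in `hash` stands in for the partition function.

```python
from collections import defaultdict

NUM_PARTITIONS = 4  # assumed number of reducers


def h(value):
    """Partition function compatible with equality: equal values get equal keys."""
    return hash(value) % NUM_PARTITIONS


def map_fn(origin, element, join_attr):
    # Emit the partition key together with the origin tag and the element.
    return h(join_attr(element)), (origin, element)


def shuffle(pairs):
    # Group all emitted values by key, as the shuffle phase would.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_fn(values, attr_r, attr_s):
    # Separate the elements by origin, then run a local nested loop.
    # The equality test is still needed: distinct values may share a key.
    rs = [e for origin, e in values if origin == "R"]
    ss = [e for origin, e in values if origin == "S"]
    return [(r, s) for r in rs for s in ss if attr_r(r) == attr_s(s)]


def reduce_side_join(R, S, attr_r, attr_s):
    mapped = [map_fn("R", r, attr_r) for r in R]
    mapped += [map_fn("S", s, attr_s) for s in S]
    result = []
    for values in shuffle(mapped).values():
        result.extend(reduce_fn(values, attr_r, attr_s))
    return result


# Hypothetical data: (name, town) and (town, country) tuples.
people = [("John", "Paris"), ("Ada", "London"), ("Zoe", "Oslo")]
towns = [("Paris", "France"), ("London", "UK")]
joined = reduce_side_join(people, towns, lambda r: r[1], lambda s: s[0])
```

Every tuple, including `("Zoe", "Oslo")`, which has no join partner, goes through the shuffle. This is the data exchange cost that the filtering techniques discussed next try to reduce.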

In [PHA 16] and [PHA 14], Phan *et al.* reviewed and extended the state of the art on filter-based joins. Filter-based joins discard non-joinable tuples early by using Bloom filters (named after their inventor, Burton H. Bloom [BLO 70]). A Bloom filter is a *compact* data structure that soundly approximates a set interface. Given a set *S* of elements and a Bloom filter *F* constructed from *S*, the Bloom filter can tell whether an element *e* is definitely *not* part of the set or whether it is present with a high probability. That is, a negative answer of *F* is always correct (it will never report an element of *S* as absent), but a positive answer may be wrong (an element reported as present by *F* may be absent from *S*). The advantage of Bloom filters is their great compactness and small query time (which is governed by a fixed parameter *k* that only depends on the precision of the filter, and not on the number of elements stored in it).

The work of Phan *et al.* extends existing approaches by introducing *intersection filter-based joins*, in which Bloom filters are used to compute equijoins (as well as other related operators such as semi-joins). Their technique consists of two phases. Given two collections *R* and *S* that must be joined on a common attribute *x*, a first pre-processing phase projects each collection on attribute *x*, collects the results in two Bloom filters $F_{Rx}$ and $F_{Sx}$, and computes the intersection filter $F_x = F_{Rx} \cap F_{Sx}$, which is very quick and easy. In practice, this filter is small enough to be distributed to all nodes. In a second phase, which computes the distributed join, we may test during the map phase whether the *x* attribute of the given tuple is in $F_x$ and, if not, discard it from the join candidates early. Phan *et al.* further extend their approach to multi-way joins and even recursive joins (which compute the transitive closure of the joined relations). Finally, they provide a complete cost analysis of their techniques, as well as of others, which can be used as a foundation for a MapReduce-based optimizer (they take particular care in evaluating the cost of the initial pre-processing).
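To make the two phases concrete, here is a minimal sketch of a Bloom filter, its intersection and the map-side pruning. It is an illustration only and not the implementation of Phan *et al.*: the class `BloomFilter`, its parameters `m` (number of bits) and `k` (number of hash functions), the salted SHA-256 hashing and the sample data are all assumptions. The intersection is taken as a bitwise AND, which is only meaningful when both filters share the same size and hash functions.

```python
import hashlib


class BloomFilter:
    def __init__(self, m=1024, k=3, bits=0):
        self.m, self.k, self.bits = m, k, bits

    def _positions(self, item):
        # Derive k bit positions from salted SHA-256 digests.
        for i in range(self.k):
            digest = hashlib.sha256(f"{i}:{item}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.m

    def add(self, item):
        for p in self._positions(item):
            self.bits |= 1 << p

    def __contains__(self, item):
        # True means "possibly present"; False means "definitely absent".
        return all((self.bits >> p) & 1 for p in self._positions(item))

    def intersect(self, other):
        # Bitwise AND: requires identical m and k on both sides.
        assert (self.m, self.k) == (other.m, other.k)
        return BloomFilter(self.m, self.k, self.bits & other.bits)


def build_filter(collection, attr):
    f = BloomFilter()
    for t in collection:
        f.add(attr(t))
    return f


# Hypothetical data: (name, town) and (town, country) tuples, joined on the town.
R = [("John", "Paris"), ("Ada", "London"), ("Zoe", "Oslo")]
S = [("Paris", "France"), ("London", "UK"), ("Rome", "Italy")]

# Phase 1: one filter per collection, then their intersection.
f_x = build_filter(R, lambda r: r[1]).intersect(build_filter(S, lambda s: s[0]))

# Phase 2 (map side): keep only tuples whose join attribute may occur on both sides.
R_kept = [r for r in R if r[1] in f_x]
S_kept = [s for s in S if s[0] in f_x]
```

Tuples that do have a join partner are always kept, since every bit they set is present in both filters. Tuples such as `("Zoe", "Oslo")` are dropped unless they hit a false positive, which becomes less likely as `m` grows.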

One reason why join algorithms may perform poorly is the presence of *data skew*. Such bias may be due to the original data, e.g. when the join attribute is not uniformly distributed, but may also be due to a bad distribution of the tuples among the existing nodes of the cluster. This observation was made early on in the context of MapReduce-based joins by Hassan in his PhD thesis [ALH 09]. Hassan introduced special algorithms for a particular form of queries, namely `GroupBy-Join` queries, that is, SQL queries of the form:

    SELECT R.x, R.y, S.z, f(S.u)
    FROM R, S
    WHERE R.x = S.x
    GROUP BY R.y, R.z

Hassan gives variants of the algorithm for the case where the join attribute *x* is also part of the *GROUP BY* clause. While Hassan’s work initially targeted distributed architectures, it was later adapted and specialized to the MapReduce paradigm (e.g. see [ALH 15]). While a detailed description of Hassan *et al.’s* algorithm (dubbed MRFAG-Join in their work) is outside the scope of this survey, we give a high-level overview of the concepts involved. First, as for the reduce side join of Algorithm 1.1, the collections to be joined are distributed over all the nodes, and each tuple is tagged with its relation name. The algorithm then proceeds in three phases. The first phase uses one MapReduce operation to compute local histograms for the *x* values of *R* and *S* (recall that *x* is the join attribute). The histograms are local in the sense that they only take into account the tuples that are present on a given node. In the second phase, another MapReduce iteration is used to circulate the local histograms among the nodes and compute a global histogram of the frequencies of the pairs (*R.x, S.x*). While this step incurs some data movement, histograms are merely a summary of the original data and are therefore much smaller in size. Finally, based on the global distribution that is known to all the nodes at the end of the second phase, the third phase performs the join as usual. However, the information about the global distribution is used cleverly in two ways: first, it makes it possible to filter out join candidates that never occur (in this regard, the global histogram plays the same role as the Bloom filter of Phan *et al.*); second, the distribution is also used to counteract any data skew that would be present and to distribute the set of sub-relations to be joined evenly among the nodes. In practice, Hassan *et al.* showed that early filtering coupled with good load balancing properties allowed their MRFAG-Join algorithm to outperform the default evaluation strategies of a state-of-the-art system by a factor of 10.
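The role played by the histograms can be illustrated with a small sketch. This is not MRFAG-Join: it only shows local histograms being merged into a global one, which is then used to drop join values that occur on one side only and to spread the remaining values over reducers. The greedy largest-first assignment, the cost estimate (product of the two frequencies) and all names are assumptions made for the example.

```python
import heapq
from collections import Counter


def local_histogram(x_values):
    # Frequencies of the join attribute on a single node.
    return Counter(x_values)


def global_histogram(local_histograms):
    # Summing the local histograms gives the global frequencies.
    total = Counter()
    for hist in local_histograms:
        total.update(hist)
    return total


def plan(hist_r, hist_s, num_reducers):
    """Assign each joinable value of x to a reducer, heaviest values first."""
    # A value present on one side only can never produce a join result.
    costs = {x: hist_r[x] * hist_s[x] for x in hist_r if x in hist_s}
    heap = [(0, i) for i in range(num_reducers)]  # (current load, reducer id)
    assignment = {}
    for x, cost in sorted(costs.items(), key=lambda kv: -kv[1]):
        load, i = heapq.heappop(heap)  # least loaded reducer so far
        assignment[x] = i
        heapq.heappush(heap, (load + cost, i))
    return assignment


# Hypothetical x values of R and S, as stored on two nodes.
r_nodes = [["a", "a", "b"], ["a", "c"]]
s_nodes = [["a", "b"], ["b", "d"]]

hist_r = global_histogram(local_histogram(v) for v in r_nodes)
hist_s = global_histogram(local_histogram(v) for v in s_nodes)
assignment = plan(hist_r, hist_s, num_reducers=2)
```

Here `"c"` and `"d"` are filtered out because they appear in one relation only, while `"a"` (three expected result pairs) and `"b"` (two) are sent to different reducers.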

Improving joins over MapReduce is not limited to equijoins. Indeed, in many cases, domain-specific information can be used to prune non-joinable candidates early in the MapReduce process. For instance, in [PIL 16], Pilourdault *et al.* considered the problem of computing top-*k* temporal joins. In the context of their work, relations *R* and *S* are joined over an attribute *x* denoting a time interval. Furthermore, the join conditions involve high-level time functions such as *meet*(*R.x, S.x*) (the *R.x* interval finishes exactly when the *S.x* interval starts), *overlaps*(*R.x, S.x*) (the two intervals intersect), and so on. Finally, the time functions are not interpreted as Boolean predicates, but rather as scoring functions, and the joins must return the top *k* scoring pairs of intervals for a given time function. The solution they adopt consists of an initial offline, query-independent pre-processing step, followed by two MapReduce phases that answer the query. The offline pre-processing phase partitions time into consecutive *granules* (time intervals) and collects statistics on the distribution, among the granules, of the time intervals to be joined. At query time, a first MapReduce process distributes the data using the statistics computed in the pre-processing phase. In a nutshell, granules are used as reducers’ input keys, which allows a reducer to process together all intervals that occurred during the same granule. Then, bounds for the scoring time function used in the query are computed and intervals that have no chance of being in the top-*k* results are discarded early. Lastly, a final MapReduce step is used to compute the actual join among the reduced set of candidates.
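The idea of using granules as reducer keys can be sketched as follows. This only illustrates the partitioning step under simplifying assumptions (integer time stamps, closed intervals, fixed-width granules). It is not the algorithm of [PIL 16], and the constant `GRANULE_WIDTH` and the function names are hypothetical.

```python
from collections import defaultdict

GRANULE_WIDTH = 10  # assumed granule length, in arbitrary time units


def granules(interval):
    """Identifiers of all granules that the closed interval [start, end] touches."""
    start, end = interval
    return range(start // GRANULE_WIDTH, end // GRANULE_WIDTH + 1)


def map_interval(origin, interval):
    # Emit the tagged interval once per granule it intersects.
    return [(g, (origin, interval)) for g in granules(interval)]


def group_by_granule(tagged_intervals):
    # Simulated shuffle: each granule identifier is a reducer input key.
    groups = defaultdict(list)
    for origin, interval in tagged_intervals:
        for g, value in map_interval(origin, interval):
            groups[g].append(value)
    return dict(groups)


# Hypothetical intervals tagged with their relation of origin.
data = [("R", (3, 12)), ("S", (12, 25)), ("S", (40, 41))]
groups = group_by_granule(data)
```

The reducer for granule 1 receives both `(3, 12)` from *R* and `(12, 25)` from *S*, so it can score this pair locally, whereas `(40, 41)` ends up alone in granule 4 and is never compared with the *R* interval.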

In the same spirit, Fang *et al.* considered the problem of nearest-neighbor joins in [FAN 16]. The objects considered in this work are *trajectories*, that is, sequences of triples (*x, y, t*) where (*x, y*) are coordinates in the Euclidean plane and *t* is a time stamp (indicating that a moving object was at position (*x, y*) at time *t*). The authors focus on *k* nearest-neighbor joins, that is, given two sets of trajectories *R* and *S*, finding for each element of *R* the set of *k* closest trajectories of *S*. The solution proposed by the authors is similar in spirit to the work of Pilourdault *et al.* on temporal joins. An initial pre-processing step first partitions time into discrete consecutive intervals and then space into rectangles. Trajectories are discretized and each is associated with the list of time intervals and rectangles it intersects. At query time, the pair of an interval and a rectangle is used as a partition key to assign pieces of trajectories to reducers. Four MapReduce stages are used: the first three collect statistics, perform early pruning of non-joinable objects and distribute the data over the nodes, and the last stage is, as in the previously presented work, used to perform the join proper.

Apart from the previously presented works, which focus on binary joins (and sometimes provide algorithms for ternary joins), Graux *et al.* studied the problem of *n*-ary equijoins [GRA 16] in the context of SPARQL [PRU 08] queries. SPARQL is the standard query language for the so-called semantic Web. More precisely, SPARQL is a W3C standardized query language that can query data expressed in the Resource Description Framework (RDF) [W3C 14]. Informally, RDF data are structured into so-called triples of a *subject*, a *predicate* and an *object*. These triples allow *facts* about the world to be described. For instance, a triple could be `("John", "lives in", "Paris")` and another one could be `("Paris", "is in", "France")`. Graux *et al.* focused on the query part of SPARQL (the language also allows new triple sets to be constructed). SPARQL queries rely heavily on joins of triples. For instance, the query:

    SELECT ?name ?town
    WHERE {
      ?name "lives in" ?town .
      ?town "is in" "France"
    }

returns the pairs consisting of the name of a person and the city they live in, for all cities located in France (the “.” operator in the query acts as a conjunction). As we can see, the more triples with free variables in the query, the more joins there are to process. Graux *et al.* showed in their work how to efficiently store such triple sets on a distributed file system and how to translate a subset of SPARQL into Apache Spark code. While they use Spark’s built-in join operator (which implements roughly the reduce side join of Algorithm 1.1), Graux *et al.* made clever use of statistics to find an optimal order for join evaluations. This results in an implementation that outperforms both state-of-the-art native SPARQL evaluators and other NoSQL-based SPARQL implementations on popular SPARQL benchmarks. Finally, they show how to extend their fragment of SPARQL with other operators such as union.
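To see why such a query amounts to a join, the two triple patterns can be evaluated separately and then combined on their shared variable `?town`. The snippet below is a plain Python illustration over a hypothetical in-memory triple set. It is neither Spark code nor the translation scheme of Graux *et al.*

```python
# Hypothetical triple set: (subject, predicate, object).
triples = [
    ("John", "lives in", "Paris"),
    ("Paris", "is in", "France"),
    ("Ada", "lives in", "London"),
    ("London", "is in", "UK"),
]

# Pattern 1: ?name "lives in" ?town  ->  bindings for (?name, ?town)
lives_in = [(s, o) for s, p, o in triples if p == "lives in"]

# Pattern 2: ?town "is in" "France"  ->  bindings for ?town
in_france = {s for s, p, o in triples if p == "is in" and o == "France"}

# The shared variable ?town induces an equijoin between the two binding sets.
answers = [(name, town) for name, town in lives_in if town in in_france]
```

Each additional triple pattern that shares a variable with the others adds one more such join, which is why the order in which the joins are evaluated matters.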