Polyglot data sets in Clojure

23 March 2014

Essays in building a Clojure LINQ library and getting to grips with cross-data source data analysis. The code is available on GitHub.

Background

As the summary line suggests there are a couple of separate pain points in working with bare Clojure for corporate data analysis:

As always there is an expectation that ease of use of the unified data API will adversely affect performance. However, the goal is to minimize the impact and in particular ensure that as much work is delegated to native processors rather than being handled by Clojure after loading data in memory.

Higher level data manipulation primitives / LINQ

Core clojure provides powerful primitives for sequence processing, the all-star team of map, filter, remove, group-by, reduce and many less well-known cousins. However, the general powerfulness and reliance on functions means the same processing does not translate directly to non-Clojure data sources. Additionally, some of the opaqueness of map and reduce operations in that they can literally do anything including moving from sequences of records to sequences of primitives or even a single primitive makes it harder to reason about the operations and chain them together.

A slightly less powerful set of primitives are the building blocks of every SQL language: select, where, join etc. An important benefit of this set of operations is that they allow reasoning about starting and end result set schemas.

Finally, a data set layer built on top of clojure.core.reducers can exploit the reduction step to contain all connection management so as to declutter user code from the connection management. There is no entirely free lunch here though: an SQL data set wrapped in an implicit scope managing CollReduce will re-evaluate the query for every reduction. So users have to be aware of potential re-evaluation triggers and install appropriate caching or manage the number of reduction runs performed.

Cross data source joins

Disparate data silos are a fact of live in many if not all major corporations. Valuable chunks of information needed for overall analysis are maintained by separate departments in their own, organically grown infrastructure. Data warehousing at a corporate level provides an alternative built on the premise that all data will be centralized eventually to enable flexible analytics to be run.

Although this approach is preferable, implementation relies on buyin from a large group of stakeholders and reliance on shared, scalable infrastructure. One alternative consists in maintaining locally segregated warehouses featuring copies of data formally owned by other groups. Finally, individual data use cases can be constructed over disparate data sources via mashup techniques. However without a library to take care of some of the pains of joins and a consistent API to interact with data sets this approach very quickly leads to an unmanageable cludge of code.

API overview

Although the end result of the library should be a LINQ i.e. macro-based API, for more functional composeable it serves to consider the macro API no more than syntactic sugar and so start the design on the functional API instead.

Functional API

Say the three most critical definitions are defined as below, so far so logical (although one can argue both ways on whether varargs are preferable to a single array argument or even repeated calls). The tricky part is what representation to use for fields and conditions.

(defn select* [source & fields] ...)
(defn where* [source & conditions] ...)
(defn join* [left right options & fields] ...)

With the goal of supporting different data sources natively, fields and conditions need to be left in an abstract representation, in this case a representation as s-expressions seems appropriate. Which leads to a further decision of how to distinguish field references from literals and function / constant references. The current option is to treat keywords starting with '$' as field references. See for example the following examples:

;; select strategy field, mnemonic field as book and calculated description field
;; depending on the data source, the subs-based logic may not be supported natively
(select* accounts :$strategy 
                  [:$mnemonic :as :book] 
                  ['(subs :$description 0 10) :as :description])

(where* accounts '(= :$strategy "001"))

Evidently, this representation is quite cumbersome to use and also still has a few gotchas. For example, in order to support locally bound query parameters it is not good enough to quote a whole condition. Instead everything except function names should be resolved by the compiler as per usual. Even more for any Clojure calculations even the functions need to be resolved in case of local overrides from global bindings. To handle some of the aspects (and hide potential future changes in quoting representation, a helper macro, quote-with-code, is in order. More than merely quoting the expression the macro also pre-generates the fallback Clojure code for the s-expression, generating a function taking an single record as input, for everything but primitive data elements.

(defn- function-quote [sexp]
  (postwalk
   (fn [exp]
     (if (list? exp)
       `(list (quote ~(first exp)) ~@(rest exp))
       exp))
   sexp))

(defmacro quote-with-code [sexp]
  (if (instance? clojure.lang.IObj sexp)
    `(with-meta ~(function-quote sexp)
       {:function ~(let [s (gensym)]
                     (list
                      'fn
                      [s]
                      (postwalk 
                       (fn [exp]
                         (if (field-ref? exp)
                           (list (field->keyword exp) s)
                           exp))
                       sexp)))})
    sexp))

Macro API

With these building blocks in place we can define the three operations as macros (the asterisk-less version). Note that join does not support complex join conditions hence not code quoting is necessary.

(defmacro select [source & fields]
  `(select* ~source 
            ~@(map 
               (fn [f] (if (vector? f)
                         (vec (cons (list 'quote-with-code (first f)) (rest f)))
                         (list 'quote-with-code f))) 
               fields)))

(defmacro where [source & conditions]
  `(where* ~source ~@(map #(list 'quote-with-code %) conditions)))

(defmacro join [left right options & fields]
  `(join* ~left ~right ~options ~@fields))

Data source protocols

Having defined the user-facing API, let's consider the plugin API for hooking in separate data sources. This part needs to be at least equally well-defined to ensure that data sources can be easily added to the system and the vastly varying characteristics can be supported seamlessly. A set of protocols appears to be the most appropriate way to start.

(defprotocol Queryable 
  (-parse-sexp [self sexp]))

(defprotocol Selectable
  (-select [self fields]))

(defprotocol Filterable
  (-where [self conditions]))

(defprotocol Joinable
  (-join [self other hints join-fields]))

Note that

One expected benefit from separating out all data source operations as separate protocols is that the richness of the LINQ layer can be extended seamlessly by defining the user API and providing the default (Clojure) data source implementation. All other data sources would get the behaviour via conversion to Clojure data sources. However, where possible a more efficient native implementation can be added to individual data sources. In this way the LINQ interface is also not bound by the minimum support (likely to be poor).

Efficiency features

In order to support somewhat efficient operation with wrapped data sets a couple of performance guarantees are unavoidable. The common theme is to run as many operations as possible in the native system and minimize the amount of data which needs to be transferred and handled on the client.

  1. Operations on native data sets particularly joins need to be handled natively where possible
  2. Cross-data source joins need to support a flow of using results from one source as additional filters for the joined on data set.
  3. (Not implemented) Where clauses should be pushed to the native layer if interspersed select layers don't affect the fields filtered on.

Compatibility with core Clojure

All data sources are at the minimum required to implement CollReduce. Hence any data set can, with some loss of functionality (and potential optimizations) be used in reducer operations. This enables to use the full power of map, filter, reduce operations with data sets where necessary by missing operations in the LINQ interface. Furthermore, any CollReduce can be converted back to a Clojure data set, so convertible is bi-directional.

Design and implementation noteworthies

Rather than covering the whole implementation, the sections below break out a selected list of potentially interesting topics.

1. Using clojure.core.reducers backport

The use of the reducers framework for data sets is far less concerned about the parallelization effects than the efficient management of connections and the automatic deferral of computation. Even more to be able to run on pre 1.5 Clojure as far back as 1.2.1 a backported version is needed that provides alternatives to clojure.core/into etc hooking into the reduce API. So the data set library is built on top of a cut of clojure.core.reducers customized for backward compatibility down to Clojure 1.2.1.

2. Support of 'select *' type queries

Universal selectors have a couple of important positives and negatives, both in terms of user experience and implementation challenges. On balance for the first draft it seemed safer to adjourn the decision on their support.

3. Join strategies for cross data source joins

Possibly the most tricky topic are cross data source joins, especially those involving multiple external data sources (as opposed to one in-memory data structure and an external data source). In many cases the query author has to explicitly provide hints as to which order data sources should be joined and what if any results can be held in memory and/or cached.

To that extend the initial support consists of a :join-flow option in the join options parameter. Specifying for example :left (going alongside a :left or :inner join), would trigger the following behaviour.

  1. Evaluate the left data set.
  2. Extract the possible values of all join fields for the left data set.
  3. Add a where* clause on the right data set, constraining each join field to fall in the range determined in the previous step.
  4. Evaluate the left and right data set to perform the actual join.
Note that the left data set, if not explicitly cached by the query author, will be evaluated twice. For the right data set the results are constrained to the most relevant for the eventual join. However, given that every join field will be evaluated individually against possible combinations derived from the left data set, there may still be a number of non-joinable entries where the combination of join field values is not compatible with the left data set even though each individual field value is.

Using join flows should be the default assumption for any cross data set joins involving a large external data set. Rather than transferring the complete, unconstrained data set. As much data as possible should be filtered on the external side via the appropriate join flow.

comments powered by Disqus