Polyglot data sets in Clojure
23 March 2014
Essays in building a Clojure LINQ library and getting to grips with cross-data source data analysis. The code is available on GitHub.
Background
As the summary line suggests, there are a couple of separate pain points in working with bare Clojure for corporate data analysis:
- Manipulation of different data sources is non-uniform, leaking the particulars of each data source into the Clojure code
- External data sources usually have stateful, closeable connections, which require distracting context management
- Cross-system joins are complex to write and often need to be carefully tuned for efficiency and safety so as not to exceed memory limits
Higher level data manipulation primitives / LINQ
Core Clojure provides powerful primitives for sequence processing: the all-star team of map, filter, remove, group-by, reduce and many less well-known cousins. However, their generality and reliance on arbitrary functions mean the same processing does not translate directly to non-Clojure data sources. Additionally, map and reduce operations are opaque: they can do literally anything, including turning a sequence of records into a sequence of primitives or even a single primitive, which makes it harder to reason about the operations and chain them together.
A slightly less powerful set of primitives are the building blocks of SQL: select, where, join and so on. An important benefit of this set of operations is that they allow reasoning about the schemas of the input and resulting data sets.
Finally, a data set layer built on top of clojure.core.reducers can exploit the reduction step to contain all connection management, decluttering user code. There is no entirely free lunch here though: an SQL data set wrapped in a CollReduce that implicitly manages the connection scope will re-evaluate the query for every reduction. Users therefore have to be aware of potential re-evaluation triggers and either install appropriate caching or manage the number of reductions performed.
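To make this concrete, here is a minimal sketch (not the library's actual implementation) of wrapping an SQL query as a CollReduce over plain JDBC; the Clojure 1.5 protocol stands in for the backport discussed later, and jdbc-url/sql are just illustrative constructor parameters.

;; Minimal sketch: the connection only lives for the duration of a single
;; reduction, and every reduction re-runs the query.
(require '[clojure.core.protocols :as p])
(import '(java.sql DriverManager))

(deftype SqlDataSet [jdbc-url sql]
  p/CollReduce
  (coll-reduce [this f]
    (p/coll-reduce this f (f)))              ; reducers convention: (f) supplies init
  (coll-reduce [_ f init]
    (with-open [conn (DriverManager/getConnection jdbc-url)
                stmt (.createStatement conn)
                rs   (.executeQuery stmt sql)]
      (reduce f init (resultset-seq rs)))))

Reducing over such a data set, e.g. via clojure.core.reducers/reduce or the library's into replacement, opens, drains and closes a fresh connection each time, which is exactly the re-evaluation behaviour to keep in mind.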
Cross data source joins
Disparate data silos are a fact of life in many, if not all, major corporations. Valuable chunks of information needed for overall analysis are maintained by separate departments in their own, organically grown infrastructure. Data warehousing at a corporate level provides an alternative built on the premise that all data will eventually be centralized to enable flexible analytics.
Although this approach is preferable, implementation relies on buy-in from a large group of stakeholders and on shared, scalable infrastructure. One alternative consists in maintaining locally segregated warehouses featuring copies of data formally owned by other groups. Finally, individual data use cases can be constructed over disparate data sources via mashup techniques. However, without a library to take care of some of the pains of joins and a consistent API to interact with data sets, this approach very quickly leads to an unmanageable kludge of code.
API overview
Although the end result of the library should be a LINQ-style, i.e. macro-based, API, for better functional composability it helps to treat the macro API as no more than syntactic sugar and to start the design with the functional API instead.
Functional API
Say the three most critical operations are defined as below; so far, so logical (although one can argue both ways on whether varargs are preferable to a single vector argument or even repeated calls). The tricky part is what representation to use for fields and conditions.
(defn select* [source & fields] ...)
(defn where* [source & conditions] ...)
(defn join* [left right options & fields] ...)
With the goal of supporting different data sources natively, fields and conditions need to be left in an abstract representation; in this case s-expressions seem appropriate. That leads to a further decision of how to distinguish field references from literals and function or constant references. The current choice is to treat keywords starting with '$' as field references, as in the following examples:
;; select strategy field, mnemonic field as book and calculated description field
;; depending on the data source, the subs-based logic may not be supported natively
(select* accounts :$strategy
         [:$mnemonic :as :book]
         ['(subs :$description 0 10) :as :description])
(where* accounts '(= :$strategy "001"))
Evidently, this representation is quite cumbersome to use and still has a few gotchas. For example, in order to support locally bound query parameters it is not good enough to quote a whole condition; instead, everything except function names should be resolved by the compiler as usual. Moreover, for any Clojure-side calculations even the functions need to be resolved, in case local bindings override global ones. To handle some of these aspects (and hide potential future changes in the quoting representation), a helper macro, quote-with-code, is in order. Beyond merely quoting the expression, the macro also pre-generates fallback Clojure code for the s-expression, producing a function that takes a single record as input, for everything but primitive data elements.
;; postwalk is clojure.walk/postwalk; field-ref? and field->keyword are the
;; library helpers that recognize :$-prefixed keywords and strip the $ prefix.

(defn- function-quote [sexp]
  ;; quote function positions only, leaving arguments to be resolved as usual
  (postwalk
    (fn [exp]
      (if (list? exp)
        `(list (quote ~(first exp)) ~@(rest exp))
        exp))
    sexp))

(defmacro quote-with-code [sexp]
  (if (instance? clojure.lang.IObj sexp)
    `(with-meta ~(function-quote sexp)
       ;; attach the fallback record function as :function metadata
       {:function ~(let [s (gensym)]
                     (list
                       'fn
                       [s]
                       (postwalk
                         (fn [exp]
                           (if (field-ref? exp)
                             (list (field->keyword exp) s)   ; field -> keyword lookup
                             exp))
                         sexp)))})
    sexp))
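To illustrate, a hypothetical REPL interaction (assuming field-ref? recognizes :$-prefixed keywords and field->keyword maps :$strategy to :strategy) might look like this:

;; Hypothetical REPL sketch; field names are illustrative only.
(def condition (quote-with-code (= :$strategy "001")))

condition                                         ; => (= :$strategy "001"), plain data
(:function (meta condition))                      ; => the pre-generated fallback fn
((:function (meta condition)) {:strategy "001"})  ; => true
((:function (meta condition)) {:strategy "002"})  ; => false

A data source that understands the quoted form can translate it natively (e.g. into a SQL where clause), while everything else falls back to applying the :function metadata to each record.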
Macro API
With these building blocks in place we can define the three operations as macros (the asterisk-less versions). Note that join does not support complex join conditions, hence no code quoting is necessary.
(defmacro select [source & fields]
  `(select* ~source
            ~@(map
                (fn [f] (if (vector? f)
                          (vec (cons (list 'quote-with-code (first f)) (rest f)))
                          (list 'quote-with-code f)))
                fields)))

(defmacro where [source & conditions]
  `(where* ~source ~@(map #(list 'quote-with-code %) conditions)))

(defmacro join [left right options & fields]
  `(join* ~left ~right ~options ~@fields))
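For instance, the where macro does no more than wrap each condition before delegating to where* (namespace qualification elided):

(macroexpand-1 '(where accounts (= :$strategy "001")))
;; => (where* accounts (quote-with-code (= :$strategy "001")))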
Data source protocols
Having defined the user-facing API, let's consider the plugin API for hooking in separate data sources. This part needs to be at least as well-defined to ensure that data sources can be easily added to the system and that their vastly varying characteristics can be supported seamlessly. A set of protocols appears to be the most appropriate starting point.
(defprotocol Queryable
  (-parse-sexp [self sexp]))

(defprotocol Selectable
  (-select [self fields]))

(defprotocol Filterable
  (-where [self conditions]))

(defprotocol Joinable
  (-join [self other hints join-fields]))
Note that
- Data sources can opt to support or not support individual LINQ operations; missing operations will delegate to an appropriate Clojure wrapper. Furthermore, where conditions and select calculations that cannot be parsed via Queryable#-parse-sexp will again be handled by a Clojure wrapper.
- The plugin API uses '-' method prefixes, similar to the '*' suffixes for the functions underlying the user-facing macros
- There are dependencies not captured by the protocols; for example, a type that implements Selectable or Filterable should also implement Queryable.
One expected benefit of separating all data source operations into distinct protocols is that the richness of the LINQ layer can be extended seamlessly by defining the user API and providing a default (Clojure) data source implementation. All other data sources get the behaviour via conversion to a Clojure data source; however, where possible a more efficient native implementation can be added to individual data sources. In this way the LINQ interface is also not bound by the lowest common denominator of support (likely to be poor).
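As an illustration, here is a minimal sketch of such a Clojure wrapper; ClojureDataSet is a hypothetical name, and the fallback simply applies the :function metadata that quote-with-code attaches to each condition.

;; Minimal sketch, not the library's actual code: an in-memory data source
;; backed by a sequence of maps that satisfies Filterable by applying the
;; pre-generated fallback function of each condition to every record.
(defrecord ClojureDataSet [records]
  Filterable
  (-where [this conditions]
    (ClojureDataSet.
      (filter (fn [record]
                (every? #((:function (meta %)) record) conditions))
              records))))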
Efficiency features
In order to support reasonably efficient operation with wrapped data sets, a couple of performance guarantees are unavoidable. The common theme is to run as many operations as possible in the native system and to minimize the amount of data that needs to be transferred to and handled on the client.
- Operations on native data sets, particularly joins, need to be handled natively where possible
- Cross-data source joins need to support a flow of using results from one source as additional filters on the joined-on data set.
- (Not implemented) Where clauses should be pushed to the native layer if interspersed select layers don't affect the fields filtered on.
Compatibility with core Clojure
All data sources are at a minimum required to implement CollReduce. Hence any data set can, with some loss of functionality (and potential optimizations), be used in reducer operations. This makes it possible to use the full power of map, filter and reduce with data sets wherever an operation is missing from the LINQ interface. Furthermore, any CollReduce can be converted back to a Clojure data set, so the conversion is bi-directional.
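For example, dropping down to the (backported) reducers API could look as follows; the literal vector stands in for any CollReduce data set and the field names are illustrative:

(require '[clojure.core.reducers :as r])

;; any data set from the library can take the place of this vector
(def dataset
  [{:strategy "001" :notional 100}
   {:strategy "002" :notional 250}
   {:strategy "001" :notional 50}])

(r/reduce + 0
  (r/map :notional
         (r/filter #(= (:strategy %) "001") dataset)))
;; => 150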
Design and implementation notes
Rather than covering the whole implementation, the sections below break out a selected list of potentially interesting topics.
1. Using clojure.core.reducers backport
The use of the reducers framework for data sets is far less about parallelization than about the efficient management of connections and the automatic deferral of computation. Moreover, to be able to run on pre-1.5 Clojure, as far back as 1.2.1, a backported version is needed that provides alternatives to clojure.core/into etc. hooking into the reduce API. So the data set library is built on top of a cut of clojure.core.reducers customized for backward compatibility down to Clojure 1.2.1.
2. Support of 'select *' type queries
Universal selectors have a couple of important positives and negatives, both in terms of user experience and implementation challenges. On balance, for the first draft it seemed safer to defer the decision on their support.
- Pro: Reduces cluttered select statements where calculated fields are to be added to a query.
- Pro: Allows canned queries that pass through new fields in underlying data sources without need for code updates on the queries.
- Con: Existing queries can be broken by introducing a new field in an underlying data source that clashes with an existing field from another data source.
- Con: Implementation-wise, the presence of star selectors needs to be handled by all data sources and complicates reasoning about result set schemas after select or join operations.
3. Join strategies for cross data source joins
Possibly the trickiest topic is cross data source joins, especially those involving multiple external data sources (as opposed to one in-memory data structure and one external data source). In many cases the query author has to explicitly provide hints as to the order in which data sources should be joined and which results, if any, can be held in memory and/or cached.
To that end, the initial support consists of a :join-flow option in the join options parameter. Specifying, for example, :left (alongside a :left or :inner join) triggers the following behaviour:
- Evaluate the left data set.
- Extract the possible values of all join fields for the left data set.
- Add a where* clause on the right data set, constraining each join field to fall in the range determined in the previous step.
- Evaluate the left and right data set to perform the actual join.
Using join flows should be the default assumption for any cross data set join involving a large external data set: rather than transferring the complete, unconstrained data set, as much data as possible should be filtered on the external side via the appropriate join flow.
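A hypothetical invocation might look as follows; only :join-flow is the option documented above, so the join-type key and the field name are assumptions for illustration:

;; accounts: small in-memory data set; trades: large external data set.
;; The :left join flow evaluates accounts first and pushes the resulting
;; :account-id constraint into the query against trades.
(join accounts trades
      {:type      :inner        ; assumed key for the join type
       :join-flow :left}
      :account-id)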