With requirements for offline capabilities and fault tolerance, modern database management systems strive for mechanisms that support simple replication. By using a decentralized peer-to-peer system and replicating only files, we can achieve a simple synchronization solution without tampering with the database itself. Since we are handling parallel events in the spacetime of distributed systems, conflicts may arise; for simplicity we leave conflict resolution for another time.

In the next section we will walk through a simple example application for database replication in as few steps as possible. Background information about the underlying database Datahike and the synchronization platform dat can be found in the background section at the end. Basic knowledge of Clojure should be enough to get a sense of the following code. So let’s start.

Demo application

The following demo will show an example Clojure application with two peers and a simple Datahike database containing users with a name. First we will create an origin database with some entries, create a data repository in the database folder and then share it to another folder on the system by cloning and synchronizing it using the dat project, in particular the dat command-line client. Then we will attempt to read the cloned database again and observe live updates with a file watcher.

Figure FIG-1 illustrates the connection between Datahike, the peers, and the synchronization layer. Updates within the database in application origin app are written to the file system on the origin peer, the synchronization process dat share detects changes and propagates them to other peers, where process dat sync receives changes and applies them to the file system of the clone peer. Then clone app detects file changes and refreshes the connection to Datahike which reads again from the file system.

FIG-1: System overview


First make sure you have dat installed; see the official docs for further instructions. (On Ubuntu, the dat binary may clash with a command-line tool from liballegro4-dev.) Since we are using Clojure, you need the JVM (check your operating system for Java options) and Leiningen in order to execute the code. If you don’t want to type, you may find the example code here. This example targets Unix environments like Linux or macOS, but it should be easy to translate to Windows environments (just contact me if you need help with that).

Origin Peer

Let’s start our project from scratch. Open a terminal and begin with:

lein new datahike-replication
cd datahike-replication

Add the following to the :dependencies section of your project.clj, which should also require at least Clojure version 1.9.0.

[io.replikativ/datahike "0.2.0"]

Now we can fire up a REPL and start tinkering.

lein repl
nREPL server started on port 49731 on host - nrepl://
REPL-y 0.3.7, nREPL 0.2.13
Clojure 1.9.0
Java HotSpot(TM) 64-Bit Server VM 1.8.0_144-b01
    Docs: (doc function-name-here)
          (find-doc "part-of-name-here")
  Source: (source function-name-here)
 Javadoc: (javadoc java-object-or-class-here)
    Exit: Control+D or (exit) or (quit)
 Results: Stored in vars *1, *2, *3, an exception in *e


First we create an empty Datahike instance in the local folder /tmp/origin-dat and initialize it with a simple schema.

(require '[datahike.api :as d])

(def origin-dir "/tmp/origin-dat")
(def origin-uri (str "datahike:file://" origin-dir))
(def schema [{:db/ident :name
              :db/valueType :db.type/string
              :db/cardinality :db.cardinality/one}])

(d/create-database origin-uri :initial-tx schema)

(def origin-conn (d/connect origin-uri))

Now we add some data.

(d/transact origin-conn [{:name "Christian"}
                         {:name "Judith"}
                         {:name "Konrad"}])

Let’s check if everything is transacted correctly.

(d/q '[:find ?n
       :where [?e :name ?n]]
     @origin-conn)
;; => #{["Konrad"] ["Christian"] ["Judith"]}

Since we will use this query several times with different connections, let’s wrap it in a function.

(defn get-all-names [conn]
  (d/q '[:find ?n
         :where [?e :name ?n]]
       @conn))

Dat Replication

Now let’s initialize the dat origin peer in the Datahike folder and start sharing it.

cd /tmp/origin-dat
dat create
dat share

The share output provides the dat link for our repository. Among other lines, you will see something like:

dat v13.13.1
dat://1d823e35715ed567f03b7db945241881e25ac97cc20819c9fa759c944ad8c412

Now we can share this link and start distributing the database. Leave the sharing daemon running and open a new terminal. Next we clone the repository into /tmp/clone-dat. Make sure that this folder doesn’t exist yet.

cd /tmp
dat clone dat://1d823e35715ed567f03b7db945241881e25ac97cc20819c9fa759c944ad8c412 clone-dat

With the cloning done, we can start synchronizing continuously.

cd /tmp/clone-dat
dat sync

Now everything is set up and we can check out the replicated Datahike instance.

Clone Peer

Let’s go back to our open REPL:

(def clone-dir "/tmp/clone-dat")
(def clone-uri (str "datahike:file://" clone-dir))
(def clone-conn (d/connect clone-uri))

(get-all-names clone-conn)
;; => #{["Konrad"] ["Christian"] ["Judith"]}

We have cloned the data once, but updates coming in through the synchronization are not reflected in the Datahike connection. Let’s verify this by adding something to the origin database.

(d/transact origin-conn [{:name "Pablo"}])

(get-all-names origin-conn)
;; => #{["Konrad"] ["Pablo"] ["Christian"] ["Judith"]}

If you look at the terminal where the clone is synchronizing, you may see some updates coming through. Now let’s have a look at the clone connection:

(get-all-names clone-conn)
;; => #{["Konrad"] ["Christian"] ["Judith"]}

Tough luck, nothing happened. The Datahike in-memory index is not in sync. We need to re-create the local Datahike connection in order to get the updates into memory.

(def clone-conn (d/connect clone-uri))

(get-all-names clone-conn)
;; => #{["Konrad"] ["Pablo"] ["Christian"] ["Judith"]}

Alright, we have something here. Next, let’s update the connection whenever something on the clone peer has changed. As a simple solution we check the length of dat’s content.signatures file every two seconds and refresh Datahike’s connection if it has changed. The code may look a little frightening with all the functions from core.async, but don’t be afraid: core.async provides a good level of abstraction for synchronizing stateful processes.

(require '[clojure.core.async :refer [<! go-loop >! timeout chan put!]]
         '[clojure.java.io :as io])

(defn signatures-length []
  (.length (io/file (str clone-dir "/.dat/content.signatures"))))

(defn reconnect []
  (let [state (atom {:chan (chan)
                     :conn (d/connect clone-uri)})]
    (go-loop [event :reconnect]
      (case event
        :stop (println :stopping)
        :reconnect (do
                     (swap! state assoc :conn (d/connect clone-uri))
                     (println :reconnected)
                     (recur (<! (:chan @state))))
        (recur (<! (:chan @state)))))
    (go-loop [l (signatures-length)]
      (<! (timeout 2000))
      (let [new-l (signatures-length)]
        (when-not (= l new-l)
          (>! (:chan @state) :reconnect))
        (recur new-l)))
    ;; return the state atom so callers can access the current connection
    state))

(def state (reconnect))

The updates should now be replicated to the database clone whenever we transact new data in the origin connection. The REPL should display :reconnected once the connection is renewed.

(d/transact origin-conn [{:name "Chrislain"}])

(get-all-names origin-conn)
;; => #{["Konrad"] ["Chrislain"] ["Pablo"] ["Christian"] ["Judith"]}

Wait a second or two for dat to synchronize the data. Then we have to check the connection in the clone state, since it is reset every time dat synchronizes new data.

(get-all-names (:conn @state))
;; => #{["Konrad"] ["Chrislain"] ["Pablo"] ["Christian"] ["Judith"]}

Awesome, now we have a distributed database. You may stop watching the local content by sending the :stop event.

(put! (:chan @state) :stop)


Although the basic idea is very simple, it can only be adapted to databases that meet certain criteria. Data updates must be applied atomically to single files, so that the peer-to-peer system can propagate individual updates. Most database systems, by contrast, use memory-mapped files and mutate data at unpredictable positions all over their transaction log and mutable indices. In that case we don’t get any efficient data deltas, and the peer-to-peer system has to compute the changes at a cost that increases with database size.
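To build intuition for why append-only files matter, here is a toy sketch (not Datahike’s actual on-disk format) of an append-only transaction log: updates only ever add bytes at the end of the file, so a file synchronizer like dat can ship just the new tail instead of diffing the whole file.

```clojure
;; Toy append-only log -- NOT Datahike's actual storage format.
;; Each transaction is appended as one EDN line; earlier bytes never change,
;; so a synchronizer only needs to transfer the newly appended tail.
(defn append-entry! [log-file entry]
  (spit log-file (str (pr-str entry) "\n") :append true))

(let [log (java.io.File/createTempFile "txlog" ".edn")
      _   (append-entry! log {:name "Christian"})
      len-before (.length log)]
  (append-entry! log {:name "Judith"})
  ;; the file only grew; everything before len-before is untouched
  (> (.length log) len-before))
;; => true
```

In-place mutation, as in memory-mapped B-trees, would instead rewrite bytes anywhere in the file, forcing the synchronizer to rescan it.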

You could imagine using a snapshotting copy-on-write file system like ZFS to make these mutable databases replicable, but this often hurts their performance significantly, and requiring a special file system is not a lean setup. We believe that ZFS would have less of a performance impact for Datahike, though, and its fast synchronization and composable notion of persistent snapshots is an unexplored but attractive alternative to dat synchronization.

Either way, using our database Datahike on any file system is sufficient here, in combination with the dat project and its simple API and open protocol for file replication.

Datahike and the dat project

Datahike is a triple store that uses hitchhiker trees as indices and provides a powerful Datalog query engine.

The hitchhiker tree is a functional data structure invented by David Greenberg. It combines a B+-tree with an append-only log, in the spirit of fractal trees, which leads to very efficient read and write operations. For more information, have a look at the presentation about the hitchhiker tree from Strange Loop 2016 or Christian’s post on the datopia blog.
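As a rough intuition, the core trick is to buffer incoming writes in an append log and only merge them into the sorted index in batches, amortizing the cost of index rewrites. This is a heavily simplified sketch, not the real data structure (real hitchhiker trees keep such buffers at every inner node of the B+-tree):

```clojure
;; Toy sketch of the hitchhiker-tree idea: writes land in an append-only
;; buffer and are batch-merged into the sorted index once the buffer fills.
(defn insert [{:keys [log index] :as tree} k v]
  (let [log' (conj log [k v])]
    (if (>= (count log') 3)                 ; flush threshold (toy value)
      {:log [] :index (into index log')}    ; batch-merge into the sorted map
      (assoc tree :log log'))))

(def empty-tree {:log [] :index (sorted-map)})

(reduce (fn [t [k v]] (insert t k v))
        empty-tree
        [[:a 1] [:b 2] [:c 3] [:d 4]])
;; => {:log [[:d 4]], :index {:a 1, :b 2, :c 3}}
```

Reads consult both the buffer and the index, so buffered writes are visible immediately even before a flush.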

The dat project provides a nonprofit, peer-to-peer data-sharing protocol for building distributed applications. Currently dat is primarily implemented in JavaScript, with APIs for high- and low-level data synchronization.

Conclusion and future development

By combining Datahike’s efficient functional indices with dat’s replication mechanisms, we get a quick solution for simple data replication without writing any synchronization code ourselves.

Replicate All The Things

Multiple clones could scale our database reads by just replicating to more machines.

Only the original peer can write to the dat repository, so we have a single-writer setup. Multiple writers will require a conflict-resolution scheme, which is planned for Datahike as future work.

As replication systems, you may also try IPFS, Syncthing, or even simple batch synchronization using rsync.

You should also be able to use a good old network file system like NFS, SSHFS, WebDAV, or Windows network shares, because our file-system store does not update files in place but ensures atomic copy-on-write semantics itself.
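The copy-on-write trick itself is simple. A minimal sketch (assuming a POSIX file system; this is an illustration, not Datahike’s actual implementation): write the new version to a temporary sibling file, then atomically rename it over the old one, so readers always see either the complete old file or the complete new file.

```clojure
(defn atomic-write!
  "Replace target's content without ever exposing a half-written file."
  [^java.io.File target content]
  ;; write the full new version to a sibling temp file first ...
  (let [tmp (java.io.File/createTempFile "cow-" ".tmp" (.getParentFile target))]
    (spit tmp content)
    ;; ... then rename it over the target; within one directory this
    ;; rename is atomic on POSIX file systems
    (.renameTo tmp target)))

(let [dir (.toFile (java.nio.file.Files/createTempDirectory
                     "cow-demo"
                     (make-array java.nio.file.attribute.FileAttribute 0)))
      f   (java.io.File. dir "index.edn")]
  (atomic-write! f "{:version 1}")
  (atomic-write! f "{:version 2}")
  (slurp f))
;; => "{:version 2}"
```

Because the temp file lives in the same directory as the target, the rename never crosses a file-system boundary, which is what keeps it atomic.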

For the next iteration of Datahike we are extending the ideas behind this simple replication mechanism.

We are very happy to discuss your ideas on Slack or Zulip and push things further, since there are plenty of possibilities and we probably have not thought of all of them yet. :)