
MycoKV

[MycoKV logo: a tree drawn like circuitry]

MycoKV is a fast, hierarchical, in-memory key-value store built in Rust. It provides write-ahead log persistence, support for nested key storage and retrieval, and a Node.js driver for use in web server applications.

MycoKV - Docker Image

mycokv-node - NPM Package

This has been a fun project to work on - I love working in Rust and I love developing solutions that make use of my knowledge of data structures and algorithms. This article will be a quick summary of the problems I invented and the solutions I came up with to solve them.

Key-Value Storage

First and foremost, MycoKV is a key-value store. At its root is a simple HashMap that maps keys to their respective values. It supports multiple value types - strings, integers, floats, booleans, and null - which I was able to achieve through the use of Rust's enums.

#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)]
#[serde(untagged)]
pub enum Value {
    String(String),
    Integer(i64),
    Float(f64),
    Boolean(bool),
    Null,
}

Rust enums are fantastic to use - I find myself reaching for them quite often. Unlike enums in many other languages, you can associate values and even whole structs with each variant. From there, you can use Rust's match keyword to handle each potential case. You can also implement methods on the enum itself - which is great for creating different ways to parse or construct them.

impl Value {
    pub fn parse(value: &str) -> Result<Self, TransactionError> {
        match value {
            "null" => Ok(Value::Null),
            "true" => Ok(Value::Boolean(true)),
            "false" => Ok(Value::Boolean(false)),
            _ => {
                if let Ok(number) = value.parse::<i64>() {
                    Ok(Value::Integer(number))
                } else if let Ok(number) = value.parse::<f64>() {
                    Ok(Value::Float(number))
                } else if value.starts_with('"') && value.ends_with('"') {
                    Ok(Value::String(value[1..value.len() - 1].to_string()))
                } else {
                    Err(TransactionError::InvalidValue(value.to_string()))
                }
            }
        }
    }

    pub fn to_string(&self) -> String {
        match self {
            Value::String(string) => format!("\"{}\"", string),
            Value::Integer(number) => number.to_string(),
            Value::Float(number) => number.to_string(),
            Value::Boolean(boolean) => boolean.to_string(),
            Value::Null => "null".to_string(),
        }
    }
}

Ultimately, this enables me to accept and store all these different types in the same HashMap.

HashMap<String, Value>
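
To see how this plays out, here is a minimal sketch (not from the codebase) that round-trips a few literals through Value::parse and Value::to_string, assuming the enum and impl above are in scope:

fn main() {
    for raw in ["null", "true", "42", "3.14", "\"hello\""] {
        match Value::parse(raw) {
            // Each literal parses into the matching variant and prints back
            // out in the same form it was given
            Ok(value) => println!("{} -> {:?} -> {}", raw, value, value.to_string()),
            Err(_) => println!("{} could not be parsed", raw),
        }
    }
}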

Hierarchical/Nested Keys

One unique aspect of MycoKV is its ability to store and query nested keys. In simple terms this means that you can do the following:

> PUT kitchen.drawers 8
8
> PUT kitchen.cabinet.cups 12
12
> PUT kitchen.toaster.brand "KitchenAid"
"KitchenAid"
> PUT kitchen.cabinet.plates 7
7
> GET kitchen.*
{ drawers: 8, cabinet: { cups: 12, plates: 7 }, toaster: { brand: "KitchenAid" }}

By using a "." character to separate key segments, you can query multiple keys at once and receive them as a structured JSON object. I'm not sure this feature will be useful to anyone, but it was fun to implement.

The challenge with implementing storage this way was keeping it performant. A naive approach would be to iterate over every key in the database, building a JSON object out of the ones that start with "kitchen." That approach is O(k), where k is the total number of keys in the database. My goal was to make the search O(n), where n is the number of keys that actually start with the requested prefix.

There are probably a number of ways you could think to do this - but ultimately I settled on using a Radix Tree data structure.

#[derive(Serialize)]
pub struct RadixNode {
    // Children keyed by the next key segment (e.g. "cabinet" -> node)
    pub children: HashMap<String, RadixNode>,
    pub key: String,
}

pub struct RadixTree {
    root: RadixNode,
    // The values themselves live in a flat map of full key -> value
    map: HashMap<String, Value>,
}

This is essentially a Trie, where each node in the tree has a HashMap of all its children. Tries are commonly used for autocomplete, with each node representing a single character. In a radix tree, each node instead represents multiple characters - in this case, the segments of a key. I reached for this data structure over others because it also allows faster lookup for deeper wildcard requests. For example, if the query was `GET kitchen.cabinet.topshelf.*`, something like a simple list of keys would not help to quickly retrieve all the desired nested keys.

I did still ultimately find it easier to store the values in a HashMap separate from the tree - so retrieving a full or partial subtree (limited by depth using something like `GET kitchen.*1`) entails iterating through the tree and retrieving each value from the separate map.

    pub fn get(&self, key: &str) -> Result<String, TransactionError> {
        let access_type = AccessType::parse(key);

        match access_type {
            // Plain key: a single lookup in the flat map
            AccessType::Direct => match self.map.get(key) {
                Some(value) => Ok(value.to_string()),
                None => Err(TransactionError::KeyNotFound(key.to_string())),
            },
            // Wildcard query (e.g. GET kitchen.*): walk the tree down to the
            // prefix node, then serialize everything beneath it
            AccessType::FullSubtree(key) => {
                let mut current = &self.root;
                let parts = key.split(".");
                for part in parts {
                    let child = current.children.get(part);
                    match child {
                        Some(child) => current = child,
                        None => return Err(TransactionError::KeyNotFound(key.to_string())),
                    }
                }

                self.serialize_subtree(current, 0)
                    .to_string()
                    .map_err(|_| TransactionError::SerializationFailure)
            }
            // Depth-limited wildcard (e.g. GET kitchen.*1): same walk, but
            // serialization stops after the requested depth
            AccessType::PartialSubtree(key, depth) => {
                let mut current = &self.root;
                let parts = key.split(".");
                for part in parts {
                    let child = current.children.get(part);
                    match child {
                        Some(child) => current = child,
                        None => return Err(TransactionError::KeyNotFound(key.to_string())),
                    }
                }

                self.serialize_subtree(current, depth)
                    .to_string()
                    .map_err(|_| TransactionError::SerializationFailure)
            }
        }
    }
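
The write path isn't shown above, but here is a rough sketch of how a PUT might populate both structures - walking the "."-separated segments, creating tree nodes as needed, and storing the value in the flat map under the full key. It assumes the RadixTree and RadixNode definitions from earlier; the actual implementation may differ.

impl RadixTree {
    pub fn put(&mut self, key: &str, value: Value) {
        // Walk the key segments, creating child nodes as needed
        let mut current = &mut self.root;
        for part in key.split('.') {
            current = current
                .children
                .entry(part.to_string())
                .or_insert_with(|| RadixNode {
                    children: HashMap::new(),
                    key: part.to_string(),
                });
        }
        // The value itself lives in the flat map, keyed by the full path
        self.map.insert(key.to_string(), value);
    }
}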

Key Expiration (TTL)

Another feature I decided to implement later was the ability for the user to set expirations for keys. This is often useful when using key-value stores for caching, as you generally want to invalidate the cached value after a certain period of time, commonly referred to as its time to live (TTL).

> PUT mykey "my value"
"my value"
> EXPIRE mykey 1000
OK

Under the hood, this was an interesting problem to implement. I could, of course, spawn a background task for every expiration - but what if there are many? What if they are set to last a long time? Keeping many long-running tasks around did not seem like the most efficient way to handle the problem.

So I ended up reaching for one of my favorite data structures - the heap! A heap guarantees that the value popped off the top is always the lowest, so by using the expiration timestamp as the sort key, I could pop values off the heap until the item at the top had a timestamp in the future. I do this at regular intervals (I went with 5 seconds) from a worker thread, and whenever a GET or DELETE request is made by the user.

// Share the key-value store with a background worker thread
let worker_kvmap = Arc::clone(&kvmap);
let expiration_worker = move || {
    // Lock the store and remove any keys whose expirations have passed
    let mut kvmap = worker_kvmap.lock().unwrap();
    kvmap.process_expirations().unwrap_or(());
};
// Run the closure every 5000 ms from the worker thread
let expiration_worker = Worker::new(5000, expiration_worker);
let expiration_worker_thread = expiration_worker.start();
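
The process_expirations method itself isn't shown here, but below is a rough sketch of what it might look like, assuming the expirations live in a min-ordered heap of key/timestamp entries like the AtomicHeap described further down. The KeyValueMap name, the expirations field, the millisecond timestamps, and the delete helper are all hypothetical - the real implementation may differ.

use std::time::{SystemTime, UNIX_EPOCH};

impl KeyValueMap {
    pub fn process_expirations(&mut self) -> Result<(), TransactionError> {
        // Current time in milliseconds since the UNIX epoch
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock is before the UNIX epoch")
            .as_millis() as u64;

        // Remove every key whose expiration has already passed, stopping as
        // soon as the earliest remaining expiration is in the future
        while let Some(expiration) = self.expirations.peek() {
            if expiration.timestamp > now {
                break;
            }
            self.expirations.pop();
            self.delete(&expiration.key)?; // hypothetical delete helper
        }

        Ok(())
    }
}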

There is one additional challenge with processing expirations this way, though: what happens if the user updates the expiration later? Imagine the user initially sets a 5-second expiration and then, 2 seconds later, updates it to 10 seconds. With the heap alone, the original 5-second expiration would still be processed. You cannot just reach into the middle of the heap and pull entries out - that would be inefficient and would potentially break the heap ordering. To solve this, I paired the heap with another HashMap that keeps track of the most recent entry for each key, and gave each data object stored in the heap a boolean attribute, "valid", that can be set to false and ignored when it is popped off the heap.

Here is the HeapData struct, which contains the key, the data to sort by (the expiration), and the valid flag. I implemented ordering and equality for this struct so that it can be sorted properly within the heap.

#[derive(Debug)]
pub struct HeapData<T> {
    key: String,
    data: T,
    valid: Mutex<bool>,
}

impl<T> PartialEq for HeapData<T>
where
    T: PartialEq,
{
    fn eq(&self, other: &Self) -> bool {
        self.data.eq(&other.data) && self.key.eq(&other.key)
    }
}

impl<T> PartialOrd for HeapData<T>
where
    T: Ord + PartialOrd,
{
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
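
The Eq and Ord implementations aren't shown above. Since Rust's BinaryHeap is a max-heap and we want the earliest expiration at the top, the comparison presumably needs to be reversed - here is a sketch of what that might look like (the actual implementation may differ):

impl<T> Eq for HeapData<T> where T: Eq {}

impl<T> Ord for HeapData<T>
where
    T: Ord,
{
    fn cmp(&self, other: &Self) -> Ordering {
        // Reverse the natural ordering of the data (the expiration timestamp)
        // so that BinaryHeap, a max-heap, surfaces the earliest expiration first
        other.data.cmp(&self.data)
    }
}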

So when new data is pushed onto the heap and a value already exists for that key, the old entry is marked as invalid.

  if let Some(old_heap_data) = self.map.get(&key) {
      *old_heap_data.valid.lock().unwrap() = false;
  }

And when data is popped off the heap, entries are popped and discarded until the first "valid" entry is found.

while let Some(heap_data) = self.heap.pop() {
    if *heap_data.valid.lock().unwrap() {
        self.map.remove(&heap_data.key);
        return Some(heap_data.data.clone());
    }
}

From there I could implement the data structure, which I called an "AtomicHeap" because it ensures that, for all intents and purposes, only one valid value for any given key can exist in the heap at a time.

pub struct AtomicHeap<T>
where
    T: Ord + Clone,
{
    heap: BinaryHeap<Arc<HeapData<T>>>,
    map: HashMap<String, Arc<HeapData<T>>>,
}

impl<T> AtomicHeap<T>
where
    T: Ord + Clone,
{
    // Mark any existing entry for this key as stale, then push the new entry
    // and record it as the latest for that key
    pub fn push(&mut self, key: String, data: T) {
        let heap_data = Arc::new(HeapData::new(key.clone(), data));

        if let Some(old_heap_data) = self.map.get(&key) {
            *old_heap_data.valid.lock().unwrap() = false;
        }

        self.heap.push(heap_data.clone());
        self.map.insert(key, heap_data);
    }

    // Discard stale entries at the top of the heap until a valid one is found
    pub fn peek(&mut self) -> Option<T> {
        while let Some(heap_data) = self.heap.peek().cloned() {
            if *heap_data.valid.lock().unwrap() {
                return Some(heap_data.data.clone());
            } else {
                self.heap.pop();
            }
        }

        None
    }

    // Pop entries until a valid one is found; stale entries are simply dropped
    pub fn pop(&mut self) -> Option<T> {
        while let Some(heap_data) = self.heap.pop() {
            if *heap_data.valid.lock().unwrap() {
                self.map.remove(&heap_data.key);
                return Some(heap_data.data.clone());
            }
        }

        None
    }

    // Mark the current entry for this key as stale and forget about it
    pub fn invalidate(&mut self, key: &str) {
        if let Some(heap_data) = self.map.get(key) {
            *heap_data.valid.lock().unwrap() = false;
        }

        self.map.remove(key);
    }
}
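
A quick illustration of the invalidation behavior, assuming a constructor like AtomicHeap::new() (not shown above) and millisecond timestamps as the data:

let mut expirations: AtomicHeap<u64> = AtomicHeap::new(); // hypothetical constructor
expirations.push("mykey".to_string(), 5_000); // expire at t = 5000
expirations.push("mykey".to_string(), 10_000); // re-expire at t = 10000; the old entry is marked invalid
assert_eq!(expirations.pop(), Some(10_000)); // the stale 5000 entry is silently skipped
assert_eq!(expirations.pop(), None);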

I realize this solution is not perfectly efficient - in the worst case, a call to pop() can be O(n) if every entry except the last has been invalidated. In practice, though, I expect it to be efficient enough for most use cases.

Write-Ahead Log Persistence

When a typical in-memory key-value store is shut down and restarted, its data is lost - by definition, in-memory means the data lives in the computer's main memory and does not persist. With MycoKV, the data can actually be persisted through the use of its write-ahead log.

Whenever a write command is applied to the database, it is first copied as a line in the log. This ensures consistency if the database were to crash before fully writing the data to memory - hence the term "write-ahead." On restart, the log is replayed from the beginning, rebuilding the database one command at a time.

pub fn write(&mut self, operation: &Operation) -> Result<(), TransactionError> {
    let output = match operation {
        // Ignore get operations since they have no effect on db state
        Operation::Get(_) => return Ok(()),

        Operation::Put(key, value) => format!("PUT {} {}\n", key, value.to_string()),
        Operation::Delete(key) => format!("DELETE {}\n", key),
        // PURGE clears the log itself, so there is nothing to append
        Operation::Purge => {
            self.clear()?;
            String::from("")
        }
        // TIME is read-only, so there is nothing to log
        Operation::Time => return Ok(()),
        Operation::ExpireAt(expiration) => {
            format!("EXPIREAT {} {}\n", expiration.key, expiration.timestamp)
        }
    };

    self.file
        .write_all(output.as_bytes())
        .map_err(|error| TransactionError::LogWriteFail(error.to_string()))?;

    Ok(())
}
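
The replay side isn't shown above; here is a rough sketch of what reading the log back on startup might look like. It reuses the put sketch from earlier and a hypothetical delete method, and it skips EXPIREAT lines for brevity - the actual implementation may differ.

use std::fs::File;
use std::io::{BufRead, BufReader};

// Hypothetical replay: re-apply each logged command in order to rebuild state
pub fn replay(path: &str, store: &mut RadixTree) -> std::io::Result<()> {
    let file = File::open(path)?;
    for line in BufReader::new(file).lines() {
        let line = line?;
        // Commands are of the form "PUT <key> <value>" or "DELETE <key>"
        let mut parts = line.splitn(3, ' ');
        match (parts.next(), parts.next(), parts.next()) {
            (Some("PUT"), Some(key), Some(raw)) => {
                if let Ok(value) = Value::parse(raw) {
                    store.put(key, value); // see the earlier put sketch
                }
            }
            (Some("DELETE"), Some(key), _) => {
                let _ = store.delete(key); // hypothetical delete method
            }
            // EXPIREAT and anything unrecognized is skipped in this sketch
            _ => {}
        }
    }
    Ok(())
}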

As you can guess, the main issue with this approach arises when the write-ahead log grows very large: replaying every command ever issued against the database can take a considerable amount of time. For that reason, the PURGE command and the `--purge` command-line argument can be used to effectively disable this feature entirely.

I do have plans to implement some sort of truncation for the write-ahead log in the future. Perhaps something like a HashMap that tracks, for each key, the last line in the log that wrote it, deleting that line when a new command for the same key is issued - similar to what I did when creating the AtomicHeap.

Node.js Driver

Implementing the Node.js library for using MycoKV was actually a lot of fun. It was rewarding to be building out features in an entirely different context to support my own creation. I learned a lot - and actually ended up creating tickets on MycoKV for features that I wanted support for in the driver implementation.

My main focus in building the library was developer experience. I asked myself, "How would I like to use this library?" I wrote out some code that I thought a user might write, and then built my implementation to support that usage. I also took some extra steps for good DX - including inline documentation and writing up a detailed README.md that would appear on the NPM page for the package.

import { MycoKV } from "mycokv-node";

const myco = await MycoKV.connect({
    host: "localhost",
    port: 6922,
});

try {
    await myco.put("foo", "bar");
    const get = await myco.get("foo");
    console.log(get); // "bar"

    await myco.delete("foo");
} catch (error) {
    // Handle any errors
}

await myco.disconnect();

Nothing about building this library was particularly challenging, but I did spend a little more time focusing on CI/CD. My goal, which I ultimately achieved, was to completely automate the process of publishing the package to NPM. Upon pushing a new tag to the repo (such as v0.1.0), my GitHub Actions workflow builds and tests the package, extracts the version number from the tag, updates the package.json with the new version number, checks out the main branch and pushes the updated package.json, and then publishes the package to NPM.

name: Publish Package

on:
    push:
        tags:
            - "*"

jobs:
    publish-npm:
        runs-on: ubuntu-latest
        services:
            mycokv:
                image: wvaviator/mycokv:latest
                ports:
                    - 6922:6922
        steps:
            - uses: actions/checkout@v3

            - name: Set up Node.js
              uses: actions/setup-node@v3
              with:
                  node-version: "16.x"
                  registry-url: "https://registry.npmjs.org/"

            - name: Install Dependencies
              run: yarn install

            - name: Build and Test
              run: |
                  yarn build
                  yarn test

            - name: Extract Version from Tag
              run: |
                  TAG_VERSION=${GITHUB_REF#refs/tags/v}
                  echo "Extracted version: $TAG_VERSION"
                  jq '.version="'$TAG_VERSION'"' package.json > temp.json && mv temp.json package.json
                  echo "Updated package.json version to $TAG_VERSION"

            - name: Commit package.json to Main Branch
              run: |
                  git config user.name 'github-actions[bot]'
                  git config user.email 'github-actions[bot]@users.noreply.github.com'
                  git fetch origin main:main
                  git checkout main
                  git add package.json
                  git commit -m "bump package version [skip ci]"
                  git push origin main
                  echo "Committed updated package.json file to active branch"

            - name: Publish to NPM
              run: yarn publish --non-interactive
              env:
                  NODE_AUTH_TOKEN: ${{ secrets.NPM_TOKEN }}

Conclusion

I do intend to continue developing and adding features. In addition to the write-ahead log modifications I mentioned before, I would also like to build drivers for other languages such as Java, Python, etc., which would be a great opportunity to learn how libraries are built and published in those ecosystems.

I'd also like to add more network security to the database. Right now, it is not really fit for use in any kind of production application: since its network communication is just plain text, anyone could submit commands to the database. Adding some sort of asymmetric encryption, along with authentication and authorization, would be a fun and interesting challenge.

The last thing I would eventually like to add is a web application for managing the database - perhaps even a website where a user can pay for and provision an instance of MycoKV in the cloud. I suppose that might be a bit out of reach for a solo developer such as myself, but it would be fun nonetheless.