nim-kvstore

Simple, unified API for multiple key-value stores with optimistic concurrency control.

Inspired by the Python library datastore.

Features

  • Unified API - Same interface across different storage backends
  • Optimistic Concurrency Control - Token-based CAS (Compare-And-Swap) semantics prevent lost updates
  • Typed Records - Automatic serialization/deserialization with custom encoder/decoder procs
  • Async/Await - Built on Chronos for non-blocking async operations
  • True Async I/O - Blocking operations are offloaded to a threadpool, so the event loop is never blocked
  • Multiple Backends - SQLite (in-memory or file) and filesystem
  • Atomic Batch Operations - All-or-nothing batch puts/deletes (SQLite backend)

Installation

nimble install kvstore

Quick Start

import pkg/chronos
import pkg/kvstore
import pkg/stew/byteutils
import pkg/taskpools

proc main() {.async.} =
  # Create a threadpool for async I/O
  let tp = Taskpool.new(num_threads = 4)

  # Create an in-memory SQLite kvstore
  let ds = SQLiteKVStore.new(SqliteMemory, tp).tryGet()

  # Create a key
  let key = Key.init("/users/alice").tryGet()

  # Store data (token=0 means insert-only)
  (await ds.put(key, "Hello, World!".toBytes())).tryGet()

  # Retrieve data
  let record = (await ds.get(key)).tryGet()
  echo "Value: ", string.fromBytes(record.val)
  echo "Token: ", record.token  # Version token for CAS

  # Update with CAS - use the token from the previous get
  let updated = RawKVRecord.init(key, "Updated!".toBytes(), record.token)
  (await ds.put(updated)).tryGet()

  # Close the store
  (await ds.close()).tryGet()

  # Shutdown threadpool
  tp.shutdown()

waitFor main()

Note: Compile with --threads:on (required for threadpool support).

Error Handling

nim-kvstore uses the questionable library for type-safe error handling. All fallible operations return ?!T (Result type) instead of raising exceptions.

Result Type (?!T)

import pkg/questionable/results

# ?!T is either a success value or a failure with error
proc getValue(): ?!int =
  if condition:
    success(42)        # Return success with value
  else:
    failure("error")   # Return failure with message

# ?!void for operations that don't return a value
proc doSomething(): ?!void =
  if failed:
    return failure(newException(IOError, "write failed"))
  success()

Unwrapping Results

? operator - Early return on error (idiomatic, use this):

proc process(): ?!int =
  let a = ?getValue()      # Returns failure if getValue fails
  let b = ?transform(a)    # Returns failure if transform fails
  success(b)

proc processAsync(): Future[?!RawKVRecord] {.async.} =
  let record = ?(await ds.get(key))  # Works with await too
  success(record)

.tryGet() - Unwrap or raise exception (acceptable in tests/scripts):

# In tests - exceptions are fine
let value = getValue().tryGet()

# Quick Start examples use .tryGet() for brevity, but prefer ? in production

without pattern - Handle errors with custom logic:

without value =? getValue(), err:
  echo "Failed: ", err.msg
  return failure(err)
# use value here

.isOk / .isErr - Check without unwrapping:

let result = getValue()
if result.isOk:
  echo "Got: ", result.get()
else:
  echo "Error: ", result.error.msg

Idiomatic Patterns

# Chain operations with ? - all errors propagate automatically
proc updateUser(id: string, name: string): Future[?!void] {.async.} =
  let
    key = ?Key.init("/users/" & id)
    record = ?(await ds.get(key))
    updated = RawKVRecord.init(record.key, name.toBytes(), record.token)
  discard ?(await ds.put(updated))
  success()

# Collect results, propagate first error
proc getAll(keys: seq[Key]): Future[?!seq[RawKVRecord]] {.async.} =
  var results: seq[RawKVRecord]
  for key in keys:
    results.add(?(await ds.get(key)))
  success(results)

Core Concepts

Keys and Namespaces

Keys are hierarchical paths used to identify records. A Key is composed of one or more Namespace segments.

Namespace

A Namespace is a single segment of a key, consisting of an optional field and a value:

type Namespace* = object
  field*: string   # Optional field/type identifier
  value*: string   # The namespace value

Constants:

  • Delimiter = ":" - Separates field from value within a namespace
  • Separator = "/" - Separates namespaces within a key

Constructors:

  • Namespace.init(field, value: string): ?!Namespace - Create from separate field and value
  • Namespace.init(id: string): ?!Namespace - Parse from a string like "field:value" or "value"

Validation Rules:

  • Neither field nor value may contain ":" or "/"
  • An id string may contain at most one ":"
  • Whitespace is stripped from field and value
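
A minimal sketch of these rules in action (assuming the constructors above; exact error messages are implementation-defined):

```nim
import pkg/kvstore
import pkg/questionable/results

# Whitespace around field and value is stripped
let ns = Namespace.init(" type : user ").tryGet()
assert ns.field == "type" and ns.value == "user"

# "/" may not appear inside a segment
assert Namespace.init("a/b").isErr

# An id string may contain at most one ":"
assert Namespace.init("a:b:c").isErr
```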

Functions:

  • id(ns): string - Returns "field:value" if field exists, else "value"
  • hash(ns): Hash - Hash based on id
  • $(ns): string - Same as id

Key

A Key is a hierarchical path composed of Namespace segments:

type Key* = object
  namespaces*: seq[Namespace]

Constructors:

  • Key.init(namespaces: varargs[Namespace]): ?!Key - Create from Namespace objects
  • Key.init(namespaces: varargs[string]): ?!Key - Parse from path strings like "/a:b/c/d"
  • Key.init(keys: varargs[Key]): ?!Key - Concatenate multiple keys

Parsing Behavior:

  • Strings are split by "/" separator
  • Empty segments (e.g., "///a///b///") are filtered out
  • Each segment is parsed as a Namespace
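
For example, the filtering rule means noisy paths normalize to the same key (a sketch, assuming structural equality on Key):

```nim
import pkg/kvstore
import pkg/questionable/results

let clean = Key.init("/a:b/c").tryGet()
let noisy = Key.init("///a:b///c///").tryGet()
assert clean == noisy     # empty segments are filtered out
assert clean.len == 2     # namespaces "a:b" and "c"
assert clean.id == "/a:b/c"
```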

Accessors:

  • list(key): seq[Namespace] - Returns all namespaces
  • key[x] - Index into namespaces (supports slices)
  • len(key): int - Number of namespaces
  • value(key): string - Value of the last namespace
  • field(key): string - Field of the last namespace
  • id(key): string - Full path string, e.g., "/a:b/c/d:e"

Navigation:

  • root(key): bool - True if key has only one namespace
  • parent(key): ?!Key - Key without the last namespace (fails if root)
  • path(key): ?!Key - Parent with the last namespace's field/value stripped
  • reverse(key): Key - Key with namespaces in reverse order

Building Keys:

  • child(key, namespaces: varargs[Namespace]): Key - Append namespaces
  • child(key, keys: varargs[Key]): Key - Append keys
  • child(key, ids: varargs[string]): ?!Key - Append parsed strings
  • key / ns - Operator alias for child (Namespace)
  • key / other - Operator alias for child (Key)
  • key / id - Operator alias for child (string)
  • Key.random(): string - Generate a random 24-char OID string

Relationships:

  • relative(key, parent): ?!Key - Get key relative to parent
  • ancestor(key, other): bool - True if other is a descendant of key
  • descendant(key, other): bool - True if key is a descendant of other

Example Usage:

# Create namespaces
let ns = Namespace.init("type", "user").tryGet()  # field="type", value="user"
let ns2 = Namespace.init("user").tryGet()          # field="", value="user"
let ns3 = Namespace.init("type:user").tryGet()     # field="type", value="user"

# Create keys
let key = Key.init("/users/alice/profile").tryGet()
let key2 = Key.init("users", "alice", "profile").tryGet()  # equivalent

# Navigate keys
let parent = key.parent.tryGet()        # /users/alice
let isRoot = parent.root                # false
let lastValue = key.value               # "profile"

# Build keys
let child = (key / "settings").tryGet()  # /users/alice/profile/settings
let combined = key / Key.init("a/b").tryGet()

# Check relationships
let isAncestor = Key.init("/users").tryGet().ancestor(key)  # true
let relative = key.relative(Key.init("/users").tryGet()).tryGet()  # alice/profile

Records and Tokens

Every record in nim-kvstore has three components:

type KVRecord*[T] = object
  key*: Key        # Unique identifier
  val*: T          # The stored value
  token*: uint64   # Version token for optimistic concurrency

The token is central to the CAS semantics:

  • Token 0 means "insert only if key doesn't exist"
  • Any other token means "update only if current token matches"

Important: Tokens are opaque values and should not be manipulated directly. Always use the token returned from get() operations when performing updates or deletes. Different backends may implement token generation differently (incrementing integers, timestamps, UUIDs, etc.), so never assume a specific token format or attempt arithmetic on tokens.
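
Continuing the Quick Start (with ds and key in scope), the two token rules play out like this; a sketch, not backend-specific behavior:

```nim
import pkg/stew/byteutils

# token = 0: insert only if the key does not exist yet
let fresh = RawKVRecord.init(key, "v1".toBytes(), 0'u64)
(await ds.put(fresh)).tryGet()

# A second token-0 put now conflicts: the key already exists
let dup = RawKVRecord.init(key, "v2".toBytes(), 0'u64)
assert (await ds.put(dup)).isErr

# Updates must carry the current token from a get()
let cur = (await ds.get(key)).tryGet()
(await ds.put(RawKVRecord.init(key, "v2".toBytes(), cur.token))).tryGet()
```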

Optimistic Concurrency Control

nim-kvstore uses optimistic concurrency to prevent lost updates in concurrent environments:

# Two clients read the same record
let record1 = (await ds.get(key)).tryGet()  # token = 5
let record2 = (await ds.get(key)).tryGet()  # token = 5

# Client 1 updates successfully
let update1 = RawKVRecord.init(key, newValue1, record1.token)
(await ds.put(update1)).tryGet()  # Success! Token is now 6

# Client 2's update fails - stale token (single-record put raises error)
let update2 = RawKVRecord.init(key, newValue2, record2.token)
let result = await ds.put(update2)
if result.isErr:
  echo "Conflict detected: ", result.error.msg  # KVConflictError

Note: Single-record operations (put(record), delete(record)) return errors on conflict. Batch operations (put(@[records]), delete(@[records])) return a list of skipped keys instead.

Bulk Operations

Bulk operations return a list of keys that were skipped due to conflicts:

let records = @[
  RawKVRecord.init(key1, value1, token1),
  RawKVRecord.init(key2, value2, token2),
  RawKVRecord.init(key3, value3, token3),
]

let skipped = (await ds.put(records)).tryGet()
# skipped contains keys where token didn't match

API Reference

Core Operations

  • has(key) - Check if key exists
  • get(key) - Get single record as RawKVRecord (raw bytes)
  • get(key, T) - Get single record as KVRecord[T] (auto-decode)
  • get(keys) - Get multiple records as seq[RawKVRecord]
  • get(keys, T) - Get multiple records as seq[KVRecord[T]]
  • put(record) - Insert/update single record (errors on conflict)
  • put(records) - Insert/update multiple records (returns skipped keys)
  • put(key, val) - Convenience: insert raw bytes or a typed value at key
  • delete(record) - Delete single record (errors on conflict)
  • delete(records) - Delete multiple records (returns skipped keys)
  • query(query) - Query records by key prefix as QueryIter[RawKVRecord]
  • query(query, T) - Query records by key prefix as QueryIter[T]
  • close() - Close the store
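
A short sketch tying a few of these together (assuming ds from the Quick Start; the key names are illustrative):

```nim
import pkg/stew/byteutils

let key = Key.init("/docs/readme").tryGet()

# Convenience put: raw bytes straight at a key
(await ds.put(key, "hello".toBytes())).tryGet()

# has() checks existence without fetching the value
assert (await ds.has(key)).tryGet()

# get() on a missing key returns a failure
let missing = Key.init("/docs/absent").tryGet()
assert (await ds.get(missing)).isErr
```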

Atomic Batch Operations

For backends that support it (SQLite), atomic operations provide all-or-nothing semantics:

  • supportsAtomicBatch() - Check if the backend supports atomic batches
  • putAtomic(records) - Insert/update all records atomically (rolls back on any conflict)
  • deleteAtomic(records) - Delete all records atomically (rolls back on any conflict)

# Check if atomic operations are supported
if ds.supportsAtomicBatch():
  # All succeed or all fail
  let conflicts = (await ds.putAtomic(records)).tryGet()
  if conflicts.len > 0:
    echo "Atomic batch failed due to conflicts: ", conflicts
    # No records were written

Helper Operations

  • tryPut(records, maxRetries, middleware) - Bulk put with retry on conflicts
  • tryDelete(records, maxRetries, middleware) - Bulk delete with retry on conflicts
  • tryPutAtomic(records, maxRetries, middleware) - Atomic put with retry on conflicts
  • tryDeleteAtomic(records, maxRetries, middleware) - Atomic delete with retry on conflicts
  • getOrPut(key, producer, maxRetries) - Get the existing record or lazily create it
  • fetchAll(iter) - Collect all iterator results into a seq
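
A sketch of getOrPut; the producer signature shown here is an assumption (an async proc yielding the bytes to insert when the key is missing), so check the module's actual definition:

```nim
import pkg/stew/byteutils

# Hypothetical producer: builds the value only if the key is absent
proc makeDefault(): Future[?!seq[byte]] {.async.} =
  success("default".toBytes())

let rec = (await ds.getOrPut(key, makeDefault, maxRetries = 3)).tryGet()
echo string.fromBytes(rec.val)
```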

Middleware for Conflict Resolution

The tryPut and tryDelete helpers accept a middleware function to resolve conflicts. Pass nil for no middleware (conflicts will be retried with the same records).

import std/sequtils

# Middleware receives failed records and returns updated records to retry
let middleware = proc(failed: seq[RawKVRecord]): Future[?!seq[RawKVRecord]]
    {.async: (raises: [CancelledError]).} =
  # Refetch current tokens, propagating failures instead of raising
  let fresh = ?(await ds.get(failed.mapIt(it.key)))

  # Update records with fresh tokens
  var updated: seq[RawKVRecord]
  for record in failed:
    for f in fresh:
      if f.key == record.key:
        updated.add(RawKVRecord.init(record.key, record.val, f.token))
        break

  success(updated)

let result = await ds.tryPut(records, maxRetries = 3, middleware = middleware)

# Or without middleware (nil):
let result2 = await ds.tryPut(records, maxRetries = 3, middleware = nil)

Typed Records

nim-kvstore supports automatic type conversion with custom encoder/decoder procs:

import std/strutils
import pkg/stew/byteutils
import pkg/questionable/results

type Person = object
  name: string
  age: int

# Define encoder
proc encode(p: Person): seq[byte] =
  (p.name & ":" & $p.age).toBytes()

# Define decoder (guard against malformed input instead of raising)
proc decode(T: type Person, bytes: seq[byte]): ?!T =
  let parts = string.fromBytes(bytes).split(':')
  if parts.len != 2:
    return failure("invalid Person encoding")
  success(Person(name: parts[0], age: ?parseInt(parts[1]).catch))

# Use typed API
let key = Key.init("/people/alice").tryGet()
let person = Person(name: "Alice", age: 30)

# Store typed record
(await ds.put(key, person)).tryGet()

# Retrieve typed record - pass type as second argument
let record = (await ds.get(key, Person)).tryGet()
echo record.val.name  # "Alice"
echo record.val.age   # 30

# Query with typed results
let query = Query.init(Key.init("/people").tryGet())
let iter = (await ds.query(query, Person)).tryGet()
while not iter.finished:
  let recordOpt = (await iter.next()).tryGet()
  if record =? recordOpt:
    echo record.val.name
discard await iter.dispose()

Type Inference

The type parameter defaults to seq[byte] (raw bytes):

# These are equivalent - both return RawKVRecord
let raw1 = (await ds.get(key)).tryGet()
let raw2 = (await ds.get(key, seq[byte])).tryGet()

# For typed records, pass the type explicitly
let typed = (await ds.get(key, Person)).tryGet()

Storage Backends

Both backends require a Taskpool for async I/O operations.

SQLiteKVStore

SQLite-backed storage supporting both in-memory and file-based databases.

import pkg/taskpools

let tp = Taskpool.new(num_threads = 4)

# In-memory database
let memDs = SQLiteKVStore.new(SqliteMemory, tp).tryGet()

# File-based database
let fileDs = SQLiteKVStore.new("/path/to/db.sqlite", tp).tryGet()

# Read-only mode
let readOnlyDs = SQLiteKVStore.new("/path/to/db.sqlite", tp, readOnly = true).tryGet()

Features:

  • Supports atomic batch operations (putAtomic, deleteAtomic)
  • WAL mode with synchronous=NORMAL (safe, ~2x faster than FULL)
  • busy_timeout=5000 (waits for locks instead of failing immediately)
  • Automatic statement finalization and connection cleanup

Note: SQLite uses int64 for tokens, limiting the range to 0..high(int64).

FSKVStore

Filesystem-backed storage where each record is a file.

import pkg/taskpools

let tp = Taskpool.new(num_threads = 4)

let fsDs = FSKVStore.new(
  root = "/path/to/data",
  tp = tp,
  depth = 5  # Maximum key depth
).tryGet()

Features:

  • Atomic file writes (write to temp, then rename)
  • Parent directory fsync for crash safety (POSIX)
  • Per-key locking for write ordering

Limitations:

  • Does not support atomic batch operations (putAtomic, deleteAtomic)
  • Query only supports key prefix filtering and value flag; sort, offset, and limit are ignored (filesystem walk order)
  • Uses uint64 for tokens (full range supported)

Store Lifecycle

Closing a Store

The close() method shuts down the store and releases resources:

(await ds.close()).tryGet()

Close behavior:

  1. Cancels in-flight operations - Outstanding get, put, delete, and iterator operations are cancelled
  2. Releases resources - Database connections, file handles, and locks are released
  3. Idempotent - Calling close() multiple times is safe (subsequent calls return immediately)

After close:

  • All operations (get, put, delete, query, etc.) return a failure
  • The store cannot be reopened - create a new instance instead

# Operations after close fail gracefully
(await ds.close()).tryGet()
let result = await ds.get(key)
assert result.isErr  # Returns failure, doesn't crash

Iterator Lifecycle

Iterators are must-dispose resources. Always call dispose() when done:

let iter = (await ds.query(q)).tryGet()
defer: discard await iter.dispose()  # Always dispose
# ... use iterator ...

Important: close() does not auto-dispose iterators. Failing to call dispose() will leak iterator resources (signals, prepared statements, locks). While close() cancels in-flight operations, undisposed iterators may delay final resource cleanup.

Handling Close Failures

If close() returns an error, resources may have leaked:

if err =? (await ds.close()).errorOption:
  warn "Store close had errors", error = err.msg
  # Resources may be leaked but store is closed

Close can return errors if:

  • Backend resources cannot be released cleanly
  • In-flight operations failed during cancellation

After close() (success or failure), the store cannot be used - create a new instance instead.

Query API

Query records by key prefix:

let query = Query.init(
  key = Key.init("/users").tryGet(),
  value = true,              # Include values in results
  sort = SortOrder.Ascending,
  offset = 0,
  limit = 100                # -1 for unlimited (default)
)

# Raw bytes query (default)
let iter = (await ds.query(query)).tryGet()

# Option 1: Manual iteration
while not iter.finished:
  let recordOpt = (await iter.next()).tryGet()
  if record =? recordOpt:
    echo record.key, ": ", record.val

# Always dispose the iterator (async)
discard await iter.dispose()

# Option 2: Use fetchAll helper
let iter2 = (await ds.query(query)).tryGet()
let records = (await iter2.fetchAll()).tryGet()
discard await iter2.dispose()

Typed Query Results

Pass a type to query() for automatic decoding:

# Query with typed decoding
let iter = (await ds.query(query, Person)).tryGet()
defer: discard await iter.dispose()

while not iter.finished:
  let recordOpt = (await iter.next()).tryGet()
  if record =? recordOpt:
    echo record.val.name, " is ", record.val.age, " years old"

# Or use fetchAll for typed results
let iter2 = (await ds.query(query, Person)).tryGet()
let people = (await iter2.fetchAll()).tryGet()
discard await iter2.dispose()
for p in people:
  echo p.val.name

Important: Iterators are must-dispose resources. Always call dispose() to release backend resources (prepared statements, locks). Failing to dispose before close() may cause close to fail.

Error Types

KVStoreError                  # Base error type
├── KVConflictError           # CAS conflict on single-record operations
├── KVStoreMaxRetriesError    # tryPut/tryDelete exhausted retries
└── KVStoreBackendError       # Backend-specific errors
    ├── KVStoreKeyNotFound    # Key doesn't exist
    └── KVStoreCorruption     # Data corruption detected

Note: Accessing an iterator after it's finished or disposed returns a generic KVStoreError with a descriptive message, not a specialized error type.
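
Callers can branch on the concrete error class with of checks; a sketch, assuming errors surface as refs of the hierarchy above:

```nim
let res = await ds.get(key)
if res.isErr:
  if res.error of KVConflictError:
    echo "stale token: refetch and retry"
  elif res.error of KVStoreKeyNotFound:
    echo "no such key"
  else:
    echo "store error: ", res.error.msg
```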

Threading

nim-kvstore uses a threadpool to offload blocking I/O operations, ensuring the Chronos event loop is never blocked.

Requirements:

  • Compile with --threads:on
  • Provide a Taskpool when creating stores
  • Uses ORC memory management (--mm:orc, the default in Nim 2.0+)

import pkg/taskpools

# Create a shared threadpool
let tp = Taskpool.new(num_threads = 4)

# Pass to store constructors
let sqliteDs = SQLiteKVStore.new(SqliteMemory, tp).tryGet()
let fsDs = FSKVStore.new(root = "/data", tp = tp).tryGet()

# Shutdown when done
tp.shutdown()

Development

Formatting

Code is formatted with nph:

nimble format

Testing

nimble test

Stability

nim-kvstore is currently marked as experimental and may be subject to breaking changes across any version bump until it is marked as stable.

License

nim-kvstore is licensed and distributed under either of:

  • Apache License, Version 2.0 (LICENSE-APACHEv2)
  • MIT license (LICENSE-MIT)

at your option. The contents of this repository may not be copied, modified, or distributed except according to those terms.