A tech & domain blog powered by Shtanglitza
Vector search is a common requirement for AI applications, enabling features like recommendation engines and semantic search. However, building a scalable, real-time vector search system often involves integrating multiple distinct technologies: a message queue for ingestion, various databases for indexing and storage, and a compute layer for processing.
As part of evaluating Rama as a development platform for our product solution, which incorporates several data models to capture different stages in biotech lab experiment design and execution, we decided to try building a vector search system from scratch.
While modern vector search often relies on complex graph-based algorithms like HNSW, this post explores a different approach: implementing the classic Locality-Sensitive Hashing (LSH) algorithm. LSH is a great starting point for an experiment like this because its principles - hashing, bucketing, and re-ranking - map clearly to data processing primitives. The goal is to see how Rama's unified model handles the components of this traditionally complex task.
Locality-Sensitive Hashing is an algorithm for approximate nearest-neighbor search. Instead of comparing a query vector to every other vector in a dataset (which is slow), LSH uses a special hashing technique. The core idea is that similar vectors are "hashed" into the same "bucket" with high probability. A query then only needs to be compared against the vectors in its own bucket and a few nearby ones, drastically reducing the search space. This is typically followed by a re-ranking step that computes the exact distance for this smaller set of candidates.
(For a deeper dive, see: Pinecone article)
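To make the hashing step concrete, here is a minimal plain-Clojure sketch of random-hyperplane (sign) hashing, one common LSH family for cosine similarity. The names dot, random-planes, and bucket-id are illustrative assumptions and are not the helpers used by the module later in this post; only the way plane components are drawn (uniform values shifted by 0.5) mirrors the full listing at the end.

```clojure
(import '(java.util Random))

(defn dot
  "Dot product of two equal-length numeric vectors."
  [a b]
  (reduce + (map * a b)))

(defn random-planes
  "Builds `nbits` random hyperplane normals of dimension `dims`,
  with components drawn uniformly from [-0.5, 0.5)."
  [^Random rng nbits dims]
  (vec (repeatedly nbits
                   (fn [] (vec (repeatedly dims #(- (.nextDouble rng) 0.5)))))))

(defn bucket-id
  "Hashes vector `v` to a bit string: one bit per hyperplane,
  \"1\" when `v` lies on the positive side of the plane, \"0\" otherwise."
  [v planes]
  (apply str (map (fn [p] (if (pos? (dot v p)) "1" "0")) planes)))

;; Hand-picked planes (hypothetical) so the result is easy to check by hand.
(def toy-planes [[1.0 0.0] [0.0 1.0] [1.0 1.0]])

(bucket-id [2.0 3.0] toy-planes)   ;; "111"
(bucket-id [2.1 2.9] toy-planes)   ;; "111" (a similar vector, same bucket)
(bucket-id [-2.0 -3.0] toy-planes) ;; "000"
```

Two vectors pointing in nearly the same direction fall on the same side of most planes, so they tend to share a bucket string; that is the property the rest of the pipeline relies on.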
Rama is a platform for building distributed backends. It integrates data storage and computation into a single system, aiming to reduce the "glue code" connecting separate services.
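Its core components are:
Depots: distributed, append-only logs that receive incoming data.
Topologies (ETLs): dataflow code that consumes depots and materializes indexes, as either streaming or microbatch topologies.
PStates: durable, partitioned indexes built from compositions of data structures such as maps, sets, and lists.
Query topologies: predefined distributed queries that can read from multiple PStates in a single call.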
(For more information, see the Rama Documentation)
A Quick Note: Before diving in, a disclaimer. This code comes from a developer with very little Rama experience. It's an experiment, not a production-ready, heavily-optimized solution. The code is probably very far from perfect, but it's a way to learn the platform.
The implementation consists of one Rama module that handles ingestion, indexing, and querying.
The module relies on two PStates, Rama's durable data stores, to hold the data.
$$embeddings: A map from a vector ID (String) to its full data. It's defined as a fixed-keys-schema, which is ideal for object-like structures where the keys (:vec, :norm, :body) are known in advance.
$$lsh-tables: A map from a bucket ID (String) to a set-schema of vector IDs. This set is declared with {:subindex? true}, a key Rama feature. It tells Rama to index the elements of the set individually, which is critical for performance when a single bucket grows to contain thousands or millions of IDs.
(declare-pstate etl
  $$embeddings
  {String (fixed-keys-schema
            {:vec  (vector-schema Double)
             :norm Double
             :body String})})
(declare-pstate etl
  $$lsh-tables
  {String (set-schema String {:subindex? true})})
The code uses several hardcoded parameters. Here's a brief explanation of why:
dims = 384: This is dictated by the embedding model. The dataset used in testing is based on all-MiniLM-L6-v2, which produces 384-dimension vectors.
nbits = 8 & num-tables = 4: This is the core LSH performance/accuracy trade-off. More tables (num-tables) increase recall (a higher chance of finding a true neighbor) at the cost of more lookups. More bits (nbits) create more (and smaller) buckets per table (2^8 = 256). For a dataset in the 10k-100k vector range, this configuration (4 tables, 256 buckets each) is a reasonable starting point to keep buckets from getting too large, while the multiple tables provide a good chance of finding true neighbors.
top-k = 50: This is simply the application requirement: how many results to return.
A streaming topology (stream-topology) listens to the *ingest depot for new vectors. Its <<sources dataflow defines the real-time indexing logic.
(defmodule LSHModule
  [setup topologies]
  (declare-depot setup *ingest (hash-by :id))
  (let [etl (stream-topology topologies "load")]
    ;; ... PState declarations ...
    (<<sources etl
      (source> *ingest :> {:keys [*id *vec *body]})
      ;; 1. Save the full vector to the embeddings PState.
      (vector/norm *vec :> *norm)
      (local-transform> [(keypath *id)
                         (termval {:vec  *vec
                                   :norm *norm
                                   :body *body})]
                        $$embeddings)
      ;; 2. Get the deterministic hyperplanes.
      (get-hyperplane-sets :> *all-planes)
      ;; 3. Hash the vector against all tables to get bucket IDs.
      (ops/explode *all-planes :> *plane)
      (vector/vector->bucket-id *vec *plane :> *bucket-str)
      ;; 4. Partition to the task responsible for this bucket.
      (|hash *bucket-str)
      ;; 5. Add the vector's ID to the bucket in the LSH table.
      (local-transform> [(keypath *bucket-str)
                         NONE-ELEM
                         (termval *id)]
                        $$lsh-tables))))
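As a way to reason about what the two local-transform> calls write, the sketch below models both PStates as ordinary in-memory Clojure maps. It is only an illustration under stated assumptions: index-vector and toy-hash-fns are hypothetical names, the toy hash functions stand in for the real hyperplane hashing, and prefixing bucket keys with a table tag ("t0-", "t1-") is this sketch's own way of keeping tables apart.

```clojure
(defn index-vector
  "Returns `state` with one vector indexed. `state` is a map with
  :embeddings (id -> record) and :lsh-tables (bucket key -> set of ids).
  `hash-fns` holds one function per LSH table, each mapping a vector
  to a bucket key."
  [state {:keys [id body] v :vec} hash-fns]
  (let [norm  (Math/sqrt (reduce + (map * v v)))
        ;; Mirrors the write to $$embeddings: full record keyed by id.
        state (assoc-in state [:embeddings id] {:vec v :norm norm :body body})]
    ;; Mirrors the write to $$lsh-tables: add the id to one bucket per table.
    (reduce (fn [s h]
              (update-in s [:lsh-tables (h v)] (fnil conj #{}) id))
            state
            hash-fns)))

;; Toy stand-ins for per-table hashing (hypothetical).
(def toy-hash-fns
  [(fn [v] (str "t0-" (if (pos? (first v)) 1 0)))
   (fn [v] (str "t1-" (if (pos? (second v)) 1 0)))])

(def state
  (reduce #(index-vector %1 %2 toy-hash-fns)
          {:embeddings {} :lsh-tables {}}
          [{:id "a" :vec [3.0 4.0] :body "first"}
           {:id "b" :vec [1.0 -2.0] :body "second"}]))

(:lsh-tables state)
;; {"t0-1" #{"a" "b"}, "t1-1" #{"a"}, "t1-0" #{"b"}}
```

In the Rama version the same two writes happen on different tasks: the embedding is stored on the task chosen by the depot's hash-by :id, and each bucket update runs on the task chosen by |hash on the bucket key.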
The query is a multi-stage operation. To keep the code efficient, it's split into two phases.
A defgenerator (a reusable sub-batch) is defined to handle the "approximate" search. It scatters the query to the probed LSH buckets (the query's own bucket in each table plus nearby ones), gathers all potential candidates, and deduplicates them.
(defgenerator gather-unique-ids
  [query-vec]
  (batch<- [*unique-id]
    ;; 1. Hash the query vector against all tables and
    ;;    probe nearby buckets.
    (get-hyperplane-sets :> *all-planes)
    (ops/explode *all-planes :> *plane)
    (vector/vector->bucket-id query-vec *plane :> *bucket-str)
    (vector/nearby-buckets *bucket-str 2 :> *nbr-strs)
    (set/union #{*bucket-str} *nbr-strs :> *probe-buckets)
    ;; 2. Scatter: For every bucket, fetch the set of IDs.
    (ops/explode *probe-buckets :> *bucket-str2)
    (|hash *bucket-str2)
    (local-select> (keypath *bucket-str2) $$lsh-tables :> *ids-in-bucket)
    (ops/explode *ids-in-bucket :> *cand-id)
    ;; 3. Gather & Deduplicate: Aggregate all IDs into one unique set.
    (|global)
    (aggs/+set-agg *cand-id :> *all-unique-cands)
    ;; 4. Emit a stream of *unique* IDs.
    (ops/explode *all-unique-cands :> *unique-id)))
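The post does not show vector/nearby-buckets, so the following is a hedged sketch of one plausible reading: treat the bucket ID as a bit string and return every bit string within a given Hamming distance of it. The names flip-bit and nearby-buckets here are written for this illustration and may differ from the real helper in both signature and behavior.

```clojure
(defn flip-bit
  "Returns bit string `s` with the character at index `i` flipped."
  [s i]
  (str (subs s 0 i)
       (if (= \1 (nth s i)) "0" "1")
       (subs s (inc i))))

(defn nearby-buckets
  "Returns the set of bit strings whose Hamming distance from `bucket`
  is between 1 and `radius` (the bucket itself is excluded)."
  [bucket radius]
  (loop [frontier #{bucket}
         seen     #{bucket}
         step     0]
    (if (= step radius)
      (disj seen bucket)
      ;; Flip every bit of every string reached so far; after `radius`
      ;; rounds, `seen` holds everything within that Hamming distance.
      (let [next-frontier (set (for [s frontier
                                     i (range (count s))]
                                 (flip-bit s i)))]
        (recur next-frontier (into seen next-frontier) (inc step))))))

(nearby-buckets "00" 1)               ;; #{"01" "10"}
(count (nearby-buckets "00000000" 2)) ;; 36
```

Under this assumption, an 8-bit bucket with radius 2 has 8 + 28 = 36 neighbors, so each table probes 37 of its 256 buckets per query. That trades some of the search-space reduction for a better chance of reaching true neighbors that landed one or two bits away.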
This is the main query API. It calls the generator from Phase 1, then performs the exact distance calculation only on the unique candidates.
(<<query-topology topologies "top-vec-search"
  [*query-vec :> *top-results]
  ;; 1. Call Phase 1. This emits a stream of *unique* IDs.
  (gather-unique-ids *query-vec :> *unique-id)
  ;; 2. Re-rank: Fetch the full vector for each unique ID.
  (|hash *unique-id)
  (local-select> (keypath *unique-id) $$embeddings
                 :> {:keys [*vec *norm *body]})
  ;; 3. Calculate the *true* distance.
  (vector/norm *query-vec :> *q-norm)
  (vector/cosine-distance *query-vec *vec *q-norm *norm :> *distance)
  (vector *unique-id *distance *body :> *tuple)
  ;; 4. Aggregate: Gather tuples and find the true top 50.
  (|origin)
  (aggs/+top-monotonic
    [top-k] ;; Must be a constant
    *tuple
    :+options {:id-fn       first
               :sort-val-fn second
               :sort-type   :ascending}
    :> *top-results))
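The re-ranking math can be sketched in plain Clojure. This is a minimal illustration, assuming cosine distance is defined as 1 minus cosine similarity and that norms are precomputed (as the :norm field suggests); dot, norm, cosine-distance, and top-k-by-distance are written for this example and are not the post's vector-utils functions.

```clojure
(defn dot [a b] (reduce + (map * a b)))

(defn norm [v] (Math/sqrt (dot v v)))

(defn cosine-distance
  "1 - cos(angle between a and b), using precomputed norms.
  No guard for zero-length vectors in this sketch."
  [a b a-norm b-norm]
  (- 1.0 (/ (dot a b) (* a-norm b-norm))))

(defn top-k-by-distance
  "Returns the `k` closest [id distance body] tuples, nearest first.
  `candidates` maps id -> {:vec ... :norm ... :body ...}."
  [query candidates k]
  (let [q-norm (norm query)]
    (->> (for [[id {v :vec n :norm body :body}] candidates]
           [id (cosine-distance query v q-norm n) body])
         (sort-by second)
         (take k))))

(def toy-candidates
  {"a" {:vec [1.0 0.0] :norm 1.0 :body "x"}
   "b" {:vec [0.0 1.0] :norm 1.0 :body "y"}
   "c" {:vec [1.0 1.0] :norm (Math/sqrt 2.0) :body "z"}})

(map first (top-k-by-distance [2.0 0.0] toy-candidates 2)) ;; ("a" "c")
```

Storing the norm at ingestion time means each candidate costs one dot product at query time. Because the result comes from floating-point arithmetic, a vector compared with itself can land a hair below or above zero rather than exactly on it.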
For local development and unit testing, Rama provides an InProcessCluster (IPC). This simulates a full cluster in-process, making it easy to test the complete dataflow.
The following REPL test case demonstrates the workflow. The dataset used here is a sample of Wikipedia articles, which can be downloaded here.
(use 'com.rpl.rama) ;; foreign-depot, foreign-query, foreign-append!, ...
(require '[com.rpl.rama.test :as rtest])
(require '[rama-vector-store.lsh-module :refer [LSHModule ->Embedding]])
(require '[rama-vector-store.utils :as utils]) ;; Assumed helper ns
(with-open [ipc (rtest/create-ipc)]
  ;; 1. Launch the module in the local cluster
  (rtest/launch-module! ipc LSHModule {:tasks 4 :threads 2})
  (let [;; 2. Load dataset
        dataset     (take 100 (utils/wiki-minilm-dataset))
        module-name (get-module-name LSHModule)
        ;; 3. Get clients to interact with the module
        depot       (foreign-depot ipc module-name "*ingest")
        query       (foreign-query ipc module-name "top-vec-search")
        ;; 4. Select the first vector as our query
        query-vec   (:all-MiniLM-L6-v2 (first dataset))
        query-text  (:body (first dataset))]
    ;; 5. Ingest data by appending to the depot
    (doseq [{:keys [id all-MiniLM-L6-v2 body]} dataset]
      (foreign-append! depot (->Embedding id all-MiniLM-L6-v2 body)))
    (println "Querying for vector 0:\n" (subs query-text 0 70) "...")
    ;; 6. Invoke the query topology
    (foreign-invoke-query query query-vec)))
Running this test queries the system using the first vector from the dataset ("Title: Fresnes, Yonne..."). The query topology returns a list of tuples, each containing [id, distance, body], sorted by distance.
A snippet of the results shows the query vector itself as the first result, with a distance of nearly zero, as expected.
Querying for vector 0:
Title: Fresnes, Yonne Content: Fresnes is a commune. It is found i...
=>
[["0" -2.22...E-16 "Title: Fresnes, Yonne Content: Fresnes is a commune..."]
["19" 0.701... "Title: Saint-Palais, Allier Content: Saint-Palais is a commune..."]
["60" 0.708... "Title: Rocles, Ardèche Content: Rocles is a commune..."]
["96" 0.753... "Title: Pyrénées-Orientales Content: Pyrénées-Orientales..."]
["78" 0.773... "Title: Saint-André-de-Cubzac Content: Saint-André-de-Cubzac..."]
...]
The InProcessCluster is ideal for this kind of integration testing. For full-scale performance testing, one would deploy the module to a production cluster. Interaction with a deployed cluster uses the same foreign- client APIs or Rama's built-in REST API. (A two-node cluster is available for free from Red Planet Labs).
This code is the result of trial and error. The biggest hurdle was figuring out the correct way to handle the large, static hyperplane-sets vector.
The first instinct might be to use declare-object, as it's the idiomatic Rama way to make data available to all tasks. An attempt might look like this:
;; This fails!
(def hyperplane-sets (vec (repeatedly ...)))
(defmodule LSHModule [setup topologies]
  (declare-object setup *hyperplanes hyperplane-sets)
  ...)
This fails instantly with a Class file too large! error. It seems that when Rama (or Clojure) compiles the module, it resolves hyperplane-sets to its literal value and tries to inline that entire massive vector (all 4 × 8 × 384 = 12,288 numbers) directly into the module's compiled .class file. This blows past the JVM's class-file limits, such as the 64 KB cap on a single method's bytecode.
A similar error occurs if one tries to use the global def directly inside a dataflow ops/explode:
;; This also fails to compile/load
(<<sources etl
  ...
  (ops/explode hyperplane-sets :> *plane) ;; Fails
  ...)
The dataflow compiler tries to do the same thing: inline the huge vector as a constant, which again breaks the method size limit.
The solution that did work was to add a layer of indirection. By defining hyperplane-sets in a defonce (which uses a fixed-seed Random) and then accessing it only through a simple getter function, (defn get-hyperplane-sets []), the compiler's behavior changes.
;; This works!
(defn get-hyperplane-sets []
  hyperplane-sets)
(<<sources etl
  ...
  (get-hyperplane-sets :> *all-planes) ;; Works
  (ops/explode *all-planes :> *plane)
  ...)
The dataflow compiler now just embeds a small function call (get-hyperplane-sets). The massive vector is then loaded at runtime via the var, successfully avoiding the compile-time error.
This experiment showed that Rama's model can express this entire system in a single module.
The data model needed only two PStates: a key-to-object store ($$embeddings) and a key-to-set "lookup index" ($$lsh-tables).
The defgenerator primitive was useful for optimizing this query by separating candidate generation from re-ranking.
Partitioning with |hash in both the ETL and query topologies defines how the work is distributed.
The next step in this evaluation will be to run proper performance benchmarks against this implementation to see how it scales under load. We plan to post those results in a future article.
Here is the complete lsh-module.clj file:
(ns rama-vector-store.lsh-module
  (:use [com.rpl.rama]
        [com.rpl.rama.path])
  (:require [clojure.set :as set]
            [com.rpl.rama.aggs :as aggs]
            [com.rpl.rama.ops :as ops]
            ;; Namespace with helper functions (dot-product, cosine-distance, ...)
            [rama-vector-store.vector-utils :as vector])
  (:import (java.util Random)))

(defrecord Embedding [id vec body])

(def nbits 8)
(def num-tables 4)
(def dims 384)
(def top-k 50)

(defonce hyperplane-sets
  (let [rand (Random. 12345)] ;; Fixed seed for reproducibility
    (vec (repeatedly num-tables
           (fn []
             (vec (repeatedly nbits
                    (fn []
                      (vec (repeatedly dims (fn [] (- (.nextDouble rand) 0.5))))))))))))

(defn get-hyperplane-sets []
  hyperplane-sets)

(defgenerator gather-unique-ids
  [query-vec]
  (batch<- [*unique-id] ; This is the output variable of the generator
    (get-hyperplane-sets :> *all-planes)
    (ops/explode *all-planes :> *plane)
    (vector/vector->bucket-id query-vec *plane :> *bucket-str)
    (vector/nearby-buckets *bucket-str 2 :> *nbr-strs)
    (set/union #{*bucket-str} *nbr-strs :> *probe-buckets)
    (ops/explode *probe-buckets :> *bucket-str2)
    (|hash *bucket-str2)
    (local-select> (keypath *bucket-str2) $$lsh-tables :> *ids-in-bucket)
    (ops/explode *ids-in-bucket :> *cand-id)
    (|global)
    (aggs/+set-agg *cand-id :> *all-unique-cands) ; Aggregates all IDs into one unique set
    (ops/explode *all-unique-cands :> *unique-id))) ; Emits each unique ID

(defmodule LSHModule
  [setup topologies]
  (declare-depot setup *ingest (hash-by :id))
  (let [etl (stream-topology topologies "load")]
    (declare-pstate etl
      $$embeddings
      {String (fixed-keys-schema
                {:vec  (vector-schema Double)
                 :norm Double
                 :body String})})
    (declare-pstate etl
      $$lsh-tables
      {String (set-schema String {:subindex? true})})
    (<<sources etl
      (source> *ingest :> {:keys [*id *vec *body]})
      (vector/norm *vec :> *norm)
      (local-transform> [(keypath *id)
                         (termval {:vec  *vec
                                   :norm *norm
                                   :body *body})]
                        $$embeddings)
      (get-hyperplane-sets :> *all-planes)
      (ops/explode *all-planes :> *plane)
      (vector/vector->bucket-id *vec *plane :> *bucket-str)
      (|hash *bucket-str)
      (local-transform> [(keypath *bucket-str)
                         NONE-ELEM
                         (termval *id)]
                        $$lsh-tables)))
  (<<query-topology topologies "top-vec-search"
    [*query-vec :> *top-results]
    ;; Call the generator to get a stream of *unique* IDs
    (gather-unique-ids *query-vec :> *unique-id)
    ;; Fetch embeddings for each *unique* candidate
    (|hash *unique-id)
    (local-select> (keypath *unique-id) $$embeddings
                   :> {:keys [*vec *norm *body]})
    ;; Distance calculation
    (vector/norm *query-vec :> *q-norm)
    (vector/cosine-distance *query-vec *vec *q-norm *norm :> *distance)
    ;; Construct tuple
    (vector *unique-id *distance *body :> *tuple)
    ;; Aggregate top results by distance
    (|origin)
    (aggs/+top-monotonic
      [top-k]
      *tuple
      :+options {:id-fn       first
                 :sort-val-fn second
                 :sort-type   :ascending}
      :> *top-results)))
Published: 2025-11-07
Written by: Marko Djordjevic