In vector database operations, the collection must be complete, with all records loaded, before individual upsert operations are performed; without this guarantee, a single-record upsert could run against a partially loaded collection. Two complementary locking mechanisms enforce it:
Lock Type | Implementation | Scope | Primary Purpose | Advantage |
---|---|---|---|---|
Local Lock | In-memory atomic flag (`isLockedForBatchProcessing`) | Single application node | Prevents redundant operations on the same node | Low overhead, no network communication required |
Distributed Lock | Managed by `VectorDbLockManager` using Qdrant | Across all application nodes | Coordinates operations between multiple application instances | Ensures only one node performs global operations, prevents duplicate initialization |
Three request flows are possible:

1. Empty collection (batch load): Local Lock Check (atomic flag) → Record Count Check (count = 0) → Distributed Lock Acquisition (via `VectorDbLockManager`) → Load All Records (batch load) → Release Locks (both locks)
2. Populated collection (single upsert): Local Lock Check (not active) → Record Count Check (count > 0) → Single Record Upsert (individual operation)
3. Skip paths: Distributed Lock Check (lock detected) → Set Temporary Local Lock (10-second timeout) → Skip Operation, or Local Lock Check (already active) → Skip Operation
The implementation intelligently handles these flows to ensure collection completeness, prevent concurrent conflicts, and optimize system resources.
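As a rough sketch of how these flows could be wired together in Reactor, the method below combines the local flag, the distributed-lock check, and the record-count check. Every collaborator it calls (`countRecords`, `isDistributedLockHeld`, `acquireDistributedLock`, `releaseDistributedLock`, `loadAllRecords`, `upsertSingleRecord`) is a hypothetical stand-in, not the actual service API.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.publisher.Mono;

// Sketch of the three flows: local flag, distributed-lock check, then count-based decision.
public class UpsertOrLoadFlowSketch {

    private final AtomicBoolean isLockedForBatchProcessing = new AtomicBoolean(false);

    public Mono<Void> upsertOrLoad(String collectionName, Object record) {
        // Skip: a batch load is already running on this node.
        if (isLockedForBatchProcessing.get()) {
            return Mono.empty();
        }
        // Skip: another node holds the distributed lock; back off with a temporary 10-second local lock.
        if (isDistributedLockHeld(collectionName)) {
            isLockedForBatchProcessing.set(true);
            Mono.delay(Duration.ofSeconds(10))
                .doOnNext(tick -> isLockedForBatchProcessing.set(false))
                .subscribe();
            return Mono.empty();
        }
        // Record count check decides between a full batch load and a single upsert.
        return countRecords(collectionName).flatMap(count -> {
            if (count == 0) {
                if (!isLockedForBatchProcessing.compareAndSet(false, true)) {
                    return Mono.empty(); // lost the race to another caller on this node
                }
                return acquireDistributedLock(collectionName)
                        .then(loadAllRecords(collectionName))
                        .doFinally(signal -> {
                            releaseDistributedLock(collectionName);
                            isLockedForBatchProcessing.set(false);
                        });
            }
            return upsertSingleRecord(collectionName, record); // populated collection: no locks
        });
    }

    // --- hypothetical collaborators, stubbed so the sketch compiles ---
    private Mono<Long> countRecords(String collection) { return Mono.just(0L); }
    private boolean isDistributedLockHeld(String collection) { return false; }
    private Mono<Void> acquireDistributedLock(String collection) { return Mono.empty(); }
    private void releaseDistributedLock(String collection) { }
    private Mono<Void> loadAllRecords(String collection) { return Mono.empty(); }
    private Mono<Void> upsertSingleRecord(String collection, Object record) { return Mono.empty(); }
}
```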
Operation | When Used | Lock Requirements | Behavior |
---|---|---|---|
Loading All Items (`loadRecords`) | • Collection is empty (count = 0) • System initialization • Full rebuild requested | Both local and distributed locks required | • Blocks other operations • Loads records sequentially with controlled concurrency • Uses `concatMap` for guaranteed sequential processing |
Upserting or Load (`upsertOrLoadRecords`) | • Smart operation that checks collection state • Handles both empty and populated collections | Conditional locking based on collection state | • Checks for distributed locks first • If collection is empty, loads all records • If collection has data, performs a single upsert |
Upserting One Item (internal `upsertRecord`) | • Collection exists and has data • Single record update required | No locks required | • Fast, targeted update • Only affects the specific vector record • Minimizes system impact |
Deleting Items (`removeRecords`) | • Sequential removal of records | No locks required | • Targeted deletion of specific records • Does not affect other records • Performs a basic collection existence check (see the sketch after this table) |
Collection Operations (`deleteCollection`) | • Collection rebuild requested • Data purge operations | Both local and distributed locks required | • `deleteCollection`: completely removes the collection |
Namespace Operations (`deleteNamespace`) | • Namespace cleanup required • Multiple collection management | Both local and distributed locks required | • Removes all collections within a namespace • Requires an explicit confirmation flag for safety • Uses collection listing to find target collections |
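The lock-free removal path in the table above might look roughly like the following; the `VectorClient` interface is a hypothetical stand-in for the project's Qdrant wrapper, not the real `QdrantVectorDbClient` API.

```java
import java.util.List;

import reactor.core.publisher.Mono;

// Sketch of lock-free removal: verify the collection exists, then delete the requested IDs.
public class RemoveRecordsSketch {

    // Hypothetical client interface, only what this sketch needs.
    interface VectorClient {
        Mono<Boolean> collectionExists(String collection);
        Mono<Void> deleteByIds(String collection, List<String> ids);
    }

    private final VectorClient client;

    RemoveRecordsSketch(VectorClient client) {
        this.client = client;
    }

    Mono<Void> removeRecords(String collection, List<String> ids) {
        return client.collectionExists(collection)                        // basic existence check
                .filter(Boolean::booleanValue)                            // silently skip if collection is missing
                .flatMap(exists -> client.deleteByIds(collection, ids));  // targeted delete, no locks involved
    }
}
```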
The locking mechanism is built from a few pieces:

- Local lock: `AtomicBoolean isLockedForBatchProcessing` for fast local checking
- Distributed lock: `VectorDbLockManager` using vectors in Qdrant
- Local lock acquisition: `if (!isLockedForBatchProcessing.compareAndSet(false, true))`
- Distributed lock acquisition: `lockManager.acquireLock(vectorStoreConfig, collectionName, "batch_processing")`
- Temporary local lock with timeout: `TimedExecutor.enableWithDuration(isLockedForBatchProcessing::set, 10)` (a minimal equivalent is sketched below)
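The `TimedExecutor.enableWithDuration` helper is only referenced here, not shown; a minimal equivalent, assuming it simply sets a flag and clears it again after the given number of seconds, could be:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical equivalent of TimedExecutor.enableWithDuration: set a boolean flag now
// and automatically clear it after the given number of seconds.
public final class TimedFlag {

    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor();

    private TimedFlag() { }

    public static void enableWithDuration(Consumer<Boolean> flagSetter, long seconds) {
        flagSetter.accept(true);                                   // enable immediately
        SCHEDULER.schedule(() -> flagSetter.accept(false),         // clear after the timeout
                seconds, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean isLockedForBatchProcessing = new AtomicBoolean(false);
        // Mirrors the call in the text: temporary local lock with a 10-second timeout.
        enableWithDuration(isLockedForBatchProcessing::set, 10);
        System.out.println("locked now: " + isLockedForBatchProcessing.get());        // true
        Thread.sleep(11_000);
        System.out.println("locked after 11s: " + isLockedForBatchProcessing.get());  // false
        SCHEDULER.shutdownNow();
    }
}
```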
The Vector Database Service includes a comprehensive event tracking system using `VectorDbEvent` to monitor operations:
Event Category | Events | Purpose |
---|---|---|
Load Records | `LOAD_RECORDS__START`, `LOAD_RECORDS__LOAD_IN_PROGRESS`, `LOAD_RECORDS__LOCK_ACQUIRED`, `LOAD_RECORDS__LOCK_FAILED_TO_ACQUIRE`, `LOAD_RECORDS__START_EMBEDDING`, `LOAD_RECORDS__SUCCEEDED`, `LOAD_RECORDS__FAILED`, `LOAD_RECORDS__ENDED` | Track the lifecycle of batch loading operations |
Upsert or Load | `UPSERT_OR_LOAD__START`, `UPSERT_OR_LOAD__LOCAL_LOAD_IN_PROGRESS`, `UPSERT_OR_LOAD__LOAD__SUBSCRIBED`, `UPSERT_OR_LOAD__UPSERT__SUBSCRIBED`, `UPSERT_OR_LOAD__LOAD__SUCCEEDED`, `UPSERT_OR_LOAD__LOAD__FAILED`, `UPSERT_OR_LOAD__LOAD__ENDED`, `UPSERT_OR_LOAD__UPSERT__SUCCEEDED`, `UPSERT_OR_LOAD__UPSERT__FAILED`, `UPSERT_OR_LOAD__UPSERT__ENDED` | Monitor the smart upsert operation with collection state detection |
Events are published to `eventSubject` (a Reactor `Sinks.Many`) for subscription by monitoring systems.
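A minimal sketch of that event channel, with a simplified event type standing in for `VectorDbEvent` and an illustrative collection name:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

// Sketch of the event channel: operations emit into a multicast sink, monitoring code
// subscribes to it as a Flux. The Event record is a simplified stand-in for VectorDbEvent.
public class VectorDbEventBusSketch {

    record Event(String type, String collection) { }

    private final Sinks.Many<Event> eventSubject =
            Sinks.many().multicast().onBackpressureBuffer();

    // Operations publish lifecycle events; the EmitResult returned by tryEmitNext is
    // ignored here for brevity.
    void publish(String type, String collection) {
        eventSubject.tryEmitNext(new Event(type, collection));
    }

    // Monitoring systems consume the stream as a Flux.
    Flux<Event> events() {
        return eventSubject.asFlux();
    }

    public static void main(String[] args) {
        VectorDbEventBusSketch bus = new VectorDbEventBusSketch();
        bus.events().subscribe(e -> System.out.println(e.type() + " on " + e.collection()));
        bus.publish("LOAD_RECORDS__START", "documents");
        bus.publish("LOAD_RECORDS__SUCCEEDED", "documents");
    }
}
```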
Initial State | Trigger Action | Vector DB State | Locking Mechanism | Expected Outcome |
---|---|---|---|---|
App node starting | System initialization | DB has data (count > 0) | None | `loadRecordsIfEmpty` performs a count check and determines no loading is needed |
App node starting | System initialization | DB is empty (count = 0) | Local flag + distributed lock | `loadRecordsIfEmpty` triggers full loading; local `isLockedForBatchProcessing` set to true; distributed lock acquired; records loaded sequentially; locks released |
Running system | Record update via internal `upsertRecord` | DB has data (count > 0) | None | Individual record updated without locking; generates embedding and updates the record |
Running system | Record update via `upsertOrLoadRecords` | DB has data (count > 0) | None | System checks that the collection exists with data; updates the individual record without loading all |
Running system | Record update via `upsertOrLoadRecords` | DB is empty (count = 0) | Local flag + distributed lock | System detects empty collection; sets local flag; acquires distributed lock; loads all records as a batch; locks released |
Running system | Record update via `upsertOrLoadRecords` | Another node holds distributed lock | Temporary local flag with timeout | System detects distributed lock; sets local flag with a 10-second timeout; skips the operation |
Running system | Multiple record adds via `loadRecords` | DB has existing data | Local flag + distributed lock | Tries to set local flag; acquires distributed lock; processes records sequentially with `concatMap`; releases locks when done |
Running system | Rebuild request | DB has existing data | Distributed lock | `rebuildIndex` deletes the collection, then runs `loadRecords` |
Running system | Remove individual record | DB has existing data | None | `removeRecord` checks if the collection exists; removes single record by ID; no locks needed |
Running system | Remove multiple records | DB has existing data | None | `removeRecords` processes deletion of multiple record IDs; no locks needed |
Running system | Delete collection | DB has existing data | Distributed lock | `deleteCollection` acquires distributed lock; removes the entire collection; releases lock |
Running system | Namespace deletion | Multiple collections | Distributed lock | `deleteNamespace` acquires distributed lock; deletes all collections in the namespace; releases lock |
Node failure | Node crash during loading | Lock remains | Auto-expiration mechanism | Distributed lock carries an expiration timestamp and node ID; other nodes can take over after expiration (see the sketch after this table) |
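The takeover behavior in the last row suggests a lock payload carrying the owner's node ID and an expiration timestamp. The sketch below assumes that shape; the field names and takeover rule are illustrative, not the actual `VectorDbLockManager` schema.

```java
import java.time.Duration;
import java.time.Instant;

// Sketch of a distributed lock payload: owner node ID plus an expiration window, so a
// surviving node can detect a stale lock left behind by a crashed node and take it over.
public class DistributedLockRecordSketch {

    record LockRecord(String nodeId, Instant acquiredAt, Duration ttl) {
        boolean isExpired(Instant now) {
            return now.isAfter(acquiredAt.plus(ttl));
        }
    }

    // A node may (re)acquire the lock if it is free, it already owns it, or the previous
    // owner's lock has expired (e.g. the owner crashed mid-load).
    static boolean canAcquire(LockRecord existing, String myNodeId, Instant now) {
        return existing == null
                || existing.nodeId().equals(myNodeId)
                || existing.isExpired(now);
    }

    public static void main(String[] args) {
        LockRecord stale = new LockRecord("node-A", Instant.now().minusSeconds(600), Duration.ofSeconds(300));
        System.out.println(canAcquire(stale, "node-B", Instant.now())); // true: expired lock taken over
    }
}
```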
Key implementation details:

- `concatMap` instead of `flatMap` for guaranteed sequential processing (sketched below)
- `subscribeOn(Schedulers.boundedElastic())` to avoid blocking the main thread
- `VectorDbLockManager` implements distributed locks using Qdrant collections
- `TimedExecutor.enableWithDuration` for temporary local locks with a timeout
- `qdrantClient.getRecordCount(params)` used to check if the collection has data
- `collectionAndIsReady` cache in `QdrantVectorDbClient` tracks collection readiness with a timeout
- `eventSubject` (`Sinks.many().multicast().onBackpressureBuffer()`) publishes operation events
- Typed events (`VectorDbEvent.Type`) for consistent categorization
- `QdrantVectorDbClient` caches collection existence checks using `StatusMap` with a cache duration
- Smart operation (`upsertOrLoadRecords`) handles both empty and populated collections
- `VectorDbVerifier` … `close()` methods
- `StatusMap` utility provides efficient caching with automatic timeouts
- `VectorDbDiagnostics` … `doOnError` to report failures
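The first two bullets describe a common Reactor pattern; a minimal sketch of sequential batch loading under those assumptions (placeholder record type and embed/upsert step, not the real service code) is:

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Sketch of sequential batch loading: concatMap processes records one at a time and in
// order (unlike flatMap, which interleaves), and subscribeOn(boundedElastic) keeps the
// potentially blocking work off the caller's thread.
public class SequentialLoadSketch {

    record VectorRecord(String id, String text) { }

    Mono<Void> loadRecords(List<VectorRecord> records) {
        return Flux.fromIterable(records)
                .concatMap(this::embedAndUpsert)              // strictly sequential, ordered
                .then()                                       // complete when all records are done
                .subscribeOn(Schedulers.boundedElastic());    // avoid blocking the main thread
    }

    private Mono<Void> embedAndUpsert(VectorRecord record) {
        // Placeholder for generating the embedding and upserting the point into Qdrant.
        return Mono.fromRunnable(() -> System.out.println("upserted " + record.id()));
    }

    public static void main(String[] args) {
        new SequentialLoadSketch()
                .loadRecords(List.of(new VectorRecord("1", "a"), new VectorRecord("2", "b")))
                .block();
    }
}
```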