In vector database operations, the collection must be complete, with all records loaded, before individual upsert operations are performed. Without this guarantee, concurrent operations could conflict or trigger duplicate initialization. Two locking mechanisms coordinate access:
| Lock Type | Implementation | Scope | Primary Purpose | Advantage |
|---|---|---|---|---|
| Local Lock | In-memory atomic flag (`isLockedForBatchProcessing`) | Single application node | Prevents redundant operations on the same node | Low overhead, no network communication required |
| Distributed Lock | Managed by `VectorDbLockManager` using Qdrant | Across all application nodes | Coordinates operations between multiple application instances | Ensures only one node performs global operations, prevents duplicate initialization |
Initial loading flow (empty collection):
Local Lock Check (atomic flag) → Record Count Check (count = 0) → Distributed Lock Acquisition (via `VectorDbLockManager`) → Load All Records (batch load) → Release Locks (both locks)

Normal operation flow (populated collection):
Local Lock Check (not active) → Record Count Check (count > 0) → Single Record Upsert (individual operation)

Concurrent operation detection:
Distributed Lock Check (lock detected) → Set Temporary Local Lock (10-second timeout) → Skip Operation
OR
Local Lock Check (already active) → Skip Operation
The implementation intelligently handles these flows to ensure collection completeness, prevent concurrent conflicts, and optimize system resources.
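The sketch below illustrates the dual-lock gate used by the initial loading flow. It is a minimal illustration, not the actual implementation: the reactive return types, the `releaseLock` call, and the simplified interfaces are assumptions; only `isLockedForBatchProcessing`, `acquireLock(vectorStoreConfig, collectionName, "batch_processing")`, and `getRecordCount` are names taken from this document.

```java
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.publisher.Mono;

class BatchLoadGateSketch {

    // Assumed, simplified collaborators for illustration only.
    interface VectorDbLockManager {
        Mono<Boolean> acquireLock(Object config, String collection, String reason);
        Mono<Boolean> isLocked(Object config, String collection);          // assumed lookup, used in a later sketch
        void releaseLock(Object config, String collection, String reason); // assumed release call
    }

    interface QdrantVectorDbClient {
        Mono<Long> getRecordCount(String collectionName);
    }

    private final AtomicBoolean isLockedForBatchProcessing = new AtomicBoolean(false);
    private final VectorDbLockManager lockManager;
    private final QdrantVectorDbClient qdrantClient;
    private final Object vectorStoreConfig; // placeholder for the real config type

    BatchLoadGateSketch(VectorDbLockManager lockManager, QdrantVectorDbClient qdrantClient, Object vectorStoreConfig) {
        this.lockManager = lockManager;
        this.qdrantClient = qdrantClient;
        this.vectorStoreConfig = vectorStoreConfig;
    }

    Mono<Void> loadRecordsIfEmpty(String collectionName) {
        // 1. Local gate: cheap in-memory check, no network round trip.
        if (!isLockedForBatchProcessing.compareAndSet(false, true)) {
            return Mono.empty(); // a batch load is already running on this node
        }
        return qdrantClient.getRecordCount(collectionName)
                // 2. Only load when the collection is actually empty.
                .filter(count -> count == 0)
                // 3. Distributed gate: only one node in the cluster may run the batch load.
                .flatMap(ignored -> lockManager.acquireLock(vectorStoreConfig, collectionName, "batch_processing"))
                .filter(Boolean::booleanValue) // lock not granted -> skip
                .flatMap(acquired -> loadRecords(collectionName)
                        .doFinally(s -> lockManager.releaseLock(vectorStoreConfig, collectionName, "batch_processing")))
                // 4. Always clear the local flag, whatever happened above.
                .doFinally(s -> isLockedForBatchProcessing.set(false));
    }

    private Mono<Void> loadRecords(String collectionName) {
        return Mono.empty(); // stand-in for the real sequential batch load
    }
}
```

Releasing the distributed lock in `doFinally` mirrors the "Release Locks" step of the initial loading flow, so a failed load does not leave other nodes blocked until the lock expires.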
| Operation | When Used | Lock Requirements | Behavior |
|---|---|---|---|
| Loading All Items (`loadRecords`) | • Collection is empty (count = 0) • System initialization • Full rebuild requested | Both local and distributed locks required | • Blocks other operations • Loads records sequentially with controlled concurrency • Uses `concatMap` for guaranteed sequential processing |
| Upserting or Load (`upsertOrLoadRecords`) | • Smart operation that checks collection state • Handles both empty and populated collections | Conditional locking based on collection state | • Checks for distributed locks first • If collection empty, loads all records • If collection has data, performs single upsert (see the sketch after this table) |
| Upserting One Item (internal `upsertRecord`) | • Collection exists and has data • Single record update required | No locks required | • Fast, targeted update • Only affects the specific vector record • Minimizes system impact |
| Deleting Items (`removeRecords`) | • Sequential removal of specific records | No locks required | • Targeted deletion of specific records • Does not affect other records • Performs basic collection existence check |
| Collection Operations (`deleteCollection`) | • Collection rebuild requested • Data purge operations | Both local and distributed locks required | • `deleteCollection`: completely removes the collection |
| Namespace Operations (`deleteNamespace`) | • Namespace cleanup required • Multiple collection management | Both local and distributed locks required | • Removes all collections within a namespace • Requires explicit confirmation flag for safety • Uses collection listing to find target collections |
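As a complement to the table, the following sketch shows how the `upsertOrLoadRecords` decision could be wired, continuing the hypothetical class from the earlier sketch. The `isLocked` lookup, the `upsertRecord` signature, and the record type are assumptions; the `TimedExecutor.enableWithDuration(isLockedForBatchProcessing::set, 10)` call is the one quoted in the implementation notes below.

```java
// Continues the BatchLoadGateSketch class above. TimedExecutor is the project's utility
// referenced in this document; the remaining signatures are illustrative assumptions.
Mono<Void> upsertOrLoadRecords(String collectionName, Object record) {
    if (isLockedForBatchProcessing.get()) {
        return Mono.empty(); // a batch load owns the collection on this node: skip
    }
    // Check for a distributed lock first (assumed lookup method).
    return lockManager.isLocked(vectorStoreConfig, collectionName)
            .flatMap(lockedElsewhere -> {
                if (lockedElsewhere) {
                    // Another node is loading: back off locally for 10 seconds, then skip.
                    TimedExecutor.enableWithDuration(isLockedForBatchProcessing::set, 10);
                    return Mono.empty();
                }
                return qdrantClient.getRecordCount(collectionName)
                        .flatMap(count -> count == 0
                                ? loadRecordsIfEmpty(collectionName)     // empty: full batch load with both locks
                                : upsertRecord(collectionName, record)); // populated: single upsert, no locks
            });
}

private Mono<Void> upsertRecord(String collectionName, Object record) {
    return Mono.empty(); // stand-in for embedding generation + targeted vector update
}
```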
Key implementation details:

- `AtomicBoolean isLockedForBatchProcessing` for fast local checking
- `VectorDbLockManager` using vectors in Qdrant
- `if (!isLockedForBatchProcessing.compareAndSet(false, true))` guards entry into batch processing
- `lockManager.acquireLock(vectorStoreConfig, collectionName, "batch_processing")` acquires the distributed lock
- `TimedExecutor.enableWithDuration(isLockedForBatchProcessing::set, 10)` sets the temporary local lock with a 10-second timeout

The Vector Database Service includes a comprehensive event tracking system using `VectorDbEvent` to monitor operations:
| Event Category | Events | Purpose |
|---|---|---|
| Load Records | LOAD_RECORDS__START, LOAD_RECORDS__LOAD_IN_PROGRESS, LOAD_RECORDS__LOCK_ACQUIRED, LOAD_RECORDS__LOCK_FAILED_TO_ACQUIRE, LOAD_RECORDS__START_EMBEDDING, LOAD_RECORDS__SUCCEEDED, LOAD_RECORDS__FAILED, LOAD_RECORDS__ENDED | Track the lifecycle of batch loading operations |
| Upsert or Load | UPSERT_OR_LOAD__START, UPSERT_OR_LOAD__LOCAL_LOAD_IN_PROGRESS, UPSERT_OR_LOAD__LOAD__SUBSCRIBED, UPSERT_OR_LOAD__UPSERT__SUBSCRIBED, UPSERT_OR_LOAD__LOAD__SUCCEEDED, UPSERT_OR_LOAD__LOAD__FAILED, UPSERT_OR_LOAD__LOAD__ENDED, UPSERT_OR_LOAD__UPSERT__SUCCEEDED, UPSERT_OR_LOAD__UPSERT__FAILED, UPSERT_OR_LOAD__UPSERT__ENDED | Monitor the smart upsert operation with collection state detection |
Events are published to eventSubject (a Reactor Sinks.Many) for subscription by monitoring systems.
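A minimal sketch of that event channel, using the `Sinks.many().multicast().onBackpressureBuffer()` configuration noted later in this document; the `VectorDbEvent` record here is a stand-in for the real class:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

class VectorDbEventChannelSketch {

    // Stand-in for the real VectorDbEvent class and its Type enum.
    record VectorDbEvent(String type, String collectionName) {}

    private final Sinks.Many<VectorDbEvent> eventSubject =
            Sinks.many().multicast().onBackpressureBuffer();

    // Called by the service at each lifecycle point, e.g. LOAD_RECORDS__START.
    void publish(VectorDbEvent event) {
        eventSubject.tryEmitNext(event); // non-blocking emit; emit failures are ignored in this sketch
    }

    // Monitoring systems subscribe to the hot stream of events.
    Flux<VectorDbEvent> events() {
        return eventSubject.asFlux();
    }
}
```

A monitoring consumer can, for example, filter the stream for LOAD_RECORDS__FAILED events to alert on failed batch loads.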
| Initial State | Trigger Action | Vector DB State | Locking Mechanism | Expected Outcome |
|---|---|---|---|---|
| App node starting | System initialization | DB has data (count > 0) | None | loadRecordsIfEmpty performs count check and determines no loading needed |
| App node starting | System initialization | DB is empty (count = 0) | Local flag + Distributed lock | loadRecordsIfEmpty triggers full loading; Local isLockedForBatchProcessing set to true; Distributed lock acquired; Records loaded sequentially; Locks released |
| Running system | Record update via internal upsertRecord | DB has data (count > 0) | None | Individual record updated without locking; Generates embedding and updates record |
| Running system | Record update via upsertOrLoadRecords | DB has data (count > 0) | None | System checks collection exists with data; Updates individual record without loading all |
| Running system | Record update via upsertOrLoadRecords | DB is empty (count = 0) | Local flag + Distributed lock | System detects empty collection; Sets local flag; Acquires distributed lock; Loads all records as batch; Locks released |
| Running system | Record update via upsertOrLoadRecords | Another node has distributed lock | Temporary local flag with timeout | System detects distributed lock; Sets local flag with 10-second timeout; Skips operation |
| Running system | Multiple record adds via loadRecords | DB has existing data | Local flag + Distributed lock | Tries to set local flag; Acquires distributed lock; Processes records sequentially with concatMap; Releases locks when done |
| Running system | Rebuild request | DB has existing data | Distributed lock | rebuildIndex deletes collection, then loadRecords |
| Running system | Remove individual record | DB has existing data | None | removeRecord checks if collection exists; Removes single record by ID; No locks needed |
| Running system | Remove multiple records | DB has existing data | None | removeRecords processes deletion of multiple record IDs; No locks needed |
| Running system | Delete collection | DB has existing data | Distributed lock | deleteCollection acquires distributed lock; Removes entire collection; Releases lock |
| Running system | Namespace deletion | Multiple collections | Distributed lock | deleteNamespace acquires distributed lock; Deletes all collections in namespace; Releases lock |
| Node failure | Node crash during loading | Lock remains | Auto-expiration mechanism | Distributed lock has expiration timestamp and node ID information; Other nodes can take over after expiration |
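The node-failure row relies on the lock entry carrying an expiration timestamp and the owning node's ID. A minimal sketch of the takeover rule, where `LockRecord` is a stand-in for the payload stored with the distributed lock:

```java
import java.time.Instant;

class LockTakeoverSketch {

    // Stand-in for the information stored with the distributed lock in Qdrant;
    // nodeId identifies the current holder for diagnostics.
    record LockRecord(String nodeId, Instant expiresAt) {}

    // A node may acquire the lock if no lock exists or the existing one has expired
    // (its holder presumably crashed before releasing it).
    static boolean canAcquire(LockRecord existing, Instant now) {
        return existing == null || now.isAfter(existing.expiresAt());
    }
}
```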
Additional technical notes:

- `concatMap` instead of `flatMap` for guaranteed sequential processing
- `subscribeOn(Schedulers.boundedElastic())` to avoid blocking the main thread
- `VectorDbLockManager` implements distributed locks using Qdrant collections
- `TimedExecutor.enableWithDuration` sets the temporary local lock with a timeout
- `qdrantClient.getRecordCount(params)` used to check if collection has data
- `collectionAndIsReady` cache in `QdrantVectorDbClient` tracks collection readiness with timeout
- `eventSubject` (`Sinks.many().multicast().onBackpressureBuffer()`)
- Typed events (`VectorDbEvent.Type`) for consistent categorization
- `QdrantVectorDbClient` caches collection existence checks using `StatusMap` with cache duration
- Smart operation (`upsertOrLoadRecords`) handles both empty and populated collections
- `VectorDbVerifier`
- `close()` methods
- `StatusMap` utility provides efficient caching with automatic timeouts
- `VectorDbDiagnostics`
- `doOnError` to report failures
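Several of these notes concern the batch-load pipeline. The sketch below shows how `concatMap`, `subscribeOn(Schedulers.boundedElastic())`, and `doOnError` fit together; `embedAndUpsert` is a hypothetical per-record step, not a method name from the codebase:

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class SequentialLoadSketch {

    Mono<Void> loadRecords(List<String> recordIds) {
        return Flux.fromIterable(recordIds)
                // concatMap (not flatMap) processes records strictly one after another, preserving order.
                .concatMap(this::embedAndUpsert)
                // Run the pipeline on a worker thread instead of blocking the caller.
                .subscribeOn(Schedulers.boundedElastic())
                // Surface failures to diagnostics without swallowing the error.
                .doOnError(e -> System.err.println("load failed: " + e.getMessage()))
                .then();
    }

    private Mono<Void> embedAndUpsert(String recordId) {
        return Mono.empty(); // stand-in for embedding generation + vector upsert of one record
    }
}
```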