This document outlines the design for implementing the AI Copilot functionality using Redis for event coordination while maintaining system resilience when Redis is temporarily unavailable.
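- A `batchEmbeddingInProgress` flag in Redis prevents duplicate work when several nodes try to rebuild at the same time.

When a chart is updated, the system follows this workflow:

1. If the `batchEmbeddingInProgress` flag is set in Redis, skip the individual update.
2. Otherwise, check whether the `redisWasDown` flag is set in Redis.
3. If `redisWasDown` is true:
   - Acquire the `batchEmbeddingInProgress` flag with a 5-minute timeout (only one node wins).
   - Regenerate the vectors for all charts, store them locally, and broadcast them to the other nodes.
   - Reset `batchEmbeddingInProgress` and `redisWasDown` to false (delete both keys).
4. If `redisWasDown` is false:
   - Publish only this chart's vector update to the other nodes.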
This service handles the coordination of vector updates between nodes:
@Service
public class ChartVectorEventService {
private static final String REDIS_WAS_DOWN_KEY = "vector:redis:was_down";
private static final String BATCH_EMBEDDING_IN_PROGRESS_KEY = "vector:batch:in_progress";
private static final Duration BATCH_TIMEOUT = Duration.ofMinutes(5); // 5 minute timeout to prevent deadlocks
private final ReactiveRedisTemplate<String, ChartVectorEvent> redisTemplate;
private final VectorDbService vectorDbService;
private final ChartRepository chartRepository;
private final Disposable subscription;
// Initialize and subscribe to vector update events
public ChartVectorEventService(ReactiveRedisTemplate<String, ChartVectorEvent> redisTemplate,
VectorDbService vectorDbService,
ChartRepository chartRepository) {
this.redisTemplate = redisTemplate;
this.vectorDbService = vectorDbService;
this.chartRepository = chartRepository;
this.subscription = subscribeToVectorUpdates();
}
// Listen for vector updates from other nodes
private Disposable subscribeToVectorUpdates() {
return redisTemplate.listenToChannel("chart-vector-updates")
.onErrorContinue((error, obj) -> {
log.warn("Error receiving vector update, will continue", error);
})
.map(message -> message.getMessage())
.flatMap(event -> {
// Skip our own events
if (event.sourceNodeId.equals(getNodeId())) {
return Mono.empty();
}
// Add the vector to our local store
return Mono.fromCallable(() -> {
vectorDbService.addVectorRecord(event.chartId, event.vector);
return true;
});
})
.subscribe();
}
// Publish a vector update to other nodes
public Mono<Void> publishVectorUpdate(Integer chartId, float[] vector) {
// First check if batch process is running
return redisTemplate.hasKey(BATCH_EMBEDDING_IN_PROGRESS_KEY)
.flatMap(batchInProgress -> {
if (batchInProgress) {
log.debug("Batch embedding in progress, skipping individual update");
return Mono.empty();
}
// Then check if Redis was down and needs recovery
return redisTemplate.hasKey(REDIS_WAS_DOWN_KEY)
.flatMap(wasDown -> {
if (wasDown) {
log.info("Redis was down, initiating batch rebuild");
return startBatchRebuild();
} else {
// Normal operation - just publish this one update
ChartVectorEvent event = new ChartVectorEvent(chartId, vector, getNodeId());
return redisTemplate.convertAndSend("chart-vector-updates", event)
.doOnError(e -> {
// Next successful connection will set this flag
log.warn("Failed to publish vector update, Redis may be down", e);
})
.then();
}
})
.onErrorResume(e -> {
// If Redis check fails, set to true on next connection
log.warn("Failed to check Redis status, skipping publish", e);
return Mono.empty();
});
})
.onErrorResume(e -> {
// If any Redis operation fails, it will be retried on next chart update
log.warn("Redis appears to be down, skipping publish", e);
return Mono.empty();
});
}
// When Redis connection reestablished after outage
private Mono<Void> startBatchRebuild() {
// Set flag to prevent multiple nodes from rebuilding simultaneously
return redisTemplate.opsForValue().setIfAbsent(BATCH_EMBEDDING_IN_PROGRESS_KEY, getNodeId(), BATCH_TIMEOUT)
.flatMap(acquired -> {
if (!acquired) {
log.info("Another node is already handling the rebuild");
return Mono.empty();
}
log.info("Starting full vector rebuild");
// Fetch all charts and rebuild vectors
return chartRepository.getAllCharts()
.collectList()
.flatMap(charts -> {
return Flux.fromIterable(charts)
.flatMap(chart -> generateEmbedding(chart)
.flatMap(vector -> {
// Update local store
vectorDbService.addVectorRecord(chart.getId(), vector);
// Broadcast to other nodes
ChartVectorEvent event = new ChartVectorEvent(chart.getId(), vector, getNodeId());
return redisTemplate.convertAndSend("chart-vector-updates", event);
}))
.then();
})
.then(Mono.defer(() -> {
// Reset flags when complete
log.info("Vector rebuild complete");
return redisTemplate.delete(REDIS_WAS_DOWN_KEY)
.then(redisTemplate.delete(BATCH_EMBEDDING_IN_PROGRESS_KEY));
}));
});
}
// Helper method to detect Redis recovery on reconnection
public Mono<Boolean> checkAndMarkRedisRecovery() {
// Called from a Redis connection listener
return redisTemplate.opsForValue().set(REDIS_WAS_DOWN_KEY, "true");
}
}
This service handles the propagation of chart deletions to all nodes:
@Service
public class ChartDeletionService {
private static final String REDIS_WAS_DOWN_KEY = "vector:redis:was_down";
private final ReactiveRedisTemplate<String, ChartDeletionEvent> redisTemplate;
private final VectorDbService vectorDbService;
private final Disposable subscription;
// Subscribe to deletion events and process them
private Disposable subscribeToChartDeletions() {
return redisTemplate.listenToChannel("chart-deletions")
.onErrorContinue((error, obj) -> {
log.warn("Error processing chart deletion, will continue", error);
})
.map(message -> message.getMessage())
.flatMap(event -> {
// Skip our own events
if (event.sourceNodeId.equals(getNodeId())) {
return Mono.empty();
}
// Remove the chart from local vector store
return Mono.fromCallable(() -> {
vectorDbService.removeRecord(event.chartId);
return true;
});
})
.subscribe();
}
// Publish a chart deletion to all nodes
public Mono<Void> publishChartDeletion(Integer chartId) {
ChartDeletionEvent event = new ChartDeletionEvent(chartId, getNodeId());
return redisTemplate.convertAndSend("chart-deletions", event)
.doOnError(e -> {
log.warn("Failed to publish chart deletion, will be handled during next rebuild", e);
// No need to track this specifically - the next rebuild will clean it up
})
.then();
}
}
This service ensures all nodes eventually have complete vector data through periodic syncs:
@Service
public class VectorSyncService {
private final VectorDbService vectorDbService;
private final ChartRepository chartRepository;
private final ReactiveRedisTemplate<String, Object> redisTemplate;
private static final String BATCH_EMBEDDING_IN_PROGRESS_KEY = "vector:batch:in_progress";
private static final Duration BATCH_TIMEOUT = Duration.ofMinutes(5); // 5 minute timeout
// Scheduled full sync (hourly) - fallback mechanism
@Scheduled(fixedRate = 3600000)
public void performScheduledSync() {
// Only perform if not already in progress
redisTemplate.hasKey(BATCH_EMBEDDING_IN_PROGRESS_KEY)
.flatMap(inProgress -> {
if (inProgress) {
log.info("Batch embedding already in progress, skipping scheduled sync");
return Mono.empty();
}
log.info("Starting scheduled vector sync");
return initiateFullSync();
})
.subscribe(
result -> {},
error -> log.error("Error during scheduled sync check", error)
);
}
public Mono<Void> initiateFullSync() {
// Set flag to prevent multiple nodes from rebuilding simultaneously
return redisTemplate.opsForValue().setIfAbsent(BATCH_EMBEDDING_IN_PROGRESS_KEY, getNodeId(), BATCH_TIMEOUT)
.flatMap(acquired -> {
if (!acquired) {
log.info("Another node is already handling the sync");
return Mono.empty();
}
return chartRepository.getAllCharts()
.collectList()
.flatMap(charts -> vectorDbService.rebuildVectorStore(charts))
.doOnSuccess(count -> log.info("Vector sync completed, processed {} charts", count))
.doOnError(error -> log.error("Error during vector sync", error))
.then(redisTemplate.delete(BATCH_EMBEDDING_IN_PROGRESS_KEY));
});
}
}
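- Redis operations are wrapped in `onErrorResume` to prevent failures from propagating when Redis is unavailable.
- When the Redis connection is restored, a connection listener sets the `redisWasDown` flag in Redis so that the next chart update triggers a rebuild:

@Configuration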
public class RedisConnectionListener {
private final ChartVectorEventService chartVectorEventService;
@EventListener(RedisConnectionFailureEvent.class)
public void onConnectionFailure(RedisConnectionFailureEvent event) {
log.warn("Redis connection failure detected");
}
@EventListener(RedisConnectedEvent.class)
public void onConnectionRestored(RedisConnectedEvent event) {
log.info("Redis connection restored, marking for vector rebuild");
// Mark that Redis was down, so next chart update will trigger rebuild
chartVectorEventService.checkAndMarkRedisRecovery()
.subscribe(
result -> log.info("Redis recovery flag set: {}", result),
error -> log.error("Failed to set Redis recovery flag", error)
);
}
}