kafka rag

This commit is contained in:
2026-02-22 11:52:20 +01:00
parent 34ed264414
commit 28d59afdbb
5 changed files with 106 additions and 84 deletions

View File

@@ -33,4 +33,3 @@ public class KafkaProducerConfig {
return new KafkaTemplate<>(producerFactory()); return new KafkaTemplate<>(producerFactory());
} }
} }

View File

@@ -15,8 +15,6 @@ import org.springframework.http.ResponseEntity;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import java.util.List;
@Slf4j @Slf4j
@RestController @RestController
@Validated @Validated

View File

@@ -0,0 +1,31 @@
package com.balex.rag.model.kafka;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.Instant;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class UserEvent {
private EventType type;
private String userId;
private String chatId;
private Integer tokensUsed;
private Integer documentsFound;
@Builder.Default
private Instant timestamp = Instant.now();
public enum EventType {
USER_CREATED,
CHAT_CREATED,
CHAT_DELETED,
QUERY_SENT
}
}

View File

@@ -1,87 +1,12 @@
package com.balex.rag.service; package com.balex.rag.service;
import com.balex.rag.model.dto.UserEvent; public interface EventPublisher {
import com.balex.rag.model.dto.UserEvent.EventType;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/** void publishChatCreated(String userId, String chatId);
* Publishes user events to Kafka for analytics-service consumption.
*
* Usage examples in your existing services/controllers:
*
* // User registration
* eventPublisher.publishUserCreated(user.getId());
*
* // Chat created
* eventPublisher.publishChatCreated(userId, chatId);
*
* // Query sent to RAG
* eventPublisher.publishQuerySent(userId, chatId, tokensUsed);
*
* // RAG context found documents
* eventPublisher.publishRagContextHit(userId, documentsFound);
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class EventPublisher {
private final KafkaTemplate<String, UserEvent> kafkaTemplate; void publishChatDeleted(String userId, String chatId);
@Value("${analytics.kafka.topic:user-events}") void publishQuerySent(String userId, String chatId, int tokensUsed);
private String topic;
public void publishUserCreated(String userId) {
publish(UserEvent.builder()
.type(EventType.USER_CREATED)
.userId(userId)
.build());
}
public void publishChatCreated(String userId, String chatId) {
publish(UserEvent.builder()
.type(EventType.CHAT_CREATED)
.userId(userId)
.chatId(chatId)
.build());
}
public void publishChatDeleted(String userId, String chatId) {
publish(UserEvent.builder()
.type(EventType.CHAT_DELETED)
.userId(userId)
.chatId(chatId)
.build());
}
public void publishQuerySent(String userId, String chatId, int tokensUsed) {
publish(UserEvent.builder()
.type(EventType.QUERY_SENT)
.userId(userId)
.chatId(chatId)
.tokensUsed(tokensUsed)
.build());
}
public void publishRagContextHit(String userId, int documentsFound) {
publish(UserEvent.builder()
.type(EventType.RAG_CONTEXT_HIT)
.userId(userId)
.documentsFound(documentsFound)
.build());
}
private void publish(UserEvent event) {
try {
kafkaTemplate.send(topic, event.getUserId(), event);
log.debug("Published event: type={}, userId={}", event.getType(), event.getUserId());
} catch (Exception e) {
log.error("Failed to publish event: {}", event, e);
}
}
}
void publishUserCreated(String userId);
}

View File

@@ -0,0 +1,69 @@
package com.balex.rag.service.impl;
import com.balex.rag.model.kafka.UserEvent;
import com.balex.rag.model.kafka.UserEvent.EventType;
import com.balex.rag.service.EventPublisher;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class EventPublisherImpl implements EventPublisher {
private final KafkaTemplate<String, UserEvent> kafkaTemplate;
@Value("${analytics.kafka.topic:user-events}")
private String topic;
@Override
public void publishChatCreated(String userId, String chatId) {
publish(UserEvent.builder()
.type(EventType.CHAT_CREATED)
.userId(userId)
.chatId(chatId)
.build());
}
@Override
public void publishChatDeleted(String userId, String chatId) {
publish(UserEvent.builder()
.type(EventType.CHAT_DELETED)
.userId(userId)
.chatId(chatId)
.build());
}
@Override
public void publishQuerySent(String userId, String chatId, int tokensUsed) {
publish(UserEvent.builder()
.type(EventType.QUERY_SENT)
.userId(userId)
.chatId(chatId)
.tokensUsed(tokensUsed)
.build());
}
private void publish(UserEvent event) {
kafkaTemplate.send(topic, event.getUserId(), event)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("Failed to send event {}: {}", event.getType(), ex.getMessage());
} else {
log.info("Event sent: type={}, userId={}", event.getType(), event.getUserId());
}
});
}
@Override
public void publishUserCreated(String userId) {
publish(UserEvent.builder()
.type(EventType.USER_CREATED)
.userId(userId)
.build());
}
}