diff --git a/rag-service/src/main/java/com/balex/rag/config/KafkaProducerConfig.java b/rag-service/src/main/java/com/balex/rag/config/KafkaProducerConfig.java index 92737e3..5277c94 100644 --- a/rag-service/src/main/java/com/balex/rag/config/KafkaProducerConfig.java +++ b/rag-service/src/main/java/com/balex/rag/config/KafkaProducerConfig.java @@ -33,4 +33,3 @@ public class KafkaProducerConfig { return new KafkaTemplate<>(producerFactory()); } } - diff --git a/rag-service/src/main/java/com/balex/rag/controller/ChatEntryController.java b/rag-service/src/main/java/com/balex/rag/controller/ChatEntryController.java index 257eb24..c43b1ba 100644 --- a/rag-service/src/main/java/com/balex/rag/controller/ChatEntryController.java +++ b/rag-service/src/main/java/com/balex/rag/controller/ChatEntryController.java @@ -15,8 +15,6 @@ import org.springframework.http.ResponseEntity; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.*; -import java.util.List; - @Slf4j @RestController @Validated diff --git a/rag-service/src/main/java/com/balex/rag/model/kafka/UserEvent.java b/rag-service/src/main/java/com/balex/rag/model/kafka/UserEvent.java new file mode 100644 index 0000000..2fcad47 --- /dev/null +++ b/rag-service/src/main/java/com/balex/rag/model/kafka/UserEvent.java @@ -0,0 +1,31 @@ +package com.balex.rag.model.kafka; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.Instant; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class UserEvent { + + private EventType type; + private String userId; + private String chatId; + private Integer tokensUsed; + private Integer documentsFound; + + @Builder.Default + private Instant timestamp = Instant.now(); + + public enum EventType { + USER_CREATED, + CHAT_CREATED, + CHAT_DELETED, + QUERY_SENT + } +} \ No newline at end of file diff --git a/rag-service/src/main/java/com/balex/rag/service/EventPublisher.java b/rag-service/src/main/java/com/balex/rag/service/EventPublisher.java index a67d9f0..dd90ac8 100644 --- a/rag-service/src/main/java/com/balex/rag/service/EventPublisher.java +++ b/rag-service/src/main/java/com/balex/rag/service/EventPublisher.java @@ -1,87 +1,12 @@ package com.balex.rag.service; -import com.balex.rag.model.dto.UserEvent; -import com.balex.rag.model.dto.UserEvent.EventType; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.stereotype.Service; +public interface EventPublisher { -/** - * Publishes user events to Kafka for analytics-service consumption. - * - * Usage examples in your existing services/controllers: - * - * // User registration - * eventPublisher.publishUserCreated(user.getId()); - * - * // Chat created - * eventPublisher.publishChatCreated(userId, chatId); - * - * // Query sent to RAG - * eventPublisher.publishQuerySent(userId, chatId, tokensUsed); - * - * // RAG context found documents - * eventPublisher.publishRagContextHit(userId, documentsFound); - */ -@Slf4j -@Service -@RequiredArgsConstructor -public class EventPublisher { + void publishChatCreated(String userId, String chatId); - private final KafkaTemplate kafkaTemplate; + void publishChatDeleted(String userId, String chatId); - @Value("${analytics.kafka.topic:user-events}") - private String topic; - - public void publishUserCreated(String userId) { - publish(UserEvent.builder() - .type(EventType.USER_CREATED) - .userId(userId) - .build()); - } - - public void publishChatCreated(String userId, String chatId) { - publish(UserEvent.builder() - .type(EventType.CHAT_CREATED) - .userId(userId) - .chatId(chatId) - .build()); - } - - public void publishChatDeleted(String userId, String chatId) { - publish(UserEvent.builder() - .type(EventType.CHAT_DELETED) - .userId(userId) - .chatId(chatId) - .build()); - } - - public void publishQuerySent(String userId, String chatId, int tokensUsed) { - publish(UserEvent.builder() - .type(EventType.QUERY_SENT) - .userId(userId) - .chatId(chatId) - .tokensUsed(tokensUsed) - .build()); - } - - public void publishRagContextHit(String userId, int documentsFound) { - publish(UserEvent.builder() - .type(EventType.RAG_CONTEXT_HIT) - .userId(userId) - .documentsFound(documentsFound) - .build()); - } - - private void publish(UserEvent event) { - try { - kafkaTemplate.send(topic, event.getUserId(), event); - log.debug("Published event: type={}, userId={}", event.getType(), event.getUserId()); - } catch (Exception e) { - log.error("Failed to publish event: {}", event, e); - } - } -} + void publishQuerySent(String userId, String chatId, int tokensUsed); + void publishUserCreated(String userId); +} \ No newline at end of file diff --git a/rag-service/src/main/java/com/balex/rag/service/impl/EventPublisherImpl.java b/rag-service/src/main/java/com/balex/rag/service/impl/EventPublisherImpl.java new file mode 100644 index 0000000..66544d2 --- /dev/null +++ b/rag-service/src/main/java/com/balex/rag/service/impl/EventPublisherImpl.java @@ -0,0 +1,69 @@ +package com.balex.rag.service.impl; + +import com.balex.rag.model.kafka.UserEvent; +import com.balex.rag.model.kafka.UserEvent.EventType; +import com.balex.rag.service.EventPublisher; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +@RequiredArgsConstructor +public class EventPublisherImpl implements EventPublisher { + + private final KafkaTemplate kafkaTemplate; + + @Value("${analytics.kafka.topic:user-events}") + private String topic; + + @Override + public void publishChatCreated(String userId, String chatId) { + publish(UserEvent.builder() + .type(EventType.CHAT_CREATED) + .userId(userId) + .chatId(chatId) + .build()); + } + + @Override + public void publishChatDeleted(String userId, String chatId) { + publish(UserEvent.builder() + .type(EventType.CHAT_DELETED) + .userId(userId) + .chatId(chatId) + .build()); + } + + @Override + public void publishQuerySent(String userId, String chatId, int tokensUsed) { + publish(UserEvent.builder() + .type(EventType.QUERY_SENT) + .userId(userId) + .chatId(chatId) + .tokensUsed(tokensUsed) + .build()); + } + + + private void publish(UserEvent event) { + kafkaTemplate.send(topic, event.getUserId(), event) + .whenComplete((result, ex) -> { + if (ex != null) { + log.error("Failed to send event {}: {}", event.getType(), ex.getMessage()); + } else { + log.info("Event sent: type={}, userId={}", event.getType(), event.getUserId()); + } + }); + } + + @Override + public void publishUserCreated(String userId) { + publish(UserEvent.builder() + .type(EventType.USER_CREATED) + .userId(userId) + .build()); + } +} \ No newline at end of file