diff --git a/rag-service/pom.xml b/rag-service/pom.xml index aaee388..64aaf33 100644 --- a/rag-service/pom.xml +++ b/rag-service/pom.xml @@ -139,6 +139,10 @@ springdoc-openapi-starter-webmvc-ui 2.8.8 + + org.springframework.kafka + spring-kafka + diff --git a/rag-service/src/main/java/com/balex/rag/config/KafkaProducerConfig.java b/rag-service/src/main/java/com/balex/rag/config/KafkaProducerConfig.java new file mode 100644 index 0000000..92737e3 --- /dev/null +++ b/rag-service/src/main/java/com/balex/rag/config/KafkaProducerConfig.java @@ -0,0 +1,36 @@ +package com.balex.rag.config; + +import com.balex.rag.model.dto.UserEvent; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import java.util.Map; + +@Configuration +public class KafkaProducerConfig { + + @Value("${spring.kafka.bootstrap-servers:localhost:9092}") + private String bootstrapServers; + + @Bean + public ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(Map.of( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers, + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class + )); + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } +} + diff --git a/rag-service/src/main/java/com/balex/rag/controller/AuthController.java b/rag-service/src/main/java/com/balex/rag/controller/AuthController.java index 944e1b0..058a03f 100644 --- a/rag-service/src/main/java/com/balex/rag/controller/AuthController.java +++ b/rag-service/src/main/java/com/balex/rag/controller/AuthController.java @@ -6,6 +6,7 @@ import com.balex.rag.model.request.user.LoginRequest; import com.balex.rag.model.request.user.RegistrationUserRequest; import com.balex.rag.model.response.RagResponse; import com.balex.rag.service.AuthService; +import com.balex.rag.service.EventPublisher; import com.balex.rag.utils.ApiUtils; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.media.Content; @@ -28,6 +29,7 @@ import org.springframework.web.bind.annotation.*; @RequestMapping("${end.points.auth}") public class AuthController { private final AuthService authService; + private final EventPublisher eventPublisher; @ApiResponses(value = { @ApiResponse(responseCode = "200", description = "Successful authorization", @@ -76,8 +78,9 @@ public class AuthController { Cookie authorizationCookie = ApiUtils.createAuthCookie(result.getPayload().getToken()); response.addCookie(authorizationCookie); + eventPublisher.publishUserCreated(result.getPayload().getId().toString()); + return ResponseEntity.ok(result); } -} - +} \ No newline at end of file diff --git a/rag-service/src/main/java/com/balex/rag/controller/ChatController.java b/rag-service/src/main/java/com/balex/rag/controller/ChatController.java index 0cc6a57..18144bb 100644 --- a/rag-service/src/main/java/com/balex/rag/controller/ChatController.java +++ b/rag-service/src/main/java/com/balex/rag/controller/ChatController.java @@ -3,6 +3,7 @@ package com.balex.rag.controller; import com.balex.rag.model.constants.ApiLogMessage; import com.balex.rag.model.entity.Chat; import com.balex.rag.service.ChatService; +import com.balex.rag.service.EventPublisher; import com.balex.rag.utils.ApiUtils; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -20,6 +21,7 @@ import java.util.List; public class ChatController { private final ChatService chatService; + private final EventPublisher eventPublisher; @GetMapping("") public ResponseEntity> mainPage() { @@ -39,14 +41,24 @@ public class ChatController { public ResponseEntity newChat(@RequestParam String title) { log.trace(ApiLogMessage.NAME_OF_CURRENT_METHOD.getValue(), ApiUtils.getMethodName()); Chat chat = chatService.createNewChat(title); + + eventPublisher.publishChatCreated( + chat.getIdOwner().toString(), + chat.getId().toString()); + return ResponseEntity.ok(chat); } @DeleteMapping("/{chatId}") public ResponseEntity deleteChat(@PathVariable Long chatId) { log.trace(ApiLogMessage.NAME_OF_CURRENT_METHOD.getValue(), ApiUtils.getMethodName()); + Chat chat = chatService.getChat(chatId); chatService.deleteChat(chatId); + + eventPublisher.publishChatDeleted( + chat.getIdOwner().toString(), + chatId.toString()); + return ResponseEntity.noContent().build(); } -} - +} \ No newline at end of file diff --git a/rag-service/src/main/java/com/balex/rag/controller/ChatEntryController.java b/rag-service/src/main/java/com/balex/rag/controller/ChatEntryController.java index 562a0ba..de16b63 100644 --- a/rag-service/src/main/java/com/balex/rag/controller/ChatEntryController.java +++ b/rag-service/src/main/java/com/balex/rag/controller/ChatEntryController.java @@ -3,8 +3,11 @@ package com.balex.rag.controller; import com.balex.rag.config.RagDefaultsProperties; import com.balex.rag.model.constants.ApiLogMessage; import com.balex.rag.model.dto.UserEntryRequest; +import com.balex.rag.model.entity.Chat; import com.balex.rag.model.entity.ChatEntry; import com.balex.rag.service.ChatEntryService; +import com.balex.rag.service.ChatService; +import com.balex.rag.service.EventPublisher; import com.balex.rag.utils.ApiUtils; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -22,7 +25,9 @@ import java.util.List; public class ChatEntryController { private final ChatEntryService chatEntryService; + private final ChatService chatService; private final RagDefaultsProperties ragDefaults; + private final EventPublisher eventPublisher; @PostMapping("/{chatId}") public ResponseEntity addUserEntry( @@ -35,6 +40,13 @@ public class ChatEntryController { double topP = request.topP() != null ? request.topP() : ragDefaults.topP(); ChatEntry entry = chatEntryService.addUserEntry(chatId, request.content(), onlyContext, topK, topP); + + Chat chat = chatService.getChat(chatId); + eventPublisher.publishQuerySent( + chat.getIdOwner().toString(), + chatId.toString(), + 0); // TODO: add tokensUsed when Ollama response provides it + return ResponseEntity.ok(entry); } } \ No newline at end of file diff --git a/rag-service/src/main/java/com/balex/rag/controller/UserController.java b/rag-service/src/main/java/com/balex/rag/controller/UserController.java index 401c9f5..1dba786 100644 --- a/rag-service/src/main/java/com/balex/rag/controller/UserController.java +++ b/rag-service/src/main/java/com/balex/rag/controller/UserController.java @@ -8,6 +8,7 @@ import com.balex.rag.model.request.user.NewUserRequest; import com.balex.rag.model.request.user.UpdateUserRequest; import com.balex.rag.model.response.PaginationResponse; import com.balex.rag.model.response.RagResponse; +import com.balex.rag.service.EventPublisher; import com.balex.rag.service.UserService; import com.balex.rag.utils.ApiUtils; import jakarta.validation.Valid; @@ -26,6 +27,7 @@ import org.springframework.web.bind.annotation.*; @RequestMapping("${end.points.users}") public class UserController { private final UserService userService; + private final EventPublisher eventPublisher; @GetMapping("${end.points.id}") public ResponseEntity> getUserById( @@ -42,6 +44,9 @@ public class UserController { log.trace(ApiLogMessage.NAME_OF_CURRENT_METHOD.getValue(), ApiUtils.getMethodName()); RagResponse createdUser = userService.createUser(request); + + eventPublisher.publishUserCreated(createdUser.getPayload().getId().toString()); + return ResponseEntity.ok(createdUser); } @@ -96,4 +101,4 @@ public class UserController { RagResponse> response = userService.findAllUsers(pageable); return ResponseEntity.ok(response); } -} +} \ No newline at end of file diff --git a/rag-service/src/main/java/com/balex/rag/model/dto/UserEvent.java b/rag-service/src/main/java/com/balex/rag/model/dto/UserEvent.java new file mode 100644 index 0000000..e8c111b --- /dev/null +++ b/rag-service/src/main/java/com/balex/rag/model/dto/UserEvent.java @@ -0,0 +1,39 @@ +package com.balex.rag.model.dto; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.Instant; + +/** + * Event published to Kafka topic "user-events". + * Consumed by analytics-service. + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@JsonIgnoreProperties(ignoreUnknown = true) +public class UserEvent { + + private EventType type; + private String userId; + private String chatId; + private Integer tokensUsed; + private Integer documentsFound; + + @Builder.Default + private Instant timestamp = Instant.now(); + + public enum EventType { + USER_CREATED, + CHAT_CREATED, + CHAT_DELETED, + QUERY_SENT, + RAG_CONTEXT_HIT + } +} + diff --git a/rag-service/src/main/java/com/balex/rag/service/EventPublisher.java b/rag-service/src/main/java/com/balex/rag/service/EventPublisher.java new file mode 100644 index 0000000..a67d9f0 --- /dev/null +++ b/rag-service/src/main/java/com/balex/rag/service/EventPublisher.java @@ -0,0 +1,87 @@ +package com.balex.rag.service; + +import com.balex.rag.model.dto.UserEvent; +import com.balex.rag.model.dto.UserEvent.EventType; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Service; + +/** + * Publishes user events to Kafka for analytics-service consumption. + * + * Usage examples in your existing services/controllers: + * + * // User registration + * eventPublisher.publishUserCreated(user.getId()); + * + * // Chat created + * eventPublisher.publishChatCreated(userId, chatId); + * + * // Query sent to RAG + * eventPublisher.publishQuerySent(userId, chatId, tokensUsed); + * + * // RAG context found documents + * eventPublisher.publishRagContextHit(userId, documentsFound); + */ +@Slf4j +@Service +@RequiredArgsConstructor +public class EventPublisher { + + private final KafkaTemplate kafkaTemplate; + + @Value("${analytics.kafka.topic:user-events}") + private String topic; + + public void publishUserCreated(String userId) { + publish(UserEvent.builder() + .type(EventType.USER_CREATED) + .userId(userId) + .build()); + } + + public void publishChatCreated(String userId, String chatId) { + publish(UserEvent.builder() + .type(EventType.CHAT_CREATED) + .userId(userId) + .chatId(chatId) + .build()); + } + + public void publishChatDeleted(String userId, String chatId) { + publish(UserEvent.builder() + .type(EventType.CHAT_DELETED) + .userId(userId) + .chatId(chatId) + .build()); + } + + public void publishQuerySent(String userId, String chatId, int tokensUsed) { + publish(UserEvent.builder() + .type(EventType.QUERY_SENT) + .userId(userId) + .chatId(chatId) + .tokensUsed(tokensUsed) + .build()); + } + + public void publishRagContextHit(String userId, int documentsFound) { + publish(UserEvent.builder() + .type(EventType.RAG_CONTEXT_HIT) + .userId(userId) + .documentsFound(documentsFound) + .build()); + } + + private void publish(UserEvent event) { + try { + kafkaTemplate.send(topic, event.getUserId(), event); + log.debug("Published event: type={}, userId={}", event.getType(), event.getUserId()); + } catch (Exception e) { + log.error("Failed to publish event: {}", event, e); + } + } +} + diff --git a/rag-service/src/main/resources/application.properties b/rag-service/src/main/resources/application.properties index 7bebe9e..504c04e 100644 --- a/rag-service/src/main/resources/application.properties +++ b/rag-service/src/main/resources/application.properties @@ -35,4 +35,7 @@ rag.rerank-fetch-multiplier = 2 #Swagger swagger.servers.first=http://localhost:8080 springdoc.swagger-ui.path=/swagger-ui.html -springdoc.api-docs.path=/v3/api-docs \ No newline at end of file +springdoc.api-docs.path=/v3/api-docs +#Kafka +spring.kafka.bootstrap-servers=${KAFKA_BOOTSTRAP_SERVERS:localhost:9092} +analytics.kafka.topic=user-events \ No newline at end of file