kafka
This commit is contained in:
@@ -0,0 +1,36 @@
|
||||
package com.balex.rag.config;
|
||||
|
||||
import com.balex.rag.model.dto.UserEvent;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
import org.springframework.kafka.support.serializer.JsonSerializer;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@Configuration
|
||||
public class KafkaProducerConfig {
|
||||
|
||||
@Value("${spring.kafka.bootstrap-servers:localhost:9092}")
|
||||
private String bootstrapServers;
|
||||
|
||||
@Bean
|
||||
public ProducerFactory<String, UserEvent> producerFactory() {
|
||||
return new DefaultKafkaProducerFactory<>(Map.of(
|
||||
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
|
||||
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
|
||||
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class
|
||||
));
|
||||
}
|
||||
|
||||
@Bean
|
||||
public KafkaTemplate<String, UserEvent> kafkaTemplate() {
|
||||
return new KafkaTemplate<>(producerFactory());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import com.balex.rag.model.request.user.LoginRequest;
|
||||
import com.balex.rag.model.request.user.RegistrationUserRequest;
|
||||
import com.balex.rag.model.response.RagResponse;
|
||||
import com.balex.rag.service.AuthService;
|
||||
import com.balex.rag.service.EventPublisher;
|
||||
import com.balex.rag.utils.ApiUtils;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.media.Content;
|
||||
@@ -28,6 +29,7 @@ import org.springframework.web.bind.annotation.*;
|
||||
@RequestMapping("${end.points.auth}")
|
||||
public class AuthController {
|
||||
private final AuthService authService;
|
||||
private final EventPublisher eventPublisher;
|
||||
|
||||
@ApiResponses(value = {
|
||||
@ApiResponse(responseCode = "200", description = "Successful authorization",
|
||||
@@ -76,8 +78,9 @@ public class AuthController {
|
||||
Cookie authorizationCookie = ApiUtils.createAuthCookie(result.getPayload().getToken());
|
||||
response.addCookie(authorizationCookie);
|
||||
|
||||
eventPublisher.publishUserCreated(result.getPayload().getId().toString());
|
||||
|
||||
return ResponseEntity.ok(result);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -3,6 +3,7 @@ package com.balex.rag.controller;
|
||||
import com.balex.rag.model.constants.ApiLogMessage;
|
||||
import com.balex.rag.model.entity.Chat;
|
||||
import com.balex.rag.service.ChatService;
|
||||
import com.balex.rag.service.EventPublisher;
|
||||
import com.balex.rag.utils.ApiUtils;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -20,6 +21,7 @@ import java.util.List;
|
||||
public class ChatController {
|
||||
|
||||
private final ChatService chatService;
|
||||
private final EventPublisher eventPublisher;
|
||||
|
||||
@GetMapping("")
|
||||
public ResponseEntity<List<Chat>> mainPage() {
|
||||
@@ -39,14 +41,24 @@ public class ChatController {
|
||||
public ResponseEntity<Chat> newChat(@RequestParam String title) {
|
||||
log.trace(ApiLogMessage.NAME_OF_CURRENT_METHOD.getValue(), ApiUtils.getMethodName());
|
||||
Chat chat = chatService.createNewChat(title);
|
||||
|
||||
eventPublisher.publishChatCreated(
|
||||
chat.getIdOwner().toString(),
|
||||
chat.getId().toString());
|
||||
|
||||
return ResponseEntity.ok(chat);
|
||||
}
|
||||
|
||||
@DeleteMapping("/{chatId}")
|
||||
public ResponseEntity<Void> deleteChat(@PathVariable Long chatId) {
|
||||
log.trace(ApiLogMessage.NAME_OF_CURRENT_METHOD.getValue(), ApiUtils.getMethodName());
|
||||
Chat chat = chatService.getChat(chatId);
|
||||
chatService.deleteChat(chatId);
|
||||
|
||||
eventPublisher.publishChatDeleted(
|
||||
chat.getIdOwner().toString(),
|
||||
chatId.toString());
|
||||
|
||||
return ResponseEntity.noContent().build();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -3,8 +3,11 @@ package com.balex.rag.controller;
|
||||
import com.balex.rag.config.RagDefaultsProperties;
|
||||
import com.balex.rag.model.constants.ApiLogMessage;
|
||||
import com.balex.rag.model.dto.UserEntryRequest;
|
||||
import com.balex.rag.model.entity.Chat;
|
||||
import com.balex.rag.model.entity.ChatEntry;
|
||||
import com.balex.rag.service.ChatEntryService;
|
||||
import com.balex.rag.service.ChatService;
|
||||
import com.balex.rag.service.EventPublisher;
|
||||
import com.balex.rag.utils.ApiUtils;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -22,7 +25,9 @@ import java.util.List;
|
||||
public class ChatEntryController {
|
||||
|
||||
private final ChatEntryService chatEntryService;
|
||||
private final ChatService chatService;
|
||||
private final RagDefaultsProperties ragDefaults;
|
||||
private final EventPublisher eventPublisher;
|
||||
|
||||
@PostMapping("/{chatId}")
|
||||
public ResponseEntity<ChatEntry> addUserEntry(
|
||||
@@ -35,6 +40,13 @@ public class ChatEntryController {
|
||||
double topP = request.topP() != null ? request.topP() : ragDefaults.topP();
|
||||
|
||||
ChatEntry entry = chatEntryService.addUserEntry(chatId, request.content(), onlyContext, topK, topP);
|
||||
|
||||
Chat chat = chatService.getChat(chatId);
|
||||
eventPublisher.publishQuerySent(
|
||||
chat.getIdOwner().toString(),
|
||||
chatId.toString(),
|
||||
0); // TODO: add tokensUsed when Ollama response provides it
|
||||
|
||||
return ResponseEntity.ok(entry);
|
||||
}
|
||||
}
|
||||
@@ -8,6 +8,7 @@ import com.balex.rag.model.request.user.NewUserRequest;
|
||||
import com.balex.rag.model.request.user.UpdateUserRequest;
|
||||
import com.balex.rag.model.response.PaginationResponse;
|
||||
import com.balex.rag.model.response.RagResponse;
|
||||
import com.balex.rag.service.EventPublisher;
|
||||
import com.balex.rag.service.UserService;
|
||||
import com.balex.rag.utils.ApiUtils;
|
||||
import jakarta.validation.Valid;
|
||||
@@ -26,6 +27,7 @@ import org.springframework.web.bind.annotation.*;
|
||||
@RequestMapping("${end.points.users}")
|
||||
public class UserController {
|
||||
private final UserService userService;
|
||||
private final EventPublisher eventPublisher;
|
||||
|
||||
@GetMapping("${end.points.id}")
|
||||
public ResponseEntity<RagResponse<UserDTO>> getUserById(
|
||||
@@ -42,6 +44,9 @@ public class UserController {
|
||||
log.trace(ApiLogMessage.NAME_OF_CURRENT_METHOD.getValue(), ApiUtils.getMethodName());
|
||||
|
||||
RagResponse<UserDTO> createdUser = userService.createUser(request);
|
||||
|
||||
eventPublisher.publishUserCreated(createdUser.getPayload().getId().toString());
|
||||
|
||||
return ResponseEntity.ok(createdUser);
|
||||
}
|
||||
|
||||
@@ -96,4 +101,4 @@ public class UserController {
|
||||
RagResponse<PaginationResponse<UserSearchDTO>> response = userService.findAllUsers(pageable);
|
||||
return ResponseEntity.ok(response);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,39 @@
|
||||
package com.balex.rag.model.dto;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.Instant;
|
||||
|
||||
/**
|
||||
* Event published to Kafka topic "user-events".
|
||||
* Consumed by analytics-service.
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||
public class UserEvent {
|
||||
|
||||
private EventType type;
|
||||
private String userId;
|
||||
private String chatId;
|
||||
private Integer tokensUsed;
|
||||
private Integer documentsFound;
|
||||
|
||||
@Builder.Default
|
||||
private Instant timestamp = Instant.now();
|
||||
|
||||
public enum EventType {
|
||||
USER_CREATED,
|
||||
CHAT_CREATED,
|
||||
CHAT_DELETED,
|
||||
QUERY_SENT,
|
||||
RAG_CONTEXT_HIT
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,87 @@
|
||||
package com.balex.rag.service;
|
||||
|
||||
import com.balex.rag.model.dto.UserEvent;
|
||||
import com.balex.rag.model.dto.UserEvent.EventType;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* Publishes user events to Kafka for analytics-service consumption.
|
||||
*
|
||||
* Usage examples in your existing services/controllers:
|
||||
*
|
||||
* // User registration
|
||||
* eventPublisher.publishUserCreated(user.getId());
|
||||
*
|
||||
* // Chat created
|
||||
* eventPublisher.publishChatCreated(userId, chatId);
|
||||
*
|
||||
* // Query sent to RAG
|
||||
* eventPublisher.publishQuerySent(userId, chatId, tokensUsed);
|
||||
*
|
||||
* // RAG context found documents
|
||||
* eventPublisher.publishRagContextHit(userId, documentsFound);
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class EventPublisher {
|
||||
|
||||
private final KafkaTemplate<String, UserEvent> kafkaTemplate;
|
||||
|
||||
@Value("${analytics.kafka.topic:user-events}")
|
||||
private String topic;
|
||||
|
||||
public void publishUserCreated(String userId) {
|
||||
publish(UserEvent.builder()
|
||||
.type(EventType.USER_CREATED)
|
||||
.userId(userId)
|
||||
.build());
|
||||
}
|
||||
|
||||
public void publishChatCreated(String userId, String chatId) {
|
||||
publish(UserEvent.builder()
|
||||
.type(EventType.CHAT_CREATED)
|
||||
.userId(userId)
|
||||
.chatId(chatId)
|
||||
.build());
|
||||
}
|
||||
|
||||
public void publishChatDeleted(String userId, String chatId) {
|
||||
publish(UserEvent.builder()
|
||||
.type(EventType.CHAT_DELETED)
|
||||
.userId(userId)
|
||||
.chatId(chatId)
|
||||
.build());
|
||||
}
|
||||
|
||||
public void publishQuerySent(String userId, String chatId, int tokensUsed) {
|
||||
publish(UserEvent.builder()
|
||||
.type(EventType.QUERY_SENT)
|
||||
.userId(userId)
|
||||
.chatId(chatId)
|
||||
.tokensUsed(tokensUsed)
|
||||
.build());
|
||||
}
|
||||
|
||||
public void publishRagContextHit(String userId, int documentsFound) {
|
||||
publish(UserEvent.builder()
|
||||
.type(EventType.RAG_CONTEXT_HIT)
|
||||
.userId(userId)
|
||||
.documentsFound(documentsFound)
|
||||
.build());
|
||||
}
|
||||
|
||||
private void publish(UserEvent event) {
|
||||
try {
|
||||
kafkaTemplate.send(topic, event.getUserId(), event);
|
||||
log.debug("Published event: type={}, userId={}", event.getType(), event.getUserId());
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to publish event: {}", event, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user