Kafka излишнее создание сущности при retry
Делаю проект на микросервисной архитектуре и начал с функционала в виде регистрации пользователя и возникла дилема насчёт того, что при отправке user-created евента после успешного создания юзера, в методе sendEmailVerificationCode есть вызов на создание кода и последующей его отправки, но вот при попытке retry будет происходить повторный вызов создания кода и ненужное занятое место в бд. Как лучше поступить? Вынести вызов создания кода в auth service и уже после отправлять евент с кодом и почтой для его отправки или же реализовать как-то механизм для проверки уже существующего кода.
AuthService
public RegisterResponseDto register(SignUpRequestDto dto) throws JsonProcessingException {
String encodedPassword = passwordEncoder.encode(dto.password());
ProfileRequest profile = ProfileRequest.newBuilder()
.setLastName(dto.lastName())
.setFirstName(dto.firstName())
.setPhoneNumber(dto.phoneNumber())
.build();
CreateLocalUserRequest req = CreateLocalUserRequest.newBuilder()
.setEmail(dto.email())
.setPassword(encodedPassword)
.setProfile(profile)
.build();
try {
CreateUserResponse response = userServiceGrpc.createLocalUser(req);
UserCreatedEvent event = new UserCreatedEvent(response.getEmail(), response.getUserId());
kafkaTemplate.send("user-created", objectMapper.writeValueAsString(event));
return authMapper.toRegisterResponseDto(response);
} catch (StatusRuntimeException e) {
if (Objects.requireNonNull(e.getStatus().getCode()) == Status.Code.ALREADY_EXISTS) {
throw new EntityAlreadyExistsException("User with email " + dto.email() + " already exists");
}
throw e;
}
}
MailService
@RetryableTopic(attempts = "2", backoff = @Backoff(delay = 1000))
@KafkaListener(topics = "user-created", groupId = "mail-service")
public void sendEmailVerificationCode(String data, Acknowledgment acknowledgment) {
try {
log.info("Received data: {}", data);
UserCreatedEvent event = objectMapper.readValue(data, UserCreatedEvent.class);
CreateCodeRequest request = CreateCodeRequest.newBuilder()
.setScope(CodeScopeEnum.EMAIL_VERIFICATION)
.setUserId(event.userId())
.build();
CreateCodeResponse verificationCode = codeServiceGrpc.createCode(request);
SimpleMailMessage msg = new SimpleMailMessage();
msg.setFrom(from);
msg.setTo(event.email());
msg.setText("Your verification code is: " + verificationCode.getCode());
msg.setSubject("Email verification code");
javaMailSender.send(msg);
acknowledgment.acknowledge();
log.info("Email sent successfully to {}", event.email());
} catch (JacksonException | MailException e) {
log.error(e.getMessage());
}
}
CodeService
@Override
public void createCode(CreateCodeRequest request, StreamObserver<CreateCodeResponse> responseObserver) {
log.info("CreateCode method was called: {}, {}", request.getUserId(), request.getScope());
String generatedCode = codeGeneratorService.generateAlphanumericCode();
CodeScopeEnum scope = CodeScopeEnum.valueOf(request.getScope().name());
Code code = new Code(
generatedCode,
getExpirationTime(),
scope,
request.getUserId()
);
Code savedCode = codeRepository.save(code);
CreateCodeResponse response = CreateCodeResponse.newBuilder()
.setCode(savedCode.getCode())
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}