FitTrip 프로젝트는 MSA 구조에서 EDA를 적용한 프로젝트입니다.
즉 서비스가 여러 개로 나눠진 분산 시스템 구조입니다.
이러한 분산 시스템 구조에서는 데이터의 일관성을 확보하기가 상대적으로 어렵습니다.
발행되지 않아야 하는 메시지가 발행되거나, 발행되어야 하는 메시지가 발행되지 않고 누락되기도 합니다.
이번 글에서는 간단한 예시와 함께, 흔히 볼 수 있는 메시지 발행, 처리 방식들의 문제점들을 짚어보려고 합니다.
또한 이 문제점들을 해결하는 방법 중 하나인 Transactional Outbox Pattern을 소개하고 FitTrip 프로젝트에 적용한 과정에 대해 얘기하려 합니다.
분산 시스템에서 데이터를 전달하는 방법은 아래 글을 참고해주시면 감사하겠습니다.
https://an-jjin.tistory.com/42
잘못된 메시지 처리 프로세스
잘못된 메시지 처리 프로세스를 소개해 드리기 위해 간단한 예시 시스템을 정의해 보겠습니다.
고객은 결제를 요청하고 계좌를 확인하는 간단한 시스템입니다.
고객의 결제 내역에 대한 정보는 결제 서비스가 관리합니다.
계좌 서비스는 결제 이후 고객의 계좌에서 돈을 빼는 부분을 관리합니다.
두 시스템은 각자의 데이터베이스를 운영합니다.
결제 내역에 대한 정보는 결제 서비스 DB에 저장되고 계좌에 대한 정보는 계좌 서비스 DB에 저장됩니다.
결제 이후 계좌 서비스에서 결제한 만큼 고객의 계좌 정보를 변경해야 합니다.
두 시스템 간의 동기화 작업은 메시징 방식을 이용한다고 가정하겠습니다.
발행되어야 하는 메시지가 발행되지 않는다
결제 시스템은 고객이 결제를 요청하면 메시지를 발행하려고 합니다.
간단하게, PaymentService가 고객 결제 요청 처리하고 완료했다는 정보를 저장하면 PaymentController는 결제를 완료했다는 메시지를 발행합니다.
@Service
public class PaymentService {
private final IPaymentRepository repository;
...
@Transactional
public Payment processPayment(ProcessPayment command) {
validate(command);
var payment = Payment.create(command);
repository.save(payment);
return payment;
}
}
@RestController
public class PaymentController {
private final PaymentService service;
private final IMessagePublisher publisher;
...
@PostMapping("/payments")
public void processPayment(@RequestBody ProcessPayment command) {
Payment payment = service.processPayment(command);
var message = PaymentProcessed.from(payment);
publisher.publish(message);
}
}
이 방식의 문제는 발행되어야 하는 메시지가 발행되지 않을 수 있다는 것입니다.
위 코드 예시에서 publisher.publish(message) 코드 실행 중 예외가 발생하면 어떻게 될까요?
이미 결제 정보을 저장하는 트랜잭션은 커밋되어 결제 시스템의 데이터베이스에는 새로운 결제 정보가 저장되었습니다.
하지만 결제가 완료 되었다는 메시지는 발행되지 않고, 계좌 시스템의 데이터베이스에는 새로운 결제 정보가 동기화되지 않습니다. 고객은 결제를 했지만 계좌에 대한 정보는 변동이 없어 회사에 불이익으로 이어집니다.
메시지 발행기에 재시도 패턴을 적용하면 해결될까요?
네트워크 문제로 인한 일시적 실패에는 어느 정도 대응할 수 있겠지만, 메시지 중개를 담당하는 외부 메시지 브로커 시스템(e.g. AWS)의 장애가 원인인 경우 재시도 패턴으로는 문제를 완벽히 방지할 수 없습니다. 재시도 횟수나 주기를 늘리는 경우, 외부 시스템의 문제가 내부 시스템의 지연으로 이어질 수 있습니다.
발행되지 않아야 하는 메시지가 발행된다
위 시나리오의 문제를 해결하기 위해, 메시지 발행이 실패하면 결제 등록을 실패시켜 보려고 합니다.
메시지가 발행되지 않으면 결제 정보도 등록되지 않으니 문제가 해결되지 않을까요?
@RestController
public class PaymentController {
private final PaymentService service;
...
@PostMapping("/payments")
public void processPayment(@RequestBody ProcessPayment command) {
service.processPayment(command);
}
}
@Service
public class PaymentService {
private final IPaymentRepository repository;
private final IMessagePublisher publisher;
...
@Transactional
public void processPayment(ProcessPayment command) {
validate(command);
var payment = Payment.create(command);
repository.save(payment);
var message = PaymentProcessed.from(payment);
publisher.publish(message);
}
}
publisher.publish(message) 코드가 예외를 발생시키면 트랜잭션이 롤백되고, 결제는 데이터베이스에 저장되지 않습니다. 하지만 새로운 문제가 있습니다. 발행되지 않아야 하는 메시지가 발행될 수 있다는 것입니다. 편의상 위 서비스 코드를 아래와 같이 간단히 풀어 적어보겠습니다.
PaymentService.java (직접 트랜잭션 처리)
@Service
public class PaymentService {
private final IPaymentRepository repository;
private final IMessagePublisher publisher;
private final Transaction transaction;
...
public void processPayment(ProcessPayment command) {
transaction.begin();
try {
validate(command);
var payment = Payment.create(command);
repository.save(payment);
var message = PaymentProcessed.from(payment);
publisher.publish(message);
transaction.commit();
} catch (Exception e) {
transaction.rollback();
throw e;
}
}
}
publisher.publish(message) 코드가 문제없이 수행된 다음, 데이터베이스 지연 등으로 인해 트랜잭션 커밋이 실패한다면 어떻게 될까요? 결제는 데이터베이스에 저장되지 않습니다. 하지만 이미 메시지는 발행되었고, 계좌 시스템은 새로운 결제 정보를 받고 계좌 데이터베이스에 최종적으로 결제하지 않은 고객의 돈을 빼고 업데이트하는 일이 발생합니다.
트랜잭션 안에서 메시지를 발행하는 방법은 또 다른 문제점을 가지고 있습니다.
강남언니 공식 블로그의 예시 내용을 가져와 보겠습니다.
트랜잭션 커밋이 성공하지만, 꽤 지연되는 시나리오를 생각해 보겠습니다.
메시지를 받은 검색 시스템은 등록된 상품을 조회해 자체 데이터베이스에 저장하려고 합니다.
하지만 아직 트랜잭션이 커밋되지 않은 상태이므로, 상품 조회 요청을 받은 마켓 시스템은 상품이 존재하지 않는다고 판단합니다. 즉, 마켓 시스템은 ‘아직’ 발행하지 않아야 하는 메시지를 발행하기도 하는 것입니다. 검색 시스템은 상품 조회에 실패하는 경우, 메시지를 버려야 할지 조금 더 기다려야 할지 알 수 없습니다.
상품 등록이 아닌 수정 시나리오에서 아직 발행되지 않아야 하는 메시지가 발행되는 경우는 더 제어하기 어렵습니다.
검색 시스템은 상품 정보가 변경되었다는 메시지를 받고, 상품 정보를 조회합니다.
이때 마켓 시스템이 반환하는 상품 정보는 변경이 완료된 것일 수도, 아닐 수도 있습니다.
만약 위 다이어그램처럼, 트랜잭션 커밋이 지연되면서 검색 시스템에 상품 정보가 동기화되지 않는 경우, 검색 화면에서 사용자들에게 오랜 기간 잘못된 가격 정보가 노출되거나 재고가 다 소진된 상품이 노출될 수 있습니다. 이런 경우 서비스의 사용성이 저하되고, 플랫폼은 사용자의 신뢰를 잃게 될 수도 있습니다.
메시지의 발행 순서가 엄격하게 지켜지지 않는다
때때로 메시지의 발행 순서가 엄격하게 지켜져야 하는 경우가 있습니다.
고객의 결제 요청이 처리된 후 '결제 완료' 메시지가 발행되고, 계좌 서비스가 이를 받아서 처리해야 데이터의 일관성이 보장됩니다. 그러나 만약 메시지 발행 순서가 지켜지지 않는 경우, 두 가지 문제가 발생할 수 있습니다.
- 결제 서비스가 '결제 완료' 메시지를 발행하기 전에 계좌 서비스에서 차감 처리를 시도하는 경우: 이 경우, 계좌 서비스는 아직 결제가 완료되지 않았다는 상태에서 차감 요청을 처리하려 하므로, 실제 계좌 잔액과 결제 상태 간의 불일치가 발생할 수 있습니다.
- 메시지 발행 순서가 뒤바뀌는 경우: 결제 서비스가 '결제 완료' 메시지를 발행하고, 계좌 서비스가 이를 수신해야 하는데, 메시지 순서가 뒤바뀌어 '결제 취소'와 같은 메시지가 먼저 발행된다면, 계좌 서비스는 잘못된 상태 정보를 반영하여 고객의 계좌 잔액을 유지하게 될 수 있습니다.
결제와 계좌 업데이트의 순서는 매우 중요합니다. 만약 메시지 발행 순서가 지켜지지 않으면, 계좌 서비스가 고객의 결제 요청에 대한 올바른 상태를 반영하지 못하게 되어 데이터의 일관성이 깨질 수 있습니다.
배치 보상 프로세스는 문제를 해결해 주지 않는다
위와 같은 문제를 해결하기 위해 제일 쉽게 생각해 볼 수 있는 것은 배치 보상 프로세스입니다.
예를 들어, 이 예시에서 계좌 시스템을 관리하는 팀은 간헐적으로 결제 정보가 계좌 시스템에 제대로 동기화되지 않는 것을 발견하고, 하루에 한 번 모든 결제 정보를 계좌 시스템에 동기화하는 배치 프로세스를 만들 수 있습니다. 이런 보상 프로세스에는 다음과 같은 문제가 있습니다.
시스템 자원을 비효율적으로 사용하게 됩니다. 결제 정보 개수가 늘어날수록, 보상 주기를 짧게 할수록 사용하는 시스템 자원이 늘어납니다. 동기화 문제가 발생하는 데이터의 개수가 몇 개든 모든 데이터를 확인해야 합니다.
실시간성을 잃게 됩니다. 대규모 보상 프로세스를 하루 종일 실행할 수는 없으니 적당한 주기를 정하게 되는데, 어떤 데이터는 보상 프로세스 실행 주기만큼 잘못된 상태로 남아있게 됩니다. 하루에 한 번 보상 프로세스가 실행되는 경우, 어떤 데이터는 꼬박 하루 동안 잘못된 상태로 남아있을 수 있는 것입니다. 비즈니스 요구사항에 따라 이런 긴 동기화 지연이 허락되지 않는 경우도 있습니다.
해결책
앞서 확인한 문제는 결국 메시지를 발행하는 시스템과 소비하는 시스템, 그 사이를 중개하는 메시지 브로커 시스템이 모두 분산되어 있기 때문에 발생합니다. 이 문제를 해결하는 방법은 여러 가지가 있지만, 가장 쉬운 방법 중 하나인 Transactional Outbox Pattern을 소개하려고 합니다. Transactional Outbox Pattern은 데이터베이스 트랜잭션을 이용한 At-Least Once Delivery와 멱등 처리를 활용해, 앞서 살펴본 세 가지 문제를 모두 해결합니다. 하나씩 천천히 살펴보겠습니다.
At-Least Once Delivery
@Service
public class PaymentService {
private final IPaymentRepository repository;
private final IPaymentMessageOutbox outbox;
...
@Transactional
public void processPayment(ProcessPayment command) {
validate(command);
var payment = Payment.create(command);
repository.save(payment);
var message = PaymentProcessed.from(payment);
outbox.save(message);
}
}
결제 정보를 저장하는 트랜잭션 안에서 발행할 메시지 정보도 함께 저장합니다.
메시지는 상품 정보가 저장되는 데이터베이스 내 다른 테이블에 저장됩니다.
IPaymentMessageOutbox는 일종의 Repository입니다.
데이터베이스 트랜잭션이 원자성을 보장하기 때문에, 상품과 메시지는 모두 저장되거나 모두 저장되지 않습니다.
따라서, 발행되지 않아야 하는 메시지가 Outbox에 저장되거나, 발행되어야 하는 메시지가 Outbox에 저장되지 않는 일은 발생하지 않습니다.
public class MessagePublishingJob {
private final IPaymentMessageOutbox outbox;
private final IMessagePublisher publisher;
...
@Scheduled(fixedRate = 1000)
public void publishMessages() {
var messages = outbox.read(10);
publisher.publish(messages);
outbox.delete(messages);
}
}
Outbox에 저장된 메시지를 Polling 해, 성공할 때까지 발행합니다.
어떤 메시지는 여러 번 발행될 수도 있습니다.
이것을 At-Least Once Delivery 전략이라고 합니다.
이 예시에서는 발행이 완료된 메시지를 데이터베이스에서 제거하고 있지만, 기록을 위해 남겨두고 발행 완료로 마킹할 수도 있습니다.
Idempotent Handling
계좌 시스템은 기존과 같이 결제 시스템으로부터 온 메시지를 수신해, 계좌 데이터베이스에 결제 정보를 저장합니다. 이때, 결제 시스템이 같은 메시지를 여러 번 발행할 수 있기 때문에, 계좌 시스템은 메시지를 멱등하게 처리해야 합니다. 즉, 같은 메시지를 여러 번 처리해도 한 번 처리한 것과 같은 결과물이 나오도록 해야 합니다.
멱등 처리를 위해서는 여러 가지 방법을 사용할 수 있습니다. 메시지의 고유 식별자를 이용해 이미 처리한 메시지는 다시 처리하지 않도록 할 수도 있고, 메시지를 수신할 때마다 상품의 현재 상태를 확인함으로써 여러 번 처리해도 같은 결과가 나오도록 할 수도 있습니다.
정리하면, 메시지를 발행하는 시스템은 데이터베이스 트랜잭션을 이용해 메시지를 발행 대기열 테이블에 저장합니다. 트랜잭션이 제공하는 원자성을 이용해, 발행하지 않아야 하는 메시지가 대기열 테이블에 들어가거나, 발행해야 하는 메시지가 대기열 테이블에 들어가지 않는 문제를 방지합니다. 이후 저장된 메시지를 성공할 때까지 발행합니다(At-Least Once Delivery). 메시지를 수신하는 시스템은 같은 메시지를 여러 번 수신해도 한 번 수신한 것과 같이 처리해, 메시지가 정확히 한 번 처리되도록(Exactly-Once Processing) 합니다. 이 패턴을 Transactional Outbox Pattern이라고 합니다.
Transactional Outbox Pattern을 사용하지 않고, 메시지를 발행하는 시스템에 Exactly-Once Delivery 전략을 구현할 수도 있습니다. 그런 경우 메시지를 수신하는 시스템은 멱등 처리를 하지 않아도 되지만, 시간당 메시지 처리량은 At-Least Once Delivery 방식에 비해 줄어들게 됩니다.
실제 구현 과정
앞서 잘못된 메시지 처리 프로세스에 대해 알아봤고 이러한 해결책 중 하나인 트랜잭셔널 아웃박스 패턴을 FitTrip 프로젝트에 어떻게 적용하였는지 적어보려 합니다.
1. TransactionalEventListener에서 BEFORE_COMMIT 활용
초기 설계 방식은 아래와 같습니다.
- 도메인 로직 처리
- 이벤트 DB에 이벤트 insert
- 이벤트 발행
- 트랜잭션 commit 후 카프카에 이벤트 발행
1 ~3을 하나의 트랜잭션으로 묶어서 처리하려 했습니다.
1 ~ 3번 구간이 성공적으로 진행되고 트랜잭션이 commit이 이루어지면 3번 구간에서 발행된 이벤트를 바라보는 4번 구간에서 이벤트를 읽어서 카프카에 메시지를 발행합니다. 이때 4번 구간에서는 TransactionalEventListener의 phase = AFTER_COMMIT을 붙였는데, 이는 앞단의 로직에서 트랜잭션 commit 이 완료된 이후 시점에 이벤트를 읽겠다는 의미입니다.
다른 자료를 참고했을 때 주로 위와 같이 설계하고 로직을 구현하는 방식이 대다수였습니다.
이럴 경우 도메인 로직에 이벤트 발행 로직도 섞이게 되어 관심사를 분리하고 싶어 다른 방식에 대해 찾아봤고 좋은 방식을 찾아 아래와 같이 초기 설계를 수정했습니다.
- 트랜잭션으로 묶인 도메인 로직에서 이벤트 테이블에 이벤트를 기록하는 2번 구간의 로직을 별도의 Listener로 옮긴다. 해당 Listener는 (4번 구간의 Listener와 동일하게) 3번 구간에서 발행되는 이벤트를 바라본다. 다만 (4번 Listener 와는 다르게) 2번 Listener는 phase = BEFORE_COMMIT 이 붙은 @TransactionalEventListener을 선언한다.
- 즉, 3번 구간의 이벤트를 바라보는 Listener 가 기존 1개에서 2개로 변경되었는데, 2번 구간의 Listener는 phase = BEFORE_COMMIT를 붙이고, 4번 구간의 Listener는 기존대로 phase = AFTER_COMMIT을 붙인다.
도메인 트랜잭션이 완료되었을 때 실행되는 로직은 크게 3가지로 볼 수 있습니다.
- 도메인 로직
- 이벤트 테이블에 이벤트 기록
- 이벤트를 카프카로 전송
기존 설계에서는 1번과 2번을 하나의 트랜잭션 안에 묶고 처리했다면, 변경된 구현에서는 2번을 별도의 Listener로 옮기고, 도메인 이벤트가 발행되었을 때 2번 로직을 처리하는 phase = BEFORE_COMMIT의 Listener와 3번 로직을 처리하는 phase = AFTER_COMMIT의 Listener 가 각각 동시에 동작하도록 한다는 것입니다.
사실 기존 설계와 변경된 구현 간에는 큰 차이가 없다고 봐도 무방합니다.
다만 도메인 이벤트의 발행 전과 후를 비교할 때, 변경된 구현에서는 이벤트 발행 이후의 로직은 모두 TransactionalEventListener에서 처리하게 하였고, 이는 “이벤트 발행 이후에 처리되는 모든 로직은 TransactionalEventListener 에서 구현한다” 는 일관성을 유지할 수 있게 해주는 장점이 있다고 봅니다.
spring이 제공하는 ApplicationEventPublisher를 사용하는 근본적인 이유는, 처리해야 할 도메인 로직과 그 이후에 처리되어야 할 로직을 분리하기 위함입니다.
- 도메인 로직이 완료된 이후에 이를 표현하는 이벤트를 발행하고
- 도메인 로직 완료 이후에 처리해야 할 것들은 해당 이벤트를 바라보는 Listener 에 구현한다면
- 도메인 로직은 도메인의 요구사항에만 집중할 수 있고, 그 이외의 것들은 해당 이벤트를 바라보는 Listener 에 추가하기만 하면 된다.
이러한 관점에서 본다면 기존 설계보다는 변경된 구현이 확장성과 응집성이 좀 더 높은 방식이라고 봤습니다.
아래는 (1) 초기 설계에 맞춘 구현과 (2) 변경된 컨셉에 맞춘 (최종적으로 완성된) 구현의 예시입니다.
(1) 초기 설계에 맞춘 구현
@Service
@Transactional
@RequiredArgsConstructor
public class ServerMessageCommandServiceImpl implements ServerMessageCommandService {
private final SequenceGenerator sequenceGenerator;
private final ServerMessageRepository messageRepository;
@Override
public void save(ServerMessageCreateRequest createRequest) {
// 1. 도메인 로직
ServerMessage serverMessage = ServerMessage.builder()
.serverId(createRequest.getServerId())
// 기타 필드 설정
.files(createRequest.getFiles())
.build();
serverMessage.generateSequence(sequenceGenerator.generateSequence(ServerMessage.SEQUENCE_NAME));
ServerChatCreateEvent chatCreateEvent = ServerChatCreateEvent.from(messageRepository.save(serverMessage));
// 2. 이벤트 테이블에 이벤트 기록
recordEventToOutboxTable(chatCreateEvent);
// 3. 이벤트 발행
Events.send(chatCreateEvent);
}
}
@EventHandler
@RequiredArgsConstructor
public class ServerChatEventListener {
private final ServerChatEventRepository serverChatEventRepository;
private final SequenceGenerator sequenceGenerator;
private final ChatEventProducer chatEventProducer;
// 2. 카프카에 메시지 전송 (TransactionPhase.AFTER_COMMIT)
@Async
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void sendMessageHandler(ServerChatModifyEvent chatModifyEvent) {
ServerChatEvent serverChatEvent = createServerChatEvent(chatModifyEvent);
publishServerChatEvent(serverChatEvent);
}
...
}
(2) 변경된 컨셉에 맞춘 구현
@Transactional
@RequiredArgsConstructor
public class ServerMessageCommandServiceImpl implements ServerMessageCommandService {
private final SequenceGenerator sequenceGenerator;
private final ServerMessageRepository messageRepository;
@Override
public void save(ServerMessageCreateRequest createRequest) {
// 1. 도메인 로직
ServerMessage serverMessage = ServerMessage.builder()
.serverId(createRequest.getServerId())
...
.files(createRequest.getFiles())
.build();
serverMessage.generateSequence(sequenceGenerator.generateSequence(ServerMessage.SEQUENCE_NAME));
ServerChatCreateEvent chatCreateEvent = ServerChatCreateEvent.from(messageRepository.save(serverMessage));
// 2. 이벤트 발행
Events.send(chatCreateEvent);
}
...
}
@Slf4j
@Service
@RequiredArgsConstructor
public class ServerChatEventHandler {
private final SequenceGenerator sequenceGenerator;
private final ChatEventProducer chatEventProducer;
private final ServerChatEventRepository serverChatEventRepository;
// 1. outbox 테이블에 이벤트 기록 (TransactionPhase.BEFORE_COMMIT)
@TransactionalEventListener(classes = ServerChatCreateEvent.class, phase = TransactionPhase.BEFORE_COMMIT)
public void serverChatCreateEventBeforeHandler(ServerChatCreateEvent chatCreateEvent) {
ServerChatEvent serverChatEvent = createServerChatEvent(chatCreateEvent);
serverChatEvent.generateSequence(sequenceGenerator.generateSequence(ServerChatEvent.SEQUENCE_NAME));
serverChatEventRepository.save(serverChatEvent);
}
// 2. 카프카에 메시지 전송 (TransactionPhase.AFTER_COMMIT)
@Async
@Retryable(retryFor = {Exception.class}, maxAttempts = 3, backoff = @Backoff(delay = 2000L))
@TransactionalEventListener(classes = ServerChatCreateEvent.class, phase = TransactionPhase.AFTER_COMMIT)
public void serverChatCreateEventAfterHandler(ServerChatCreateEvent chatCreateEvent) {
ServerChatEvent serverChatEvent = createServerChatEvent(chatCreateEvent);
publishServerChatEvent(serverChatEvent);
}
private void publishServerChatEvent(ServerChatEvent serverChatEvent) {
ServerChatEvent chatEvent = serverChatEventRepository.findByEventId(serverChatEvent.getEventId())
.orElseThrow(() -> new RuntimeException("no event"));
try {
chatEventProducer.sendToServerChatTopic(chatEvent);
chatEvent.changeEventSentType(EventSentType.SEND_SUCCESS);
serverChatEventRepository.save(chatEvent);
} catch (Exception e) {
chatEvent.changeEventSentType(EventSentType.SEND_FAIL);
serverChatEventRepository.save(chatEvent);
}
}
}
2. 이벤트 저장 시 UUID 활용
위 코드에서 문제가 발생했던 부분은 아래 코드였습니다.
private void publishServerChatEvent(ServerChatEvent serverChatEvent) {
ServerChatEvent chatEvent = serverChatEventRepository.findByEventId(serverChatEvent.getEventId())
.orElseThrow(() -> new RuntimeException("no event"));
try {
chatEventProducer.sendToServerChatTopic(chatEvent);
chatEvent.changeEventSentType(EventSentType.SEND_SUCCESS);
serverChatEventRepository.save(chatEvent);
} catch (Exception e) {
chatEvent.changeEventSentType(EventSentType.SEND_FAIL);
serverChatEventRepository.save(chatEvent);
}
}
BEFORE_COMMIT 단계에서 ServerChatEvent를 생성하고 저장하지만, 이 시점에서는 트랜잭션이 아직 커밋되지 않았기 때문에, AFTER_COMMIT 단계에서 publishServerChatEvent를 호출할 때 serverChatEvent.getEventId()가 null인 문제가 발생했습니다. 실제 실행 했을 때 java.lang.IllegalArgumentException: The given id must not be null 예외가 발생했습니다.
이러한 문제를 해결하기 위해 도메인 로직에서 UUID를 생성하고 발행할 이벤트 객체에 저장한 다음 이벤트를 발행했습니다.
변경 후 코드
@Slf4j
@Service
@Transactional
@RequiredArgsConstructor
public class ServerMessageCommandServiceImpl implements ServerMessageCommandService {
private final SequenceGenerator sequenceGenerator;
private final ServerMessageRepository messageRepository;
@Override
public void save(ServerMessageCreateRequest createRequest) {
ServerMessage serverMessage = ServerMessage.builder()
.serverId(createRequest.getServerId())
...
.files(createRequest.getFiles())
.build();
serverMessage.generateSequence(sequenceGenerator.generateSequence(ServerMessage.SEQUENCE_NAME));
// UUID 추가
ServerChatCreateEvent chatCreateEvent = ServerChatCreateEvent.from(messageRepository.save(serverMessage),
UUIDUtil.generateUUID());
Events.send(chatCreateEvent);
}
...
}
private void publishServerChatEvent(ServerChatEvent serverChatEvent) {
// UUID 값으로 이벤트를 조회함
ServerChatEvent chatEvent = serverChatEventRepository.findByUuid(serverChatEvent.getUuid())
.orElseThrow(() -> new RuntimeException("no event"));
try {
chatEventProducer.sendToServerChatTopic(chatEvent);
chatEvent.changeEventSentType(EventSentType.SEND_SUCCESS);
serverChatEventRepository.save(chatEvent);
} catch (Exception e) {
chatEvent.changeEventSentType(EventSentType.SEND_FAIL);
serverChatEventRepository.save(chatEvent);
}
}
3. 메시지 발행 상태 값의 상세화
초기 설계에서는 이벤트 테이블에 발행할 이벤트 정보를 기록할 때, 이벤트의 상태값을 표현하는 published와 같은 boolean 형태의 상태 값을 선언하고 처음 outbox 테이블에 이벤트를 기록할 때에는 published = false로 선언하여 이벤트 발행을 해야 하지만 아직 발행이 되지 않았음을 명확히 표현하고, 이후 트랜잭션이 commit 되고 이를 바라보는 TransactionalEventListener에서 이벤트를 읽어서 발행에 성공했을 때에는 published = true로 바꾸면 되겠지라고 생각을 했습니다.
이런 식으로 상태 값을 관리하면, 이후에 배치 로직을 구현하여 이벤트 테이블의 데이터를 주기적으로 체크하면서, 이벤트 발행 요청 시점(created_at) 은 한참 지났는데 아직도 published = false 인 데이터를 일괄로 읽어서 이벤트 발행을 재시도할 수 있다고 봤습니다.
하지만 자료 조사를 하면서 이벤트 발행이 실패하는 경우는 한 가지 경우가 아니라 최소 두 가지 경우가 발생한다고 합니다. 그래서 자료 조사 하면서 찾은 글을 참고하여 이벤트의 상태 값을 아래와 같이 좀 더 세분화하였습니다.
- init : 이벤트 발행 등록. 이벤트 테이블에 처음 기록될 때의 상태 값을 말함
- send_succes : 이벤트 발행 성공
- send_fail : 이벤트 발행 실패. 카프카에 메시지를 전송했지만 실패함
참고한 글을 요약하면 아래와 같습니다.
Spring에서 이벤트가 발행된 후, Listener가 이를 읽어 Kafka로 메시지를 성공적으로 발행하면 이벤트의 상태는 send_success로 업데이트됩니다. 그러나 이벤트 발행 중 Kafka 클러스터나 네트워크 이슈가 발생하면 상태는 send_fail로 변경됩니다. 반면, 트랜잭션 커밋 후 Listener가 이벤트를 읽지 못한 경우에는 이벤트 테이블의 이벤트가 init 상태로 남습니다.
이러한 문제는 주로 서비스 배포 등으로 프로세스가 종료될 때 발생할 수 있으며, 이는 graceful shutdown이 제대로 이루어지지 않았기 때문입니다. 이 문제를 해결하려면 ThreadPoolTaskExecutor의 설정에서 다음 두 가지를 명시해야 합니다:
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(10);
이 설정을 통해 서비스 배포 중에도 비동기 작업이 완료될 때까지 기다릴 수 있도록 하여, 배포 이후에도 init 상태의 이벤트가 남지 않게 했습니다. (자세한 내용은 Baeldung의 글 참고).
4. MongoDB에 트랜잭션 적용
트랜잭셔널 아웃박스 패턴의 핵심은 DB의 트랜잭션 기능을 활용하는 것입니다.
MySQL을 사용한다면 Spring Boot에서 도메인 로직에 @Transactional을 붙여 놓으면 트랜잭션을 쉽게 사용할 수 있습니다.
현재 FitTrip 프로젝트의 채팅 서비스 쪽에서는 MongoDB를 사용하고 있었습니다.
MongoDB도 @Transactional이 잘 작동하는지 테스트하기 위해 도메인 로직에 @Transactional을 붙이고 롤백이 발생하는 코드를 실행하니 아래와 같은 예외가 발생합니다.
com.mongodb.MongoCommandException: Command failed with error 263
(ShardingOperationFailed): 'Transaction numbers are only allowed on a replica set
member or mongos' on server localhost:27017.
The full response is { "ok" : 0.0, "errmsg" : "Transaction numbers are only
allowed on a replica set member or mongos", "code" : 263,
"codeName" : "ShardingOperationFailed" }
해당 예외는 레플리카 세트가 아닌 MongoDB 환경에서 트랜잭션을 관리하기 위해 MongoTransactionManager를 사용하는 경우 Spring Data MongoDB는 트랜잭션 시작 프로세스 중에 예외를 발생시킵니다.
예외를 통해서 mongoDB는 스프링부트에서 @Transactional을 사용하기 위해 replicaSet 환경을 구축해야 한다는 것을 알았습니다. MongoDB 트랜잭션은 데이터의 일관성과 무결성을 유지하면서 원자적으로 실행해야 하는 여러 작업을 트랜잭션에 포함하는 경우가 많기 때문에 레플리카 세트 내에서 작동하도록 설계되었다고 하네요.
그래서 MongoDB의 트랜잭션을 활용하기 위해 replicaSet 환경을 구축했고 해당 내용을 정리하여 블로그에 포스팅했습니다. 참고하실 분은 참고하면 감사하겠습니다.
https://an-jjin.tistory.com/20
마무리 하며
이번 FitTrip 프로젝트를 통해 분산 시스템에서 데이터 일관성의 중요성을 깨달았습니다.
이 문제를 해결하기 위해 여러 자료를 조사하면서 트랜잭셔널 아웃박스 패턴을 알게 되었고, 이 패턴이 데이터의 원자성을 보장하여 메시지 발행의 신뢰성을 높일 수 있다는 점을 확인했습니다. 이러한 이유로 FitTrip 프로젝트에 트랜잭셔널 아웃박스 패턴을 적용하여 데이터 일관성을 확보하고 시스템의 안정성을 강화하려 했습니다.
참고
https://velog.io/@eastperson/Transaction-Outbox-Pattern-%EC%95%8C%EC%95%84%EB%B3%B4%EA%B8%B0
'프로젝트 > FitTrip' 카테고리의 다른 글
개발 기록 - 온라인/오프라인 상태 메시지 전송 성능 최적화 (0) | 2024.07.14 |
---|---|
개발 기록 - 유저의 실시간 온/오프 상태 처리 기능 (1) | 2024.07.14 |
개발 기록 - 유저의 마지막 채널 위치 기억 기능 (0) | 2024.07.13 |
개발 기록 - 분산 시스템에서 데이터를 전달하는 효율적인 방법 (0) | 2024.07.12 |
트러블 슈팅 - SockJs를 사용한 웹소켓 연결 시 CORS 이슈 (0) | 2024.07.08 |