서론
이번 글에서는 유저의 실시간 온/오프 상태 처리 기능(https://an-jjin.tistory.com/44)을 구현하면서 겪었던 이슈 중 세 번째 이슈에 대해 다루겠습니다. 개발을 진행하면서 처음 구현한 로직에서 최종 로직까지의 개선 과정을 설명해 보려고 합니다.
문제 상황
채팅 서비스에서 유저의 웹소켓 연결 상태를 파악하여 해당 유저가 온라인인지 오프라인인지 지정하는 기능을 구현했습니다.
해당 유저를 A라고 지칭하겠습니다. 유저 A의 온라인/오프라인 이벤트 데이터를 카프카의 특정 토픽으로 전송하고, 다른 채팅 서비스들이 이를 받아와 유저 A가 속한 서버(단체 채팅방)와 DM방에 있는 유저들에게 브로드캐스팅합니다.
카프카에 보내는 이유는 이유는 채팅 서비스의 스케일 아웃을 고려했기 때문입니다.(관련 포스팅: 채팅 서비스 스케일 아웃 고려).
서버와 DM방에 있는 유저들은 채팅 서비스와 웹소켓으로 연결되어 있는 상황입니다.
아래 코드를 보면서 설명을 이어나가겠습니다.
@Slf4j
@Component
@RequiredArgsConstructor
public class connectionStateEventConsumer {
private final SimpMessageSendingOperations messagingTemplate;
@KafkaListener(topics = "${spring.kafka.topic.connection-state-event}", groupId = "${spring.kafka.consumer.group-id.connection-state-event}", containerFactory = "connectionStateEventListenerContainerFactory")
public void connectionStateEventListener(ConnectionStateEventDto connectionStateEventDto) {
...
}
}
온라인/오프라인 이벤트 데이터는 ConnectionStateEventDto입니다.
카프카로부터 ConnectionStateEventDto를 받아오는 부분은 connectionStateEventListener() 메서드입니다.
유저 A가 여러 개의 서버나 DM방에 속해 있을 경우 ConnectionStateEventDto를 동시에 전송하는 부분에 문제가 발생했습니다. SimpMessageSendingOperations의 convertAndSend 메서드를 사용하여 메시지를 브로드캐스팅하면 쉽게 해결될 것 같았지만, 유저가 여러 개의 서버와 DM방에 속해 있을 수 있기 때문에 동시에 ConnectionStateEventDto를 보내는 데 어려움이 있었습니다.
초기 구현
처음에는 단순히 for문을 돌려서 토픽별로 ConnectionStateEventDto를 브로드캐스팅 했습니다.
하지만 이 방법은 다음과 같은 문제점을 가지고 있었습니다
@KafkaListener(topics = "${spring.kafka.topic.connection-state-event}", groupId = "${spring.kafka.consumer.group-id.connection-state-event}", containerFactory = "connectionStateEventListenerContainerFactory")
public void connectionStateEventListener(ConnectionStateEventDto connectionStateEventDto)
throws JsonProcessingException {
if (connectionStateEventDto != null && ("CONNECT".equals(connectionStateEventDto.getType())
|| "DISCONNECT".equals(connectionStateEventDto.getType()))) {
HashMap<String, String> stateInfo = new HashMap<>();
stateInfo.put("state", connectionStateEventDto.getState());
stateInfo.put("type", connectionStateEventDto.getType());
stateInfo.put("userId", String.valueOf(connectionStateEventDto.getUserId()));
String connectionEvent = objectMapper.writeValueAsString(stateInfo);
if (connectionStateEventDto.getServerIds() != null) {
for (Long serverId : connectionStateEventDto.getServerIds()) {
messagingTemplate.convertAndSend("/topic/server/" + serverId, connectionEvent);
}
}
if (connectionStateEventDto.getRoomIds() != null) {
for (Long roomId : connectionStateEventDto.getRoomIds()) {
messagingTemplate.convertAndSend("/topic/direct/" + roomId, connectionEvent);
}
}
}
}
이 코드의 문제점은 messagingTemplate.convertAndSend()를 for문을 돌면서 여러 번 호출하다 보니, 토픽 별로 메시지 전송에 딜레이가 발생한다는 것입니다. 예를 들어, 유저가 1번부터 1000번 서버에 속해 있다면, /topic/server/1에 보내는 것과 /topic/server/1000에 보내는 것에는 딜레이가 발생할 수밖에 없습니다.
개선 과정
1차 개선
초기 구현에서는 server와 dm의 경로가 나눠져 있어 유저가 구독한 server와 dm의 경로를 topicPrefixes에 담아 for문을 돌려 메시지를 전송했습니다.
@KafkaListener(topics = "${spring.kafka.topic.connection-state-event}", groupId = "${spring.kafka.consumer.group-id.connection-state-event}", containerFactory = "connectionStateEventListenerContainerFactory")
public void connectionStateEventListener(ConnectionStateEventDto connectionStateEventDto)
throws JsonProcessingException {
if (connectionStateEventDto != null && ("CONNECT".equals(connectionStateEventDto.getType())
|| "DISCONNECT".equals(connectionStateEventDto.getType()))) {
HashMap<String, String> stateInfo = new HashMap<>();
stateInfo.put("state", connectionStateEventDto.getState());
stateInfo.put("type", connectionStateEventDto.getType());
stateInfo.put("userId", String.valueOf(connectionStateEventDto.getUserId()));
String connectionEvent = objectMapper.writeValueAsString(stateInfo);
List<String> topicPrefixes = getTopicPrefixes(connectionStateEventDto);
for (String topicPrefix : topicPrefixes) {
messagingTemplate.convertAndSend(topicPrefix, connectionEvent);
}
}
}
private static List<String> getTopicPrefixes(ConnectionStateEventDto connectionStateEventDto) {
List<String> topicPrefixes = new ArrayList<>();
if (connectionStateEventDto.getServerIds() != null) {
for (Long serverId : connectionStateEventDto.getServerIds()) {
topicPrefixes.add("/topic/server/" + serverId);
}
}
if (connectionStateEventDto.getRoomIds() != null) {
for (Long roomId : connectionStateEventDto.getRoomIds()) {
topicPrefixes.add("/topic/direct/" + roomId);
}
}
return topicPrefixes;
}
2차 개선
1차 개선에도 여전히 토픽별로 딜레이가 존재합니다.
그래서 parallelStream() 메서드를 사용하여 각 토픽에 대한 메시지 전송을 병렬로 처리하는 방법을 적용했습니다.
이를 통해 각 토픽에 대한 메시지 전송이 병렬로 이루어져서 처리 속도를 높일 수 있습니다.
@KafkaListener(topics = "${spring.kafka.topic.connection-state-event}", groupId = "${spring.kafka.consumer.group-id.connection-state-event}", containerFactory = "connectionStateEventListenerContainerFactory")
public void connectionStateEventListener(ConnectionStateEventDto connectionStateEventDto)
throws JsonProcessingException {
if (connectionStateEventDto != null && ("CONNECT".equals(connectionStateEventDto.getType())
|| "DISCONNECT".equals(connectionStateEventDto.getType()))) {
HashMap<String, String> stateInfo = new HashMap<>();
stateInfo.put("state", connectionStateEventDto.getState());
stateInfo.put("type", connectionStateEventDto.getType());
stateInfo.put("userId", String.valueOf(connectionStateEventDto.getUserId()));
String connectionEvent = objectMapper.writeValueAsString(stateInfo);
List<String> topicPrefixes = getTopicPrefixes(connectionStateEventDto);
topicPrefixes.parallelStream()
.forEach(topicPrefix -> messagingTemplate.convertAndSend(topicPrefix, connectionEvent));
}
}
private static List<String> getTopicPrefixes(ConnectionStateEventDto connectionStateEventDto) {
List<String> topicPrefixes = new ArrayList<>();
if (connectionStateEventDto.getServerIds() != null) {
for (Long serverId : connectionStateEventDto.getServerIds()) {
topicPrefixes.add("/topic/server/" + serverId);
}
}
if (connectionStateEventDto.getRoomIds() != null) {
for (Long roomId : connectionStateEventDto.getRoomIds()) {
topicPrefixes.add("/topic/direct/" + roomId);
}
}
return topicPrefixes;
}
}
3차 개선
Stream API를 사용하여 코드를 간결하게 만들고 상태 정보를 만드는 부분은 정적 팩토리 메서드를 사용하여 따로 응답 객체를 만들었습니다.
private final SimpMessageSendingOperations messagingTemplate;
@KafkaListener(topics = "${spring.kafka.topic.connection-state-event}", groupId = "${spring.kafka.consumer.group-id.connection-state-event}", containerFactory = "connectionStateEventListenerContainerFactory")
public void connectionStateEventListener(ConnectionStateEventDto connectionStateEventDto) {
if (connectionStateEventDto != null && ("CONNECT".equals(connectionStateEventDto.getType())
|| "DISCONNECT".equals(connectionStateEventDto.getType()))) {
ConnectionStateEventResponse connectionStateEventResponse = ConnectionStateEventResponse.from(
connectionStateEventDto);
List<String> topicPaths = generateTopicPaths(connectionStateEventDto);
topicPaths.parallelStream()
.forEach(topicPath -> messagingTemplate.convertAndSend(topicPath, connectionStateEventResponse));
}
}
private List<String> generateTopicPaths(ConnectionStateEventDto connectionStateEventDto) {
return Stream.concat(
connectionStateEventDto.getServerIds().stream().map(serverId -> "/topic/server/" + serverId),
connectionStateEventDto.getRoomIds().stream().map(roomId -> "/topic/direct/" + roomId))
.collect(Collectors.toList());
}
}
마무리하며
아직 개선할 부분이 남아 있는 상태입니다.
parallelStream()을 사용하여 생긴 장점과 단점은 아래와 같습니다.
장점:
- 병렬 처리: parallelStream()을 사용하면 여러 메시지를 병렬로 전송할 수 있어 성능 향상이 가능합니다. 특히 메시지 수가 많을 경우 유리합니다.
- 간결한 코드: 코드가 간결하고 읽기 쉬워집니다. 스트림 API를 사용하여 목록을 처리하는 방식이 깔끔합니다.
단점:
- 자원 사용: 병렬 스트림은 내부적으로 ForkJoinPool을 사용하여 스레드를 관리합니다. 이 과정에서 과도한 스레드 생성이나 컨텍스트 스위칭이 발생할 수 있어 자원 사용 측면에서 비효율적일 수 있습니다.
- 예외 처리: 병렬 스트림 내에서 발생하는 예외를 처리하기가 어렵습니다. 특정 메시지 전송에 실패했을 때 이를 어떻게 처리할지 명확하지 않습니다.
개선 방안:
- 순차 처리: 메시지 수가 많지 않다면 단순히 순차적으로 처리하는 것도 좋은 방법이라고 생각합니다. 그래서 parallelStream()을 사용하여 병렬로 처리할지 아니면 순차 처리를 할지는 성능 테스트 후 결정하려고 합니다.
- 예외 처리 강화: 병렬 처리 방식을 유지하되, 예외 처리를 강화하여 문제가 발생했을 때 로그를 남기거나 재시도 메커니즘을 도입할 수 있습니다.
참고
https://dev-coco.tistory.com/183
https://n1tjrgns.tistory.com/292
'프로젝트 > FitTrip' 카테고리의 다른 글
개발 기록 - 분산 시스템에 트랜잭셔널 아웃박스 패턴 적용하기 (0) | 2024.07.15 |
---|---|
개발 기록 - 유저의 실시간 온/오프 상태 처리 기능 (1) | 2024.07.14 |
개발 기록 - 유저의 마지막 채널 위치 기억 기능 (0) | 2024.07.13 |
개발 기록 - 분산 시스템에서 데이터를 전달하는 효율적인 방법 (1) | 2024.07.12 |
트러블 슈팅 - SockJs를 사용한 웹소켓 연결 시 CORS 이슈 (0) | 2024.07.08 |