서론
유저의 편의성을 위해 유저의 온라인, 온프라인 상태의 실시간 변경에 대해 처리를 하는 기능을 구현했습니다.
전체적인 로직은 아래와 같습니다.
로직 설계
사용자의 온라인 상태와 오프라인 상태에 대한 기준은 웹소켓 연결로 결정했습니다.
예를 들어 웹소켓으로 사용자가 채팅 서비스에 연결이 되어 있는 경우 온라인 상태로 웹소켓 연결이 끊기면 오프라인 상태라고 지정했습니다.
서비스 간 비동기 처리가 가능한 부분은 카프카를 사용하여 데이터를 전송했습니다.
서비스 간 동기 처리가 필요한 부분은 openfeign을 사용하여 처리했습니다.
온라인 상태 처리 로직
1. 사용자와 채팅 서비스가 웹소켓으로 연결되면 채팅 서비스에서는 해당 유저의 id값과 웹소켓 세션 id값을 가져옵니다.
2. 채팅 서비스는 카프카를 통해 상태관리 서비스로 유저의 online 상태에 대한 데이터를 전송합니다.
3. 채팅 서비스는 openfeign을 사용하여 커뮤니티 서비스로 해당 유저가 속해 있는 서버와 DM방에 대한 id 값들을 요청합니다.
4. 커뮤니티 서비스는 채팅 서비스로 유저가 속해 있는 서버와 DM방에 대한 id 값들을 응답합니다.
5. 채팅 서비스는 카프카로 유저의 online 상태에 대한 이벤트를 전송합니다.
6. 채팅 서비스들은 카프카로부터 유저의 online 상태에 대한 이벤트들을 받아옵니다.
7. 채팅 서비스는 online이 된 유저가 속해 있는 서버와 DM방에 있는 유저들에게 online 이벤트를 브로드캐스팅합니다.
오프라인 상태 처리 로직
1. 웹소켓 연결 끊길 시 채팅 서비스는 해당 유저의 웹소켓 세션 id 값을 가져옵니다.
2. 채팅 서비스는 openfeign을 사용하여 상태관리 서비스로 유저의 offline 상태에 대한 데이터를 전송합니다.
3. 상태 관리 서비스로부터 offline이 된 유정의 id값을 받아옵니다.
4. 채팅 서비스는 openfeign을 사용하여 커뮤니티 서비스로 해당 유저가 속해 있는 서버와 DM방에 대한 id 값들을 요청합니다.
5. 커뮤니티 서비스는 채팅 서비스로 유저가 속해 있는 서버와 DM방에 대한 id 값들을 응답합니다.
6. 채팅 서비스는 카프카로 유저의 offline 상태에 대한 이벤트를 전송합니다.
7. 채팅 서비스들은 카프카로부터 유저의 offline 상태에 대한 이벤트들을 받아옵니다.
8. 채팅 서비스는 offline이 된 유저가 속해 있는 서버와 DM방에 있는 유저들에게 offline 이벤트를 브로드캐스팅합니다.
구현 로직
채팅 서비스
웹소켓 online, offline 처리
@Slf4j
@Component
@RequiredArgsConstructor
public class WebSocketConnectionHandler implements ChannelInterceptor {
private final StateServiceClient stateClient;
private final CommunityServiceClient communityClient;
private final JwtTokenHandler jwtTokenHandler;
private final StateEventProducer stateEventProducer;
private static final String AUTH_PREFIX = "Authorization";
private static final String USER_ID = "userId";
...
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(message);
if (StompCommand.CONNECT.equals(headerAccessor.getCommand())) {
Long userId = sendConnectionStateInfo(headerAccessor);
sendConnectionStateEvent(userId);
}
if (StompCommand.DISCONNECT.equals(headerAccessor.getCommand())) {
Long userId = saveDisconnectionState(headerAccessor);
if (userId != null) {
sendDisConnectionStateEvent(userId);
}
}
}
...
private Long sendConnectionStateInfo(StompHeaderAccessor headerAccessor) {
Long userId = Long.parseLong(Objects.requireNonNull(headerAccessor.getFirstNativeHeader(USER_ID)));
String sessionId = headerAccessor.getSessionId();
ConnectionStateInfo connectionStateInfo = ConnectionStateInfo.builder()
.userId(userId)
.sessionId(sessionId)
.type(ConnectionType.CONNECT)
.state(ConnectionState.ONLINE)
.build();
stateEventProducer.sendToConnectionStateInfoTopic(connectionStateInfo);
return userId;
}
private void sendConnectionStateEvent(Long userId) {
UserServerDmInfo ids = communityClient.getServerIdsAndRoomIds(userId);
ConnectionStateEventDto connectionEventDto = ConnectionStateEventDto.builder()
.userId(userId)
.type(ConnectionType.CONNECT)
.state(ConnectionState.ONLINE)
.serverIds(ids.getServerIds())
.roomIds(ids.getDmIds())
.build();
stateEventProducer.sendToConnectionStateEventTopic(connectionEventDto);
}
private Long saveDisconnectionState(StompHeaderAccessor headerAccessor) {
String sessionId = headerAccessor.getSessionId();
ConnectionStateInfo connectionStateInfo = ConnectionStateInfo.builder()
.sessionId(sessionId)
.type(ConnectionType.DISCONNECT)
.state(ConnectionState.OFFLINE)
.build();
return stateClient.saveUserConnectionState(connectionStateInfo).getData();
}
private void sendDisConnectionStateEvent(Long userId) {
UserServerDmInfo ids = communityClient.getServerIdsAndRoomIds(userId);
ConnectionStateEventDto connectionEventDto = ConnectionStateEventDto.builder()
.userId(userId)
.type(ConnectionType.DISCONNECT)
.state(ConnectionState.OFFLINE)
.serverIds(ids.getServerIds())
.roomIds(ids.getDmIds())
.build();
stateEventProducer.sendToConnectionStateEventTopic(connectionEventDto);
}
}
postSend()
- StompHeaderAccessor: StompHeaderAccessor.wrap(message)를 호출하여 기존 메시지를 래핑 한 STOMP 헤더 액세서를 생성하여 STOMP 메시지의 헤더에 접근할 수 있습니다.
- StompCommand: STOMP 프로토콜에서 사용되는 명령어를 열거형으로 정의한 것입니다. 예: CONNECT, DISCONNECT, SUBSCRIBE, etc. STOMP는 메시지 브로커와의 통신을 위해 사용하는 간단한 텍스트 기반 프로토콜로, 클라이언트와 서버 간의 메시지 교환을 관리합니다. StompCommand는 STOMP 프레임의 종류를 나타내며, 클라이언트와 서버가 어떤 종류의 작업을 수행하는지 명시합니다.
- StompHeaderAccessor.wrap(message)를 통해 STOMP 헤더 액세서를 생성하여 STOMP 메시지의 헤더에 접근해 유저의 id 값을 가져왔습니다. 또한 StompCommand를 사용하여 STOMP 프레임이 CONNECT인 경우 웹소켓 연결에 요청이므로 해당 유저는 online 상태로 STOMP 프레임이 DISCONNECT인 경우 해당 유저는 offline 상태로 처리했습니다.
sendConnectionStateInfo()
- Long.parseLong(Objects.requireNonNull(headerAccessor.getFirstNativeHeader(USER_ID))) 를 통해 STOMP 메시지의 헤더에 접근해 유저의 id 값을 가져온다.
- headerAccessor.getSessionId()를 통해 웹소켓 연결을 요청한 유저의 웹소켓 세션 id를 가져옵니다.
- 카프카를 통해 상태관리 서비스로 ConnectionStateInfo 객체를 전송합니다.
sendConnectionStateEvent()
- communityClient.getServerIdsAndRoomIds(userId) 를 통해 커뮤니티 서비스로부터 유저가 속해 있는 서버 id와 DM방 id 목록들을 받아옵니다.
- 카프카로 유저의 online에 대한 데이터인 ConnectionStateEventDto 객체를 전송합니다.
saveDisconnectionState()
- headerAccessor.getSessionId() 를 통해 웹소켓 연결을 끊는 유저의 웹소켓 세션 id를 가져옵니다.
- openfeign을 사용하여 상태관리 서비스로 ConnectionStateInfo 객체를 전송하고 해당 유저의 id 값을 받아옵니다.
sendDisConnectionStateEvent()
- communityClient.getServerIdsAndRoomIds(userId)를 통해 커뮤니티 서비스로부터 유저가 속해 있는 서버 id와 DM방 id 목록들을 받아옵니다.
- 카프카로 유저의 offline에 대한 데이터인 ConnectionStateEventDto 객체를 전송합니다.
온/오프 이벤트 브로드캐스팅
@Slf4j
@Component
@RequiredArgsConstructor
public class connectionStateEventConsumer {
private final SimpMessageSendingOperations messagingTemplate;
@KafkaListener(topics = "${spring.kafka.topic.connection-state-event}", groupId = "${spring.kafka.consumer.group-id.connection-state-event}", containerFactory = "connectionStateEventListenerContainerFactory")
public void connectionStateEventListener(ConnectionStateEventDto connectionStateEventDto) {
if (connectionStateEventDto != null && (connectionStateEventDto.getType() == ConnectionType.CONNECT
|| connectionStateEventDto.getType() == ConnectionType.DISCONNECT)) {
ConnectionStateEventResponse connectionStateEventResponse = ConnectionStateEventResponse.from(
connectionStateEventDto);
List<String> topicPaths = generateTopicPaths(connectionStateEventDto);
topicPaths.parallelStream()
.forEach(topicPath -> messagingTemplate.convertAndSend(topicPath, connectionStateEventResponse));
}
}
private List<String> generateTopicPaths(ConnectionStateEventDto connectionStateEventDto) {
return Stream.concat(
Optional.ofNullable(connectionStateEventDto.getServerIds())
.stream()
.flatMap(List::stream)
.map(serverId -> "/topic/server/" + serverId),
Optional.ofNullable(connectionStateEventDto.getRoomIds())
.stream()
.flatMap(List::stream)
.map(roomId -> "/topic/direct/" + roomId))
.collect(Collectors.toList());
}
}
connectionStateEventListener()
- Kafka로부터 수신한 ConnectionStateEventDto 메시지를 처리하는 메서드입니다.
메시지 처리
- ConnectionStateEventDto가 CONNECT 또는 DISCONNECT 타입인 경우에만 처리했습니다.
- 브로드캐스팅할 데이터를 ConnectionStateEventResponse 객체로 변환했습니다.
- generateTopicPaths 메서드를 호출하여 메시지를 보낼 WebSocket 토픽 경로 리스트를 생성했습니다.
- 각 토픽 경로로 메시지를 병렬 스트림을 사용해 전송했습니다.
generateTopicPaths()
- ConnectionStateEventDto의 서버 ID와 DM방 ID를 기반으로 WebSocket 토픽 경로를 생성했습니다.
상태관리 서비스
카프카 컨슈머
@Slf4j
@Component
@RequiredArgsConstructor
public class ConnectionStateInfoConsumer {
private final UserStateCommandService userStateCommandService;
@KafkaListener(topics = "${spring.kafka.topic.connection-state-info}", groupId = "${spring.kafka.consumer.group-id.connection-state-info}", containerFactory = "connectionStateInfoListenerContainerFactory")
public void connectionStateInfoListener(ConnectionStateInfo connectionStateInfo) {
userStateCommandService.saveUserConnectionState(connectionStateInfo);
}
}
- Kafka로부터 수신한 ConnectionStateInfo 메시지를 처리하는 메서드입니다.
온/오프 상태 저장 서비스 로직
@Slf4j
@Service
@RequiredArgsConstructor
public class UserStateCommandServiceImpl implements UserStateCommandService {
private final UserStateRepository userStateRepository;
@Override
public DataResponseDto<Long> saveUserConnectionState(ConnectionStateInfo connectionStateInfo) {
Long userId = null;
switch (connectionStateInfo.getType()) {
case CONNECT -> handleConnectionState(connectionStateInfo);
case DISCONNECT -> userId = handleDisconnectionState(connectionStateInfo);
}
return DataResponseDto.of(userId);
}
private void handleConnectionState(ConnectionStateInfo connectionStateInfo) {
UserState userState = userStateRepository.findById(String.valueOf(connectionStateInfo.getUserId()))
.orElse(UserState.builder()
.userId(connectionStateInfo.getUserId())
.build());
userState.modify(connectionStateInfo.getSessionId(), connectionStateInfo.getState());
userStateRepository.save(userState);
}
private Long handleDisconnectionState(ConnectionStateInfo connectionStateInfo) {
return userStateRepository.findByChatSessionId(connectionStateInfo.getSessionId())
.map(userState -> {
userState.modify(connectionStateInfo.getState());
userStateRepository.save(userState);
return userState.getUserId();
})
.orElse(null);
}
}
saveUserConnectionState()
- 사용자의 연결 상태 정보를 저장합니다.
- ConnectionStateInfo 객체를 입력으로 받아 연결 상태 유형(CONNECT 또는 DISCONNECT)에 따라 다른 처리를 합니다.
- 연결 상태를 처리한 후, 결과로 사용자 ID를 포함한 DataResponseDto 객체를 반환합니다.
handleConnectionState()
- 사용자가 연결되었을 때 호출됩니다.
- userStateRepository를 통해 사용자 상태를 조회합니다.
- 사용자가 없으면 새로운 UserState 객체를 생성합니다.
- 사용자 상태를 수정하고 저장합니다.
handleDisconnectionState()
- 사용자가 연결을 해제했을 때 호출됩니다.
- 웹소켓 세션 ID를 사용하여 사용자 상태를 조회합니다.
- 조회된 사용자의 상태를 수정하고 저장합니다.
- 사용자 ID를 반환합니다. 만약 해당 세션 ID를 가진 사용자가 없으면 null을 반환합니다
마무리하며
처음 온/오프 처리에 어떻게 할지 막막했지만 웹소켓 연결 기준으로 온/오프 상태를 지정했고 이를 바탕으로 로직 설계 후 기능 구현을 완료했습니다. 항상 느끼는 거지만 처음 다루는 기술이라면 공식문서를 보면서 답을 찾는 게 가장 좋은 방법이라고 생각합니다.
기능을 구현하면서 겪었던 이슈로는 아래와 같습니다. 이슈1과 이슈 2는 따로 글까지 작성할 수준은 아니라고 생각했고 이슈 3과 관련해서는 다음 글로 작성해 볼려합니다.
이슈1
첫 번째 이슈는 StompCommand가 DISCONNECT인 경우 헤더에 접근하여 유저 id를 가져올 수 없었습니다.
그래서 처음 로직 설계 시 웹소켓 DISCONNECT인 경우도 CONNECT처럼 카프카를 통해 상태관리 서비스로 DISCONNECT에 대한 이벤트를 전송하는 방식이었습니다. DISCONNECT인 경우 유저 id를 가져올 수 없어 카프카가 아닌 openfeign을 사용하여 웹소켓 세션 id를 상태관리 서비스로 전송해 해당 웹소켓 세션 id를 가지고 있는 유저 id를 조회하는 방식으로 로직을 수정했습니다.
이러다 보니 DISCONNECT인 경우 채팅 서비스가 상태 서비스와 커뮤니티 서비스 간 동기 통신 구조여서 이 부분을 비동기 통신으로 바꾸는 방식에 대해 찾아보려 합니다.
이슈2
두 번째는 동기 통신과 비동기 통신을 동시 사용 시 겪은 이슈입니다.
첫 번째 이슈처럼 로직을 수정 후 DISCONNECT 이벤트 전송은 카프카를 통해 전송하고 유저 id값을 조회하는 거는 openfeign을 사용하여 조회하는 방식이었습니다. 이럴 경우 카프카를 통해 전송하는 부분은 비동기 처리이고 openfeign을 사용하여 유저 id를 조회하는 부분은 동기 처리였습니다. 당연히 둘을 동시 사용할 경우 값을 제대로 못 가져오는 문제가 발생했습니다. 그래서 카프카를 통해 전송하는 비동기 처리를 제거하고 openfeign만을 사용하여 동기 처리를 했습니다. 동기와 비동기에 대한 얄팍한 개념으로 생긴 이슈였다고 생각합니다.
이슈3
마지막 이슈는 connectionStateEventListener() 메서드에서 ConnectionStateEventDto 메시지 브로드캐스팅 부분에 대한 이슈였습니다. connectionStateEventListener() 메서드에서 ConnectionStateEventDto 메시지를 웹소켓이 연결 됐거나 끊긴 유저가 속해 있는 서버와 DM방에 브로드캐스팅해야 합니다. 이때 유저가 여러 개의 서버나 DM방에 속해 있을 경우 동시에 전송하는 부분에 대해 문제가 발생했습니다. 이 부분에 대해서는 다음 글에서 다루도록 하겠습니다.
'프로젝트 > FitTrip' 카테고리의 다른 글
개발 기록 - FitTrip에 트랜잭셔널 아웃박스 패턴 적용하기 (0) | 2024.07.15 |
---|---|
개발 기록 - 온라인/오프라인 상태 메시지 전송 성능 최적화 (0) | 2024.07.14 |
개발 기록 - 유저의 마지막 채널 위치 기억 기능 (0) | 2024.07.13 |
개발 기록 - 분산 시스템에서 데이터를 전달하는 효율적인 방법 (0) | 2024.07.12 |
트러블 슈팅 - SockJs를 사용한 웹소켓 연결 시 CORS 이슈 (0) | 2024.07.08 |