코딩공작소
MSA(10) - 스트림을 사용한 이벤트 기반 아키텍처 본문
애플레케이션 내 통신을 할 때, 반복되는 데이터 읽기의 서비스인 경우 캐싱이 유효할 수 있다.
이는, 서비스에 대한 호출 응답 시간을 크게 향상시킬 수 있다.
캐싱 솔루션의 세 가지 핵심 요구 사항
- 캐싱된 데이터는 라이선싱 서비스의 모든 인스턴스에 일관성이 있어야 한다.
- 라이선싱 서비스를 호스팅하는 컨테이너 메모리에 조직 데이터를 캐싱하면 안 된다.
- 업데이트나 삭제로 조직 레코드가 변경될 때 라이선싱 서비스는 조직 서비스의 상태 변화를 인식해야 한다.
이러한 요구 사항을 만족하는 방법 중
조직 서비스가 자기 데이터가 변경되었음을 알리려고 비동기 이벤트를 발송하는 방법이 효과적이다.
이 방법은 조직 서비스의 레코드가 변경되었을 때 메시지를 큐에 발행하고, 중개자(브로커)와 라이선싱 서비스는 조직 이벤트가 발생했는지 확인하고, 발행하면 캐시에서 조직 데이터를 삭제한다.
메시징을 사용한 서비스 간 상태 변화 전달
라이선싱 서비스와 조직 서비스 사이에는 토픽이라는 메시징 방식이 추가되며, 조직서비스의 상태 변경에 메시지를 관리한다.
이때, 메시지 큐는 중개자 역할을 하며 4 가지 이점을 제공한다.
- 느슨한 결합
조직 서비스는 상태 변화를 발행해야 할 때 메시지 큐에 기록한다. 라이선싱 서비스는 메시지가 수신되었다는 것만 알고 누가 발행했는지는 알지 못한다. - 내구성
큐가 있으면 서비스 소비자가 다운된 경우에도 메시지 전달을 보장할 수 있다. - 확장성
속도가 나오지 않아도, 메시지 소비자를 더 많이 가동시켜 큐의 메시지를 처리하게 하는 것이 간편하다. - 유연성
새로운 소비자의 코드는 발행되는 이벤트를 수신하고 적절히 대응할 수 있다.
메시징 아키텍처의 단점
- 메시지 처리의 의미론
애플리케이션이 메시지가 소비되는 순서에 따라 어떻게 동작하는지와 메시지가 순서대로 처리되지 않을 때 어떻게 되는지를 이해해야 한다. - 메시지 가시성
상관관계 ID는 사용자 트랜잭션의 시작 시점에 생성되어 모든 서비스 호출에 전달되는 고유한 번호이며 발행 및 소비되는 모든 메시지에 포함되어 전달되어야 한다. - 메시지 코레오그래피
선형적인 방식이 아니기 때문에 비즈니스 로직을 추론하는 것이 어렵고, 여러 다른 서비스의 로그를 모두 살펴보아야 한다.
스프링 클라우드 스트림
애플리케이션의 메시지 발행자와 소비자를 쉽게 구축할 수 있는 애너테이션 기반 프레임워크다.
메시징을 사용하기 위해서는 두 서비스 관점에서 메시지 발행자와 메시지 소비자가 필요하다.
- 소스 : 메시지 발행
- 채널 : 메시지를 보관할 큐를 추상화
- 바인더 : 특정 메시지 플랫폼과 통신하는 스프링 코드
- 싱크 : 큐에서 메시지 수신
간단한 메시지 생산자와 소비자 작성
아파치 카프카 및 레디스 도커 구성
docker-compose.yml 파일에 코드를 추가해야 한다.
zookeeper:
image: zookeeper:3.7.0
container_name: zookeeper
ports:
- 2181:2181
networks:
backend:
aliases:
- "zookeeper"
kafkaserver:
image: wurstmeister/kafka:latest
container_name: kafka
ports:
- 9092:9092
environment:
- KAFKA_ADVERTISED_HOST_NAME=kafka
- KAFKA_ADVERTISED_PORT=9092
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CREATE_TOPICS=dresses:1:1,ratings:1:1
volumes:
- "/var/run/docker.sock:/var/run/docker.sock"
depends_on:
- zookeeper
networks:
backend:
aliases:
- "kafka"
redisserver:
image: redis:alpine
container_name: redis
ports:
- 6379:6379
networks:
backend:
aliases:
- "redis"
메시지 생산자 도커 환경에 카프카 및 레디스 서비스를 추가한다.
<<pom.xml>>
<dependency>
<groupId>org.springframework.cloud</>
<artifactId>spring-cloud-stream</>
</>
<dependency>
<groupId>org.springframework.cloud</>
<artifactId>spring-cloud-starter-stream-kafka</>
</>
mvc clean package dockerfile:build && docker-compose -f docker/docker-compose.yml up
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
package com.optimagrowth.organization;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
@SpringBootApplication
@RefreshScope
@EnableBinding(Source.class) --> 스프링 클라우드 스트림이 메시지 브로커에 애플리케이션을 바인딩하도록 지정
public class OrganizationServiceApplication {
public static void main(String[] args) {
SpringApplication.run(OrganizationServiceApplication.class, args);
}
}
|
cs |
EnableBinding애너테이션은 클래스에서 정의된 채널들을 이용하여 메시지 브로커와 통신할 것이라고 스트링 클라우드 스트림에 알린다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
package com.optimagrowth.organization.utils;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
@Component
public class UserContext {
public static final String CORRELATION_ID = "tmx-correlation-id";
public static final String AUTH_TOKEN = "Authorization";
public static final String USER_ID = "tmx-user-id";
public static final String ORG_ID = "tmx-org-id";
private static final ThreadLocal<String> correlationId= new ThreadLocal<String>();
private static final ThreadLocal<String> authToken= new ThreadLocal<String>();
private static final ThreadLocal<String> userId = new ThreadLocal<String>();
private static final ThreadLocal<String> orgId = new ThreadLocal<String>();
public static String getCorrelationId() { return correlationId.get(); }
public static void setCorrelationId(String cid) {correlationId.set(cid);}
public static String getAuthToken() { return authToken.get(); }
public static void setAuthToken(String aToken) {authToken.set(aToken);}
public static String getUserId() { return userId.get(); }
public static void setUserId(String aUser) {userId.set(aUser);}
public static String getOrgId() { return orgId.get(); }
public static void setOrgId(String aOrg) {orgId.set(aOrg);}
public static HttpHeaders getHttpHeaders(){
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.set(CORRELATION_ID, getCorrelationId());
return httpHeaders;
}
}
|
cs |
변수를 스레드 로컬로 만든다. 이렇게 하면 현재 스레드에 대한 데이터를 스레드별로 저장할 수 있다.
여기에 설정된 정보는 그 값을 설정한 스레드만 읽을 수 있다.
메시지 브로커에 메시지 발행
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
package com.optimagrowth.organization.events.source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import com.optimagrowth.organization.events.model.OrganizationChangeModel;
import com.optimagrowth.organization.utils.ActionEnum;
import com.optimagrowth.organization.utils.UserContext;
@Component
public class SimpleSourceBean {
private Source source;
private static final Logger logger = LoggerFactory.getLogger(SimpleSourceBean.class);
public SimpleSourceBean(Source source){ --> 서비스에서 사용되는 Source 인터페이스 구현체를 주입
this.source = source;
}
public void publishOrganizationChange(ActionEnum action, String organizationId){
logger.debug("Sending Kafka message {} for Organization Id: {}", action, organizationId);
OrganizationChangeModel change = new OrganizationChangeModel(
OrganizationChangeModel.class.getTypeName(),
action.toString(),
organizationId,
UserContext.getCorrelationId()); --> 자바 POJO 메시지를 발행한다.
source.output().send(MessageBuilder.withPayload(change).build());--> Source 클래스에서
정의된 채널에서 전달된 메시지를 발송한다. }
}
|
cs |
스프링 클라우드 Source 클래스에 코드를 주입했다. 특정 메시지 토픽에 대한 모든 통신은 스프링 클라우드 스트림의 채널이라는 자바 인터페이스 클래스로 한다.
채널모델 객체 발행
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
package com.optimagrowth.organization.events.model;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
@Getter @Setter @ToString
public class OrganizationChangeModel {
private String type;
private String action;
private String organizationId;
private String correlationId;
public OrganizationChangeModel(String type, String action, String organizationId, String correlationId) {
super();
this.type = type;
this.action = action;
this.organizationId = organizationId;
this.correlationId = correlationId;
}
}
|
cs |
- action : 이벤트를 발생시킨 액션
- organizationId : 이벤트와 연관된 조직 ID
- correlationId : 이벤트를 발생시킨 서비스 호출의 상관관계 ID ( 디버깅에 필수 )
조직 서비스에서 메시지 발행하기
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
package com.optimagrowth.organization.service;
import java.util.Optional;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.optimagrowth.organization.events.source.SimpleSourceBean;
import com.optimagrowth.organization.model.Organization;
import com.optimagrowth.organization.repository.OrganizationRepository;
import com.optimagrowth.organization.utils.ActionEnum;
import brave.ScopedSpan;
import brave.Tracer;
@Service
public class OrganizationService {
private static final Logger logger = LoggerFactory.getLogger(OrganizationService.class);
@Autowired
private OrganizationRepository repository;
@Autowired
SimpleSourceBean simpleSourceBean;--> 조직 서비스에 SimpleSourceBean을 주입하려고 자동연결
@Autowired
Tracer tracer;
public Organization findById(String organizationId) {
Optional<Organization> opt = null;
ScopedSpan newSpan = tracer.startScopedSpan("getOrgDBCall");
try {
opt = repository.findById(organizationId);
simpleSourceBean.publishOrganizationChange(ActionEnum.GET, organizationId);
if (!opt.isPresent()) {
String message = String.format("Unable to find an organization with the Organization id %s", organizationId);
logger.error(message);
throw new IllegalArgumentException(message);
}
logger.debug("Retrieving Organization Info: " + opt.get().toString());
}finally {
newSpan.tag("peer.service", "postgres");
newSpan.annotate("Client received");
newSpan.finish();
}
return opt.get();
}
public Organization create(Organization organization){
organization.setId( UUID.randomUUID().toString());
organization = repository.save(organization);
simpleSourceBean.publishOrganizationChange(ActionEnum.CREATED, organization.getId());
--> 조직 데이터를 변경하는 모든 메서드는 simpleSourceBean, publishOrganizationChange()를 호출 return organization;
}
public void update(Organization organization){
repository.save(organization);
simpleSourceBean.publishOrganizationChange(ActionEnum.UPDATED, organization.getId());
}
public void delete(String organizationId){
repository.deleteById(organizationId);
simpleSourceBean.publishOrganizationChange(ActionEnum.DELETED, organizationId);
}
@SuppressWarnings("unused")
private void sleep(){
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
logger.error(e.getMessage());
}
}
}
|
cs |
라이선싱 서비스에서 메시지 소비자 작성
스프링 클라우드 스트림을 사용하는 서비스가 어떻게 서비스를 소비하는지 확인해보자
일단, pom.xml 파일에 의존성을 추가해야 한다.
<dependency>
<groupId>org.springframework.cloud</>
<artifactId>spring-cloud-stream</>
</>
<dependency>
<groupId>org.springframework.cloud</>
<artifactId>spring-cloud-starter-stream-kafka</>
</dependency>
package com...~
@EnableBinding(Sink.class) --> 유입되는 메시지를 수신하고자 Sink 인터페이스에 정의된 채널을 사용하도록 서비스를 설정
public class LicenseServiceApplicatiopn {
@StreamListener(Sink.INPUT) --> 입력 채널에서 메시지를 받을 때마다 이 메서드를 실행한다.
public void loggerSink(...){...}
}
라이선싱 서비스와 조직 서비스 간 차이는 @EnableBinding에 전달하는 값이 다르다는 것이다.
메시지 서비스 동작 보기
조직 서비스는 레코드가 추가, 수정, 삭제될 때마다 orgChangeTopic에 메시지를 발행하고 라이선싱 서비스는 그 토픽에서 메시지를 수신한다.
즉, 스프링 클라우드 스트림은 이 서비스 사이에서 중개자 역할을 한다.
- 메시징을 사용한 비동기 통신은 마이크로서비스 아키텍처에서 중요한 부분이다
- 애플리케이션에서 메시징을 사용하면 서비스를 확장하고 결함 내성을 높일 수 있다
- 스프링 클라우드 스트림은 간단한 애너테이션을 사용하고 하부 메시징 플랫폼별 세부 정보를 추상화하여 메시지 생성 및 소비를 단순화한다.
- 메시지 소스(source)는 메시지 브로커 큐에 메시지를 방행하는 애너테이션이 추가된 자바 메서드
- 메시지 싱크(sink)는 메시지 브로커 큐에서 메시지를 수신하는 애너테이션이 추가된 자바 메서드
- 레디스는 데이터베이스와 캐시로 모두 사용될 수 있는 키-값 저장소이다.
'어플리케이션개발 > MSA' 카테고리의 다른 글
MSA(12) - 마이크로서비스 배포 (0) | 2024.08.06 |
---|---|
MSA(11) - 스프링 클라우드 슬루와 집킨을 이용한 분산 추적 (0) | 2024.07.28 |
MSA(9) - 마이크로서비스 보안 (0) | 2024.07.22 |
MSA(8) - 게이트웨이를 이용한 서비스 라우팅 (0) | 2024.07.16 |
MSA(7) - Resilience4j를 사용한 회복성 패턴 (1) | 2024.07.14 |